API
This page is a dump of all the docstrings found in the code. This will be cleaned up in the future and organized to be more accessible.
Mercury.LoopRateLimiter
— TypeLoopRateLimiter
Runs a loop at a fixed rate. Works best for loops where the runtime is approximately the same every iteration. The loop runtime is kept approximately constant by sleeping for any time not used by the core computation. This is useful for situations where the computation should take place at regular, predictable intervals.
To achieve better accuracy, the rate limiter records the error between the expected runtime and actual runtime every N_batch
iterations, and adjusts the sleep time by the average. Unlike Julia's standard sleep function, which has a minimum sleep time of 1 millisecond, this limiter has a minimum sleep time of 1 microsecond, so rates above 1000Hz can be achieved with moderate accuracy.
Example
lrl = LoopRateLimiter(100) # Hz
reset!(lrl)
for i = 1:100
startloop(lrl) # start timing the loop
myexpensivefunction() # execute the core body of the loop
sleep(lrl) # sleep for the rest of the time
end
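The batch-averaged correction described above can be sketched in plain Julia. This is a minimal illustration under assumptions, not Mercury's implementation: the BatchCorrector type, its fields, and the record! and sleep_time helpers are hypothetical names introduced here.

```julia
# Hypothetical sketch of a batch-averaged sleep correction (not Mercury's code).
mutable struct BatchCorrector
    period::Float64      # target loop period in seconds (1 / rate)
    N_batch::Int         # number of iterations per correction batch
    correction::Float64  # seconds currently subtracted from the sleep time
    err_sum::Float64     # accumulated timing error within the current batch
    count::Int           # iterations recorded in the current batch
end
BatchCorrector(rate; N_batch = 10) = BatchCorrector(1 / rate, N_batch, 0.0, 0.0, 0)

# Record the measured period of one iteration; every N_batch iterations,
# fold the average error into the correction and start a new batch.
function record!(bc::BatchCorrector, actual_period)
    bc.err_sum += actual_period - bc.period
    bc.count += 1
    if bc.count == bc.N_batch
        bc.correction += bc.err_sum / bc.N_batch
        bc.err_sum = 0.0
        bc.count = 0
    end
    return bc.correction
end

# Time left to sleep after the core computation, never negative.
sleep_time(bc::BatchCorrector, compute_time) =
    max(bc.period - compute_time - bc.correction, 0.0)

bc = BatchCorrector(100; N_batch = 2)   # 100 Hz target, 10 ms period
record!(bc, 0.011)                      # iteration ran 1 ms long
record!(bc, 0.013)                      # iteration ran 3 ms long, batch complete
remaining = sleep_time(bc, 0.004)       # sleep left after 4 ms of computation
overrun = sleep_time(bc, 0.020)         # computation already exceeded the period
```

In this sketch a loop that consistently overshoots its period ends up with a positive correction, which shortens later sleeps.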
Mercury.Node
— TypeNode
An independent process that communicates with other processes via pub/sub ZMQ channels. The process is assumed to run indefinitely.
Defining a new Node
Each node should contain a NodeData
element, which stores a list of the publishers and subscribers and some other associated data.
The publisher and subscribers for the node should be "registered" with the NodeData
using the add_publisher!
and add_subscriber!
methods. This allows the subscribers to be automatically launched as separate tasks when launching the nodes.
The constructor for the node should initialize any variables and register the needed publishers and subscribers with NodeData
.
Each loop of the process will call the compute
method once, which needs to be implemented by the user. A lock for each subscriber task is created in NodeData.sub_locks
. It's recommended that the user obtains the lock and copies the data into a local variable for internal use by the compute
function.
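The recommended lock-and-copy pattern can be illustrated without Mercury or ZMQ. In the sketch below, PoseMsg, shared, msg_lock, and the writer task are hypothetical stand-ins for a protobuf message, the message stored in the node, its subscriber lock, and the subscriber task.

```julia
# Hypothetical stand-in for a protobuf message type.
mutable struct PoseMsg
    x::Float64
    y::Float64
end

shared = PoseMsg(0.0, 0.0)   # message that a subscriber task would overwrite
msg_lock = ReentrantLock()   # lock guarding `shared`

# Stand-in for a subscriber task: updates the shared message under the lock.
writer = @async for k in 1:5
    lock(msg_lock) do
        shared.x = k
        shared.y = 2k
    end
    yield()
end
wait(writer)

# Inside a compute step: hold the lock only long enough to take a copy.
snapshot = lock(msg_lock) do
    deepcopy(shared)
end

# Later writes to the shared message do not affect the local copy.
shared.x = -1.0
```

Holding the lock only for the duration of the copy keeps the writer from being blocked while the rest of the computation runs on the snapshot.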
Launching the node
The blocking process that runs the node indefinitely is called via run(node)
. It's recommended that this is launched asynchronously via
node_task = @task run(node)
schedule(node_task)
Mercury.NodeIO
— TypeNodeIO
Describes the input/output mechanisms for the node. Each node should store this type internally and add the necessary I/O mechanisms inside of the setupIO!(::NodeIO, ::Node)
method.
I/O mechanisms are added to a NodeIO
object via add_publisher!
and add_subscriber!
.
Mercury.PublishedMessage
— TypeSpecifies a publisher along with specific message type. This is useful for tracking multiple messages at once
Mercury.SubscribedMessage
— TypeSpecifies a subscriber along with specific message type. This is useful for tracking multiple messages at once
Mercury.SubscriberFlags
— TypeSubscriberFlags
Some useful flags when dealing with subscribers. Describes the state of the system. Particularly helpful when the subscriber is actively receiving messages in another thread and you want to query the state of the subscriber.
Mercury.ZmqPublisher
— TypePublisher
A simple wrapper around a ZMQ publisher, but only publishes protobuf messages.
Construction
Publisher(context::ZMQ.Context, ipaddr, port; name)
To create a publisher, pass in a ZMQ.Context
, which allows all related publisher / subscribers to be collected in a "group." The publisher also needs to be provided the IPv4 address (either as a string or as a Sockets.IPv4
object), and the port (either as an integer or a string).
A name can also be optionally provided via the name
keyword, which can be used to provide a helpful description about what the publisher is publishing. It defaults to "publisher_#" where #
is an increasing index.
If the port
Usage
To publish a message, just use the publish
method on a protobuf type:
publish(pub::Publisher, proto_msg::ProtoBuf.ProtoType)
Mercury.ZmqSubscriber
— TypeZmqSubscriber
A simple wrapper around a ZMQ subscriber, but only for protobuf messages.
Construction
Subscriber(context::ZMQ.Context, ipaddr, port; name)
To create a subscriber, pass in a ZMQ.Context
, which allows all related publisher / subscribers to be collected in a "group." The subscriber also needs to be provided the IPv4 address (either as a string or as a Sockets.IPv4
object), and the port (either as an integer or a string).
A name can also be optionally provided via the name
keyword, which can be used to provide a helpful description about what the subscriber is subscribing to. It defaults to "subscriber_#" where #
is an increasing index.
Usage
Use the blocking subscribe
method to continually listen to the socket and store data in a protobuf type:
subscribe(sub::Subscriber, proto_msg::ProtoBuf.ProtoType)
Note that this function contains an infinite while loop, so it will block the calling thread indefinitely. It's usually best to assign the process to a separate thread / task:
sub_task = @task subscribe(sub, proto_msg)
schedule(sub_task)
Base.sleep
— Methodsleep(::LoopRateLimiter)
Sleeps for the amount of time needed to achieve the rate specified by the loop rate limiter. Has a minimum sleep time of 1 microsecond (relies on the usleep
C function).
Mercury.add_publisher!
— Methodadd_publisher!(nodeIO, msg, args...)
Adds / registers a publisher to nodeIO
. This method should only be called once per unique message, across all nodes in the network, since each message should only ever have one publisher. The msg
can be any ProtoBuf.ProtoType
message (usually generated using ProtoBuf.protoc
). Since this is stored as an abstract ProtoBuf.ProtoType
type internally, the user should store the original type inside the node. The remaining arguments are passed directly to the constructor for Publisher
.
This function adds a new PublishedMessage
to nodeIO.pubs
. During the compute
method, the user should modify the original concrete msg
stored in the node. The data can then be published by calling publish
on the corresponding PublishedMessage
.
Example
Inside of the node constructor:
...
test_msg = TestMsg(x = 1, y = 2, z = 3)
...
Inside of setupIO!
:
...
ctx = ZMQ.Context()
ipaddr = ip"127.0.0.1"
port = 5001
add_publisher!(nodeIO, node.test_msg, ctx, ipaddr, port, name="TestMsg_publisher")
...
Inside of compute
:
...
node.test_msg.x = 1 # modify the message as desired
publish(getIO(node).pubs[1]) # or whichever is the correct index
...
Mercury.add_subscriber!
— Methodadd_subscriber!(nodeIO, msg, args...)
Adds / registers a subscriber to nodeIO
. The msg
can be any ProtoBuf.ProtoType
message (usually generated using ProtoBuf.protoc
). Since this is stored as an abstract ProtoBuf.ProtoType
type internally, the user should store the original type inside the node. The remaining arguments are passed directly to the constructor for Subscriber
.
This function adds a new SubscribedMessage
to nodeIO.subs
. A separate asynchronous task is created for each subscriber when the node is launched. During the compute
method, the user can access the latest data by reading from the message stored in their node. To avoid data races and minimize synchronization, it's usually best practice to obtain the lock on the message (stored in SubscribedMessage
) and copy the data to a local variable (likely also stored in the node) that can be used by the rest of the compute
method without worrying about the data being overwritten by the ongoing subscriber task.
Example
In the node constructor:
...
test_msg = TestMessage(x = 0, y = 0, z = 0)
...
In setupIO!
:
...
ctx = ZMQ.Context()
ipaddr = ip"127.0.0.1"
port = 5001
add_subscriber!(nodeIO, node.test_msg, ctx, ipaddr, port, name = "TestMsg_subscriber")
...
In compute
:
...
testmsg = getIO(node).subs[1] # or whichever is the correct index
lock(testmsg.lock) do
node.local_test_msg = deepcopy(node.test_msg) # copy, or convert to a different type
end
# use node.local_test_msg in the rest of the code
...
Mercury.decode!
— MethodRead in the byte data into the message container buf. Returns the number of bytes read
Mercury.encode!
— MethodWrite out the byte data into the message container buf. Returns the number of bytes written
Mercury.getpublisher
— Functiongetpublisher(node, index)
getpublisher(node, name)
Get a PublishedMessage
attached to node
, either by its integer index or its name.
Mercury.getsubscriber
— Functiongetsubscriber(node, index)
getsubscriber(node, name)
Get a SubscribedMessage
attached to node
, either by its integer index or its name.
Mercury.got_new!
— MethodSet the has received flag on subscriber to true
Mercury.launch
— Methodlaunch(node)
Run the main loop of the node indefinitely. This method automatically sets up any necessary subscriber tasks and then calls the compute
method at a fixed rate.
This method should typically be wrapped in an @async
or @spawn
call.
Mercury.numpublishers
— Functionnumpublishers(node)
Get the number of ZMQ publishers attached to the node
Mercury.numsubscribers
— Functionnumsubscribers(node)
Get the number of ZMQ subscribers attached to the node
Mercury.on_new
— Methodon_new(func::Function, submsg::SubscribedMessage)
Helper function for executing code blocks when a SubscribedMessage type has received a new message on its Subscriber's Socket. The function func is expected to have a signature of func(msg::ProtoBuf.ProtoType)
where msg is the message which submsg
has received.
Example:
on_new(nodeio.subs[1]) do msg
println(msg.pos_x)
end
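One way such a flag-guarded callback could work is sketched below. This is an assumption-laden illustration, not Mercury's implementation: FlaggedMsg, its has_new field, and on_new_sketch are hypothetical names, and a NamedTuple stands in for a protobuf message.

```julia
# Hypothetical container pairing a message with a "new data" flag.
mutable struct FlaggedMsg{T}
    msg::T
    has_new::Bool
end

# Call `func` on the message only if new data has arrived, then clear the flag.
function on_new_sketch(func::Function, fm::FlaggedMsg)
    if fm.has_new
        func(fm.msg)
        fm.has_new = false
    end
    return nothing
end

fm = FlaggedMsg((pos_x = 1.5,), true)   # pretend a message was just received
seen = Float64[]

on_new_sketch(fm) do msg
    push!(seen, msg.pos_x)   # runs: the flag was set
end
on_new_sketch(fm) do msg
    push!(seen, msg.pos_x)   # skipped: the flag was cleared by the first call
end
```

Taking the function as the first argument is what allows the do-block syntax at the call site.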
Mercury.publish_until_receive
— Functionpublish_until_receive(pub, sub, msg_out; [timeout])
Publish a message via the publisher pub
until it's received by the subscriber sub
. Both pub
and sub
should have the same port and IP address.
The function returns true
if a message was received before timeout
seconds have passed, and false
otherwise.
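The retry-until-timeout behaviour described here can be sketched generically. The retry_until helper and its period keyword are hypothetical and only illustrate the control flow; in Mercury the repeated action would be publishing and checking for receipt, and the actual implementation may differ.

```julia
# Hypothetical helper: call `attempt` repeatedly until it returns true
# or `timeout` seconds have elapsed. Returns whether it succeeded in time.
function retry_until(attempt::Function; timeout = 1.0, period = 0.001)
    t_start = time()
    while time() - t_start < timeout
        attempt() && return true
        sleep(period)   # wait briefly before the next attempt
    end
    return false
end

# An action that succeeds on its third attempt.
attempts = Ref(0)
ok = retry_until(timeout = 1.0) do
    attempts[] += 1
    attempts[] >= 3
end

# An action that never succeeds, so the call gives up after the timeout.
timed_out = retry_until(() -> false; timeout = 0.05)
```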
Mercury.read_new!
— MethodSet the has received flag on subscriber to false
Mercury.receive
— MethodReceive function. Expected to return true if a message was received, and false otherwise
Mercury.reset!
— Methodreset!(::LoopRateLimiter)
Reset the loop rate limiter before a loop. Not necessary if the object is created directly before calling the loop.
Mercury.startloop
— Methodstartloop(::LoopRateLimiter)
Call this function at the beginning of a loop body to start timing the loop.
Mercury.usleep
— Methodusleep(us)
Sleep for us
microseconds. A wrapper around the C usleep
function in unistd.h
.
Mercury.@rate
— Macro@rate
Run a loop at a fixed rate, specified either by an integer literal or a LoopRateLimiter
object. It will run the loop so that it executes close to rate
iterations per second.
Examples
@rate for i = 1:100
myexpensivefunction()
end 200 # Hz
lr = LoopRateLimiter(200, N_batch=10)
@rate while i < 100
myexpensivefunction()
i += 1
end lr
Note that the following does NOT work:
rate = 100
@rate for i = 1:100
myexpensivefunction()
end rate
This fails because Julia macros operate on the unevaluated expressions they are given, not on run-time values: the macro sees the symbol rate rather than the integer 100.
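The difference between a literal and a variable at macro-expansion time can be seen with a small macro. The @showarg macro below is hypothetical and unrelated to Mercury; it only reports the type of the argument as the macro receives it.

```julia
# Hypothetical macro: returns the type of its argument as seen at expansion time.
macro showarg(arg)
    # `arg` is whatever was written at the call site, unevaluated
    return string(typeof(arg))
end

rate = 100
from_literal = @showarg 200     # the macro receives an integer literal
from_variable = @showarg rate   # the macro receives the Symbol :rate, not 100
```

A macro can therefore tell a literal apart from a name, but it cannot tell what type of value a name will hold at run time.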