API
This page is a dump of all the docstrings found in the code. This will be cleaned up in the future and organized to be more accessible.
Mercury.LoopRateLimiter — Type
LoopRateLimiter
Runs a loop at a fixed rate. Works best for loops where the runtime is approximately the same every iteration. The loop runtime is kept approximately constant by sleeping for any time not used by the core computation. This is useful for situations where the computation should take place at regular, predictable intervals.
To achieve better accuracy, the rate limiter records the error between the expected runtime and actual runtime every N_batch iterations, and adjusts the sleep time by the average. Unlike the standard sleep function in Julia, this limiter has a minimum sleep time of 1 microsecond, and rates above 1000Hz can be achieved with moderate accuracy.
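The batch correction described above can be sketched with Base Julia alone. This is a minimal illustration, not Mercury's implementation: the type `SketchLimiter` and the functions `sketch_start!` and `sketch_sleep!` are hypothetical names, and `Libc.systemsleep` stands in for the `usleep` wrapper the package relies on.

```julia
# Hypothetical stand-in for illustration only; not Mercury's LoopRateLimiter.
mutable struct SketchLimiter
    period::Float64      # target seconds per iteration
    N_batch::Int         # iterations per correction batch
    correction::Float64  # seconds subtracted from each sleep
    t_start::Float64     # start time of the current iteration
    t_batch::Float64     # start time of the current batch
    count::Int           # iterations completed in the current batch
end

SketchLimiter(rate; N_batch=10) =
    SketchLimiter(1 / rate, N_batch, 0.0, time(), time(), 0)

# Record the start of an iteration (and of a batch, if one is beginning).
function sketch_start!(lim::SketchLimiter)
    lim.t_start = time()
    if lim.count == 0
        lim.t_batch = lim.t_start
    end
    return lim
end

# Sleep for the unused part of the period, then update the correction
# once every N_batch iterations.
function sketch_sleep!(lim::SketchLimiter)
    elapsed = time() - lim.t_start
    tsleep = lim.period - elapsed - lim.correction
    tsleep > 0 && Libc.systemsleep(tsleep)   # blocking sleep, sub-millisecond capable
    lim.count += 1
    if lim.count == lim.N_batch
        # Average error per iteration between actual and expected runtime.
        err = (time() - lim.t_batch) / lim.N_batch - lim.period
        lim.correction += err
        lim.count = 0
    end
    return lim
end

lim = SketchLimiter(200; N_batch=10)   # 200 Hz target
t0 = time()
for i in 1:100
    sketch_start!(lim)
    sum(rand(100))                     # placeholder for the loop body
    sketch_sleep!(lim)
end
total = time() - t0
println("average rate: ", round(100 / total, digits=1), " Hz")
```

Averaging the error over a batch, rather than correcting every iteration, keeps a single slow iteration from causing a large swing in the sleep time.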
Example
lrl = LoopRateLimiter(100) # Hz
reset!(lrl)
for i = 1:100
    startloop(lrl)         # start timing the loop
    myexpensivefunction()  # execute the core body of the loop
    sleep(lrl)             # sleep for the rest of the time
end
Mercury.Node — Type
Node
An independent process that communicates with other processes via pub/sub ZMQ channels. The process is assumed to run indefinitely.
Defining a new Node
Each node should contain a NodeData element, which stores a list of the publishers and subscribers and some other associated data.
The publisher and subscribers for the node should be "registered" with the NodeData using the add_publisher! and add_subscriber! methods. This allows the subscribers to be automatically launched as separate tasks when launching the nodes.
The constructor for the node should initialize any variables and register the needed publishers and subscribers with NodeData.
Each loop of the process will call the compute method once, which needs to be implemented by the user. A lock for each subscriber task is created in NodeData.sub_locks. It's recommended that the user obtains the lock and copies the data into a local variable for internal use by the compute function.
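The lock-and-copy recommendation can be illustrated without Mercury. The following is a minimal sketch using only Base: `Reading`, `latest`, `msg_lock`, and `snapshot` are hypothetical names, and the `@async` task merely stands in for a subscriber task that keeps overwriting a shared message.

```julia
# Generic Base-only illustration; none of these names come from Mercury.
mutable struct Reading
    x::Float64
    y::Float64
end

latest = Reading(0.0, 0.0)   # shared message, written by the background task
msg_lock = ReentrantLock()   # guards every access to `latest`

# Stand-in for a subscriber task that updates the shared message.
writer = @async for k in 1:50
    lock(msg_lock) do
        latest.x = k
        latest.y = 2k
    end
    sleep(0.001)
end

# Take the lock only long enough to copy the fields into a new object.
function snapshot(r::Reading, l::ReentrantLock)
    lock(l) do
        Reading(r.x, r.y)
    end
end

local_copy = snapshot(latest, msg_lock)  # safe to use without holding the lock
wait(writer)
final_copy = snapshot(latest, msg_lock)
println("final copy: x = ", final_copy.x, ", y = ", final_copy.y)
```

Copying field by field matters here: assigning `local_copy = latest` would only create a second reference to the same mutable object, which the writer could still change.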
Launching the node
The blocking process that runs the node indefinitely is called via run(node). It's recommended that this is launched asynchronously via
node_task = @task run(node)
schedule(node_task)
Mercury.NodeIO — Type
NodeIO
Describes the input/output mechanisms for the node. Each node should store this type internally and add the necessary I/O mechanisms inside of the setupIO!(::NodeIO, ::Node) method.
I/O mechanisms are added to a NodeIO object via add_publisher! and add_subscriber!.
Mercury.PublishedMessage — Type
Specifies a publisher along with a specific message type. This is useful for tracking multiple messages at once.
Mercury.SubscribedMessage — Type
Specifies a subscriber along with a specific message type. This is useful for tracking multiple messages at once.
Mercury.SubscriberFlags — Type
SubscriberFlags
Some useful flags when dealing with subscribers. Describes the state of the system. Particularly helpful when the subscriber is actively receiving messages in another thread and you want to query the state of the subscriber.
Mercury.ZmqPublisher — Type
Publisher
A simple wrapper around a ZMQ publisher that only publishes protobuf messages.
Construction
Publisher(context::ZMQ.Context, ipaddr, port; name)
To create a publisher, pass in a ZMQ.Context, which allows all related publishers / subscribers to be collected in a "group." The publisher also needs to be provided the IPv4 address (either as a string or as a Sockets.IPv4 object), and the port (either as an integer or a string).
A name can also be optionally provided via the name keyword, which can be used to provide a helpful description about what the publisher is publishing. It defaults to "publisher_#" where # is an increasing index.
If the port
Usage
To publish a message, just use the publish method on a protobuf type:
publish(pub::Publisher, proto_msg::ProtoBuf.ProtoType)
Mercury.ZmqSubscriber — Type
ZmqSubscriber
A simple wrapper around a ZMQ subscriber, but only for protobuf messages.
Construction
Subscriber(context::ZMQ.Context, ipaddr, port; name)
To create a subscriber, pass in a ZMQ.Context, which allows all related publishers / subscribers to be collected in a "group." The subscriber also needs to be provided the IPv4 address (either as a string or as a Sockets.IPv4 object), and the port (either as an integer or a string).
A name can also be optionally provided via the name keyword, which can be used to provide a helpful description about what the subscriber is subscribing to. It defaults to "subscriber_#" where # is an increasing index.
Usage
Use the blocking subscribe method to continually listen to the socket and store data in a protobuf type:
subscribe(sub::Subscriber, proto_msg::ProtoBuf.ProtoType)
Note that this function contains an infinite while loop, so it will block the calling thread indefinitely. It's usually best to assign the process to a separate thread / task:
sub_task = @task subscribe(sub, proto_msg)
schedule(sub_task)
Base.sleep — Method
sleep(::LoopRateLimiter)
Sleep at the OS level for the amount of time needed to achieve the rate specified by the loop rate limiter. Has a minimum sleep time of 1 microsecond (relies on the usleep C function).
Mercury.add_publisher! — Method
add_publisher!(nodeIO, msg, args...)
Adds / registers a publisher to nodeIO. This method should only be called once per unique message, across all nodes in the network, since each message should only ever have one publisher. The msg can be any ProtoBuf.ProtoType message (usually generated using ProtoBuf.protoc). Since this is stored as an abstract ProtoBuf.ProtoType type internally, the user should store the original type inside the node. The remaining arguments are passed directly to the constructor for Publisher.
This function adds a new PublishedMessage to nodeIO.pubs. During the compute method, the user should modify the original concrete msg stored in the node. The data can then be published by calling publish on the corresponding PublishedMessage.
Example
Inside of the node constructor:
...
test_msg = TestMsg(x = 1, y = 2, z = 3)
...
Inside of setupIO!:
...
ctx = ZMQ.Context()
ipaddr = ip"127.0.0.1"
port = 5001
add_publisher!(nodeIO, node.test_msg, ctx, ipaddr, port, name="TestMsg_publisher")
...
Inside of compute:
...
node.test_msg.x = 1 # modify the message as desired
publish(getIO(node).pubs[1]) # or whichever is the correct index
...
Mercury.add_subscriber! — Method
add_subscriber!(nodeIO, msg, args...)
Adds / registers a subscriber to nodeIO. The msg can be any ProtoBuf.ProtoType message (usually generated using ProtoBuf.protoc). Since this is stored as an abstract ProtoBuf.ProtoType type internally, the user should store the original type inside the node. The remaining arguments are passed directly to the constructor for Subscriber.
This function adds a new SubscribedMessage to nodeIO.subs. A separate asynchronous task is created for each subscriber when the node is launched. During the compute method, the user can access the latest data by reading from the message stored in their node. To avoid data races and minimize synchronization, it's usually best practice to obtain the lock on the message (stored in SubscribedMessage) and copy the data to a local variable (likely also stored in the node) that can be used by the rest of the compute method without worrying about the data being overwritten by the ongoing subscriber task.
Example
In the node constructor:
...
test_msg = TestMessage(x = 0, y = 0, z = 0)
...
In setupIO!:
...
ctx = ZMQ.Context()
ipaddr = ip"127.0.0.1"
port = 5001
add_subscriber!(nodeIO, node.test_msg, ctx, ipaddr, port, name = "TestMsg_subscriber")
...
In compute:
...
testmsg = getIO(node).subs[1] # or whichever is the correct index
lock(testmsg.lock) do
    node.local_test_msg = node.test_msg # or convert to a different type
end
# use node.local_test_msg in the rest of the code
...
Mercury.decode! — Method
Read the byte data into the message container buf. Returns the number of bytes read.
Mercury.encode! — Method
Write out the byte data into the message container buf. Returns the number of bytes written.
Mercury.getpublisher — Function
getpublisher(node, index)
getpublisher(node, name)
Get a PublishedMessage attached to node, either by its integer index or its name.
Mercury.getsubscriber — Function
getsubscriber(node, index)
getsubscriber(node, name)
Get a SubscribedMessage attached to node, either by its integer index or its name.
Mercury.got_new! — Method
Set the has received flag on subscriber to true.
Mercury.launch — Method
launch(node)
Run the main loop of the node indefinitely. This method automatically sets up any necessary subscriber tasks and then calls the compute method at a fixed rate.
This method should typically be wrapped in an @async or @spawn call.
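To show why a blocking loop is wrapped in a task, here is a minimal Base-only sketch. The function `blocking_loop` and the `stop` flag are hypothetical and exist only so the example can terminate; whether Mercury offers a comparable way to stop a node is not covered by this page.

```julia
# Hypothetical stand-in for a blocking main loop; not Mercury's launch.
function blocking_loop(stop::Threads.Atomic{Bool}, ticks::Threads.Atomic{Int})
    while !stop[]
        Threads.atomic_add!(ticks, 1)  # placeholder for one compute step
        sleep(0.01)                    # yields so other tasks can run
    end
    return :stopped
end

stop = Threads.Atomic{Bool}(false)
ticks = Threads.Atomic{Int}(0)

# Without @async this call would never return control to the caller.
loop_task = @async blocking_loop(stop, ticks)

sleep(0.1)                             # the caller keeps running meanwhile
println("loop still running: ", !istaskdone(loop_task))

stop[] = true                          # ask the loop to exit
result = fetch(loop_task)              # wait for the task and get its value
println("result = ", result, " after ", ticks[], " iterations")
```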
Mercury.numpublishers — Function
numpublishers(node)
Get the number of ZMQ publishers attached to the node.
Mercury.numsubscribers — Function
numsubscribers(node)
Get the number of ZMQ subscribers attached to the node.
Mercury.on_new — Method
on_new(func::Function, submsg::SubscribedMessage)
Helper function for executing code blocks when a SubscribedMessage has received a new message on its Subscriber's Socket. The function func is expected to have a signature of func(msg::ProtoBuf.ProtoType), where msg is the message which submsg has received.
Example:
on_new(nodeio.subs[1]) do msg
    println(msg.pos_x)
end
Mercury.publish_until_receive — Function
publish_until_receive(pub, sub, msg_out; [timeout])
Publish a message via the publisher pub until it's received by the subscriber sub. Both pub and sub should have the same port and IP address.
The function returns true if a message was received before timeout seconds have passed, and false otherwise.
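The return convention (true on success, false once the timeout has passed) follows a common retry pattern. A minimal Base-only sketch is given below; `retry_until` and its `interval` keyword are hypothetical, and the closure stands in for the publish-then-check step, so this is not how Mercury necessarily implements publish_until_receive.

```julia
# Hypothetical helper illustrating the retry-until-timeout pattern.
function retry_until(attempt::Function; timeout=1.0, interval=0.001)
    t0 = time()
    while time() - t0 < timeout
        attempt() && return true   # success before the deadline
        sleep(interval)            # brief pause before trying again
    end
    return false                   # deadline passed without success
end

# Stand-in for "publish, then check whether the subscriber got it":
# this attempt succeeds on the third try.
tries = Ref(0)
ok = retry_until(timeout=1.0) do
    tries[] += 1
    tries[] >= 3
end

# An attempt that never succeeds returns false once the timeout has elapsed.
never = retry_until(() -> false; timeout=0.05)
println("ok = ", ok, ", tries = ", tries[], ", never = ", never)
```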
Mercury.read_new! — Method
Set the has received flag on subscriber to false.
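One plausible reading of how got_new!, read_new!, and on_new fit together is a "new data" flag that the receiving side sets and the consumer clears. The sketch below illustrates that idea with Base only. `FlaggedMessage`, `mark_new!`, and `on_new_sketch` are hypothetical names, and the assumption that the flag is cleared after the callback runs is not confirmed by the docstrings.

```julia
# Hypothetical container; field and function names are illustrative only.
mutable struct FlaggedMessage{T}
    msg::T
    has_new::Bool
    lk::ReentrantLock
end
FlaggedMessage(msg) = FlaggedMessage(msg, false, ReentrantLock())

# Called by the receiving side after fresh data has arrived.
function mark_new!(fm::FlaggedMessage, msg)
    lock(fm.lk) do
        fm.msg = msg
        fm.has_new = true
    end
    return fm
end

# Run `func` on the message only if it is new, then clear the flag (assumed).
function on_new_sketch(func::Function, fm::FlaggedMessage)
    lock(fm.lk) do
        if fm.has_new
            func(fm.msg)
            fm.has_new = false
        end
    end
    return nothing
end

fm = FlaggedMessage((pos_x = 0.0,))
seen = Float64[]
on_new_sketch(fm) do msg     # nothing new yet, so the block is skipped
    push!(seen, msg.pos_x)
end
mark_new!(fm, (pos_x = 1.5,))
on_new_sketch(fm) do msg     # runs once for the new message
    push!(seen, msg.pos_x)
end
on_new_sketch(fm) do msg     # flag already cleared, skipped again
    push!(seen, msg.pos_x)
end
println(seen)
```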
Mercury.receive — Method
Receive function. Expected to return true if a message was received and false otherwise.
Mercury.reset! — Method
reset!(::LoopRateLimiter)
Reset the loop rate limiter before a loop. Not necessary if the object is created directly before calling the loop.
Mercury.startloop — Method
startloop(::LoopRateLimiter)
Call this function at the beginning of a loop body to start timing the loop.
Mercury.usleep — Method
usleep(us)
Sleep for us microseconds. A wrapper around the C usleep function in unistd.h.
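A wrapper of this kind can be written with ccall. The sketch below assumes a POSIX system (Linux or macOS) where usleep is available in the C library; `usleep_sketch` is a hypothetical name, and Mercury's own definition may differ.

```julia
# Assumes a POSIX system; `usleep` is not available on Windows.
# C signature: int usleep(useconds_t usec), with useconds_t taken as unsigned int.
usleep_sketch(us::Integer) = ccall(:usleep, Cint, (Cuint,), us)

t0 = time_ns()
status = usleep_sketch(500)            # request a 500 microsecond sleep
elapsed_us = (time_ns() - t0) / 1e3    # measured duration in microseconds
println("status = ", status, ", slept about ", round(Int, elapsed_us), " microseconds")
```

The measured time is usually somewhat longer than requested, since the OS scheduler decides when the thread resumes.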
Mercury.@rate — Macro
@rate
Run a loop at a fixed rate, specified either by an integer literal or a LoopRateLimiter object. It will run the loop so that it executes close to rate iterations per second.
Examples
@rate for i = 1:100
    myexpensivefunction()
end 200 # Hz
lr = LoopRateLimiter(200, N_batch=10)
@rate while i < 100
    myexpensivefunction()
    i += 1
end lr
Note that the following does NOT work:
rate = 100
@rate for i = 1:100
    myexpensivefunction()
end rate
This fails because Julia macros dispatch on the compile-time types of their arguments (here the symbol rate) rather than on their run-time types.
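The difference between what a macro sees and what exists at run time can be checked with a small stand-alone macro. `@argkind` is a hypothetical macro written only for this illustration and is unrelated to how @rate is implemented.

```julia
# Hypothetical macro that reports what kind of argument it was given.
macro argkind(x)
    # `x` is the unevaluated syntax: a literal value, a Symbol, or an Expr.
    kind = x isa Integer ? "integer literal" :
           x isa Symbol  ? "symbol" : "expression"
    return kind   # the string is spliced into the calling code as a literal
end

rate = 100
a = @argkind 100       # the macro receives the Int 100
b = @argkind rate      # the macro receives the Symbol :rate, not the value 100
c = @argkind 2 * 50    # the macro receives the Expr :(2 * 50)
println((a, b, c))
```

A macro that branches on the argument type therefore cannot tell that the variable rate holds an integer, which is why an integer rate has to be written as a literal.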