API

This page is a dump of all the docstrings found in the code. This will be cleaned up in the future and organized to be more accessible.

Mercury.LoopRateLimiterType
LoopRateLimiter

Runs a loop at a fixed rate. Works best for loops where the runtime is approximately the same every iteration. The loop runtime is kept approximately constant by sleeping for any time not used by the core computation. This is useful for situations where the computation should take place a regular, predictable intervals.

To achieve better accuracy, the rate limiter records the error between the expected runtime and actual runtime every N_batch iterations, and adjusts the sleep time by the average. Unlike the standard sleep function in Julia, this limiter has a minimum sleep time of 1 microsecond, and rates above 1000Hz can be achieved with moderate accuracy.

Example

lrl = LoopRateLimiter(100)  # Hz
reset!(lrl)
for i = 1:100
    startloop(lrl)          # start timing the loop
    myexpensivefunction()   # execute the core body of the loop
    sleep(lrl)              # sleep for the rest of the time
end
source
Mercury.NodeType
Node

A independent process that communicates with other processes via pub/sub ZMQ channels. The process is assumed to run indefinately.

Defining a new Node

Each node should contain a NodeData element, which stores a list of the publishers and subscribers and some other associated data.

The publisher and subscribers for the node should be "registered" with the NodeData using the add_publisher! and add_subscriber! methods. This allows the subscribers to be automatically launched as separate tasks when launching the nodes.

The constructor for the node should initialize any variables and register the needed publishers and subscribers with NodeData.

Each loop of the process will call the compute method once, which needs to be implemented by the user. A lock for each subscriber task is created in NodeData.sub_locks. It's recommended that the user obtains the lock and copies the data into a local variable for internal use by the compute function.

Launching the node

The blocking process that runs the node indefinately is called via run(node). It's recommended that this is launched asynchronously via

node_task = @task run(node)
schedule(node_task)
source
Mercury.NodeIOType
NodeIO

Describes the input/output mechanisms for the node. Each node should store this type internally and add the necessary I/O mechanisms inside of the setupIO!(::NodeIO, ::Node) method.

I/O mechanisms are added to a NodeIO object via add_publisher! and add_subscriber!.

source
Mercury.SubscriberFlagsType
SubscriberFlags

Some useful flags when dealing with subscribers. Describes the state of the system. Particularly helpful when the subscriber is actively receiving messages in another thread and you want to query the state of the subscriber.

source
Mercury.ZmqPublisherType
Publisher

A simple wrapper around a ZMQ publisher, but only publishes protobuf messages.

Construction

Publisher(context::ZMQ.Context, ipaddr, port; name)

To create a publisher, pass in a ZMQ.Context, which allows all related publisher / subscribers to be collected in a "group." The publisher also needs to be provided the IPv4 address (either as a string or as a Sockets.IPv4 object), and the port (either as an integer or a string).

A name can also be optionally provided via the name keyword, which can be used to provide a helpful description about what the publisher is publishing. It defaults to "publisher_#" where # is an increasing index.

If the port

Usage

To publish a message, just use the publish method on a protobuf type:

publish(pub::Publisher, proto_msg::ProtoBuf.ProtoType)
source
Mercury.ZmqSubscriberType
ZmqSubscriber

A simple wrapper around a ZMQ subscriber, but only for protobuf messages.

Construction

Subscriber(context::ZMQ.Context, ipaddr, port; name)

To create a subscriber, pass in a ZMQ.Context, which allows all related publisher / subscribers to be collected in a "group." The subscriber also needs to be provided the IPv4 address (either as a string or as a Sockets.IPv4 object), and the port (either as an integer or a string).

A name can also be optionally provided via the name keyword, which can be used to provide a helpful description about what the subscriber is subscribing to. It defaults to "subscriber_#" where # is an increasing index.

Usage

Use the blocking subscribe method to continually listen to the socket and store data in a protobuf type:

subscribe(sub::Subscriber, proto_msg::ProtoBuf.ProtoType)

Note that this function contains an infinite while loop so will block the calling thread indefinately. It's usually best to assign the process to a separate thread / task:

sub_task = @task subscribe(sub, proto_msg)
schedule(sub_task)
source
Base.sleepMethod
sleep(::LoopRateLimiter)

Sleep the OS for the amount of time needed to achieve the rate specified by the loop rate limiter. Has a minimum sleep time of 1 microsecond (relies on the usleep C function).

source
Mercury.add_publisher!Method
add_publisher!(nodeIO, msg, args...)

Adds / registers a publisher to nodeIO. This method should only be called once per unique message, across all nodes in the network, since each message should only ever have one publisher. The msg can be any ProtoBuf.ProtoType message (usually generated using ProtoBuf.protoc). Since this is stored as an abstract ProtoBuf.ProtoType type internally, the user should store the original type inside the node. The remaining arguments are passed directly to the constructor for Publisher.

This function adds a new PublishedMessage to nodeIO.pubs. During the compute method, the user should modify the original concrete msg stored in the node. The data can then be published by calling publish on the corresponding PublishedMessage.

Example

Inside of the node constructor:

...
test_msg = TestMsg(x = 1, y = 2, z= 3)
...

Inside of setupIO!:

...
ctx = ZMQ.Context()
ipaddr = ip"127.0.0.1"
port = 5001
add_publisher!(nodeIO, node.test_msg, ctx, ipaddr, port, name="TestMsg_publisher")
...

Inside of compute:

...
node.test_msg.x = 1  # modify the message as desired
publish(getIO(node).pubs[1])  # or whichever is the correct index
...
source
Mercury.add_subscriber!Method
add_subscriber!(nodeIO, msg, args...)

Adds / registers a subscriber to nodeIO. The msg can be any ProtoBuf.ProtoType message (usually generated using ProtoBuf.protoc). Since this is stored as an abstract ProtoBuf.ProtoType type internally, the user should store the original type inside the node. The remaining arguments are passed directly to the constructor for Subscriber.

This function adds a new SubscribedMessage to nodeIO.subs. A separate asynchronous task is created for each subscriber when the node is launched. During the compute method, the user can access the latest data by reading from the message stored in their node. To avoid data races and minimize synchronization, it's usually best practice to obtain the lock on the message (stored in SubscribedMessage) and copy the data to a local variable (likely also stored in the node) that can be used by the rest of the compute method without worrying about the data being overwritted by the ongoing subscriber task.

Example

In the node constructor:

...
test_msg = TestMessage(x = 0, y = 0, z = 0)
...

In setupIO!:

...
ctx = ZMQ.Context()
ipaddr = ip"127.0.0.1"
port = 5001
add_subscriber!(nodeIO, node.test_msg, ctx, ipaddr, port, name = "TestMsg_subscriber")
...

In compute:

...
testmsg = getIO(node).subs[1]  # or whichever is the correct index
lock(testmsg.lock) do
    node.local_test_msg = node.test_msg  # or convert to a different type
end
# use node.local_test_msg in the rest of the code
...
source
Mercury.decode!Method

Read in the byte data into the message container buf. Returns the number of bytes read

source
Mercury.encode!Method

Write out the byte data into the message container buf. Returns the number of bytes written

source
Mercury.launchMethod
launch(node)

Run the main loop of the node indefinately. This method automatically sets up any necessary subscriber tasks and then calls the compute method at a fixed rate.

This method should typically be wrapped in an @async or @spawn call.

source
Mercury.on_newMethod
on_new(func::Function, submsg::SubscribedMessage)

Helpful function for executing code blocks when a SubscribedMessage type has recieved a new message on its Subscriber's Socket. The function func is expected to have a signature of func(msg::ProtoBuf.ProtoType) where msg is the message which submsg has recieved.

Example:

on_new(nodeio.subs[1]) do msg
    println(msg.pos_x)
end
source
Mercury.publish_until_receiveFunction
publish_until_receive(pub, sub, msg_out; [timeout])

Publish a message via the publisher pub until it's received by the subscriber sub. Both pub and sub should have the same port and IP address.

The function returns true if a message was received before timeout seconds have passed, and false otherwise.

source
Mercury.receiveMethod

Receive function, is expected to return true if a message was received, false otherwise

source
Mercury.reset!Method
reset!(::LoopRateLimiter)

Reset the loop rate limiter before a loop. Not necessary if the object is created directly before calling the loop.

source
Mercury.startloopMethod
startloop(::LoopRateLimiter)

Call this function at the beginning of a loop body to start timing the loop.

source
Mercury.usleepMethod
usleep(us)

Sleep for us microseconds. A wrapper around the C usleep function in unistd.h.

source
Mercury.@rateMacro
@rate

Run a loop at a fixed rate, specified either by an integer literal or a LoopRateLimiter object. It will run the loop so that it executes close to rate iterations per second.

Examples

@rate for i = 1:100
    myexpensivefunction()
end 200#Hz
lr = LoopRateLimiter(200, N_batch=10)
@rate while i < 100
    myexpensivefunction()
    i += 1
end lr

Note that the following does NOT work:

rate = 100
@rate for i = 1:100
    myexpensivefunction()
end rate

Since Julia macros dispatch on the compile-time types instead of the run-time types.

source