messaging

Type-safe actor messaging approaches

For notify.me I hand-rolled a simple actor system to handle all Xmpp traffic. Every user in the system has their own actor that maintains their Xmpp state, tracking online status, resources, resource capability, notification queues and command capabilities. When a message comes in, either via our internal notification queues or from the user, a simple dispatcher sends it on to the actor. The actor handles the message and responds with a message of its own, which the dispatcher either hands off to the Xmpp bot for formatting and delivery to the client or sends to our internal queues for propagation to other parts of the notify.me system.

This has worked flawlessly for over 2 years now, but its ad-hoc nature means it's a fairly high-touch system in terms of extensibility. This has led me down the path of building a more general actor system. Originally Xmpp was our backbone transport among actors in the notify.me system, but at this point, I would like to use Xmpp only as an edge transport, and otherwise use in-process mailboxes and serialize via protobuf for remote actors. I still love the Xmpp model for distributing work, since nodes can just come up anywhere, sign into a chatroom and report for work. You get broadcast, online monitoring, point-to-point messaging, etc. all for free. But it means all messages go across the Xmpp backbone, which has a bit of overhead, and with thousands of actors, I'd rather stay in process when possible. No point going out to the Xmpp server and back just to talk to the actor next to you. I will likely still use Xmpp for Actor Host nodes to discover each other, but the actual inter-node communication will be direct Http-RPC (no, it's not RESTful if it's just messaging).

Defining the messaging contract as an Interface

One design approach I'm currently playing with is using actors that expose their contract via an interface. Keeping the share-nothing philosophy of traditional actors, you still won't have a reference to an actor, but since you know its type, you know exactly what capabilities it has. That means rather than having a single receive point on the actor and making it responsible for routing the message internally based on message type (a capability that lends itself better to composition), messages can arrive directly at their endpoints by signature. Another benefit is that testing the actor behavior is separate from its routing rules.

public interface IXmppAgent {
    void Notify(string subject, string body);
    OnlineStatus QueryStatus();
}

Given this contract we could just proxy the calls. So our mailbox could have a proxy factory like this:

public interface IMailbox {
    TRecipient For<TRecipient>(string id);
}

allowing us to send messages like this:

var proxy = _mailbox.For<IXmppAgent>("foo@bar.com");
proxy.Notify("hey", "how'd you like that?");
var status = proxy.QueryStatus();

But messaging is supposed to be asynchronous

While this is simple and decoupled, it is implicitly synchronous. Sure, .Notify could be considered a fire-and-forget message, but .QueryStatus definitely blocks. And if we wanted to communicate an error condition like not finding the recipient, we'd have to do it as an exception, moving errors into the synchronous pipeline as well. In order to retain the flexibility of a pure message architecture, we need a result handle that lets us handle results and/or errors via continuation.

My first pass at an API for this resulted in this calling convention:

public interface IMailbox {
    void Send<TRecipient>(string id, Expression<Action<TRecipient>> message);
    Result SendAndReceive<TRecipient>(string id, Expression<Action<TRecipient>>  message);
    Result<TResponse> SendAndReceive<TRecipient, TResponse>(
        string id,
        Expression<Func<TRecipient, TResponse>>  message
    );
}

transforming the messaging code to this:

_mailbox.Send<IXmppAgent>("foo@bar.com", a => a.Notify("hey", "how'd you like that?"));
var result = _mailbox.SendAndReceive<IXmppAgent, OnlineStatus>(
    "foo@bar.com",
    a => a.QueryStatus()
);

I'm using MindTouch Dream's Result<T> class here, instead of Task<T>, primarily because it's battle tested and I have not properly tested Task under mono yet, which is where this code has to run. In this API, .Send is meant for fire-and-forget style messaging while .SendAndReceive provides a result handle — and if Void were an actual Type, we could have dispensed with the overload. The result handle has the benefit of letting us choose how we want to deal with the asynchronous response. We could simply block:

var status = _mailbox.SendAndReceive<IXmppAgent, OnlineStatus>(
        "foo@bar.com",
        a => a.QueryStatus())
    .Wait();
Console.WriteLine("foo@bar.com status: {0}", status);

or we could attach a continuation to handle it out of band of the current execution flow:

_mailbox.SendAndReceive<IXmppAgent, OnlineStatus>(
        "foo@bar.com",
        a => a.QueryStatus()
    )
    .WhenDone(r => {
        var status = r.Value;
        Console.WriteLine("foo@bar.com status: {0}", status);
    });

or we could simply suspend our current execution flow, by invoking it from a coroutine:

var status = OnlineStatus.Offline;
yield return _mailbox.SendAndReceive<IXmppAgent, OnlineStatus>(
        "foo@bar.com",
        a => a.QueryStatus()
    )
    .Set(x => status = x);
Console.WriteLine("foo@bar.com status: {0}", status);

Regardless of completion strategy, we have decoupled the handling of the result and error conditions from the message recipient's behavior, which is the true goal of the message passing decoupling of the actor system.
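For the in-process case, a mailbox along these lines can be sketched in a few lines. This is only an illustration under stated assumptions: Task<T> stands in for Dream's Result<T>, and LocalMailbox, Register and StubAgent are hypothetical names that do not come from any existing library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading.Tasks;

public enum OnlineStatus { Offline, Online }

public interface IXmppAgent {
    void Notify(string subject, string body);
    OnlineStatus QueryStatus();
}

// Hypothetical stand-in actor used only to exercise the sketch.
public class StubAgent : IXmppAgent {
    public void Notify(string subject, string body) { }
    public OnlineStatus QueryStatus() { return OnlineStatus.Online; }
}

// Hypothetical in-process mailbox; Task<T> stands in for Dream's Result<T>.
public class LocalMailbox {
    private readonly Dictionary<string, object> _actors = new Dictionary<string, object>();

    public void Register<TRecipient>(string id, TRecipient actor) where TRecipient : class {
        _actors[id] = actor;
    }

    public Task<TResponse> SendAndReceive<TRecipient, TResponse>(
        string id,
        Expression<Func<TRecipient, TResponse>> message
    ) where TRecipient : class {
        object actor;
        if(!_actors.TryGetValue(id, out actor)) {
            // an unknown recipient becomes a faulted result handle, not a thrown exception
            return Task.FromException<TResponse>(new KeyNotFoundException("no actor with id " + id));
        }
        // local delivery: compile the expression and run it against the actor off the caller's thread
        var call = message.Compile();
        return Task.Run<TResponse>(() => call((TRecipient)actor));
    }
}
```

The sketch makes no attempt to process one message at a time per actor or to guard the registry against concurrent registration, both of which a real actor host would need.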

Improving usability

Looking at the signatures there are two things we can still improve:

  1. If we send a lot of messages to the same recipient, the syntax is a bit repetitive and verbose
  2. Because we need to specify the recipient type, we also have to specify the return value type, even though it should be inferable

We can address both of these, by providing a factory method for a typed mailbox:

public interface IMailbox {
    IMailbox<TRecipient> To<TRecipient>(string id);
}

public interface IMailbox<TRecipient> {
    void Send(Expression<Action<TRecipient>> message);
    Result SendAndReceive(Expression<Action<TRecipient>> message);
    Result<TResponse> SendAndReceive<TResponse>(
        Expression<Func<TRecipient, TResponse>>  message
    );
}

which lets us change our messaging to:

var actorMailbox = _mailbox.To<IXmppAgent>("foo@bar.com");
actorMailbox.Send(a => a.Notify("hey", "how'd you like that?"));
var result2 = actorMailbox.SendAndReceive(a => a.QueryStatus());

// or inline
_mailbox.To<IXmppAgent>("foo@bar.com")
    .Send(a => a.Notify("hey", "how'd you like that?"));
var result3 = _mailbox.To<IXmppAgent>("foo@bar.com")
    .SendAndReceive(a => a.QueryStatus());

I've included the inline version because it is still more compact than the explicit version, since it can infer the result type.

Supporting Remote Actors

The reason the mailbox uses Expression instead of raw Action and Func is that at any point an actor we're sending a message to could be remote. The moment we cross process boundaries, we need to serialize the message. That means we need to be able to programmatically inspect the expression and build a serializable AST, as well as serialize the captured data members used in the expression.
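To make the inspection step concrete, here is a minimal sketch that reduces a message expression to a method name and a list of evaluated arguments. MessageDescription and MessageInspector are hypothetical names, and the sketch stops short of the actual serialization (protobuf or otherwise).

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

public interface IXmppAgent {
    void Notify(string subject, string body);
}

// Hypothetical serializable description of a message: method name plus evaluated arguments.
public class MessageDescription {
    public string MethodName;
    public object[] Arguments;
}

public static class MessageInspector {
    public static MessageDescription Describe<TRecipient>(Expression<Action<TRecipient>> message) {
        var call = message.Body as MethodCallExpression;
        if(call == null) {
            throw new ArgumentException("message must be a single method call on the recipient");
        }
        // Evaluate each argument now so captured locals are snapshotted at send time.
        // An argument that references the recipient parameter itself would fail to compile here.
        var args = call.Arguments
            .Select(arg => Expression.Lambda<Func<object>>(Expression.Convert(arg, typeof(object))).Compile()())
            .ToArray();
        return new MessageDescription { MethodName = call.Method.Name, Arguments = args };
    }
}
```

Evaluating the arguments at send time is one way to snapshot captured locals, although reference-type arguments would still need copying or freezing to be safe from later mutation.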

Since we're talking about serializing, inspecting the expression also allows us to verify that all members are immutable. For value types, this is easy enough, but DTOs would need to be prevented from changing so that local vs. remote invocation won't end up with different results just because the sender changed its copy. We could handle this via serialization at message send time, although this looks like a perfect place to see how well the Freezable pattern works.
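As a rough idea of what the Freezable approach could look like for a message DTO, consider the following sketch. NotificationDto is a hypothetical type made up for illustration.

```csharp
using System;

// Hypothetical DTO illustrating the Freezable idea: mutable while being built, read-only once frozen.
public class NotificationDto {
    private bool _frozen;
    private string _subject;

    public bool IsFrozen { get { return _frozen; } }

    public string Subject {
        get { return _subject; }
        set {
            if(_frozen) throw new InvalidOperationException("cannot modify a frozen message");
            _subject = value;
        }
    }

    // A mailbox could call this (or insist on IsFrozen) before accepting the message.
    public void Freeze() { _frozen = true; }
}
```

With something like this, the mailbox could freeze every DTO argument it finds while inspecting the expression, so the sender cannot alter a message after it has been handed off.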

libBeanstalk.NET, a Beanstalkd client for .NET/mono

A couple of years back I wrote a store-and-forward message queue called simpleMQ for Vmix. A year later, Vmix was kind enough to let me open source it and I put it up on SourceForge (because that was the place back in the day). But it never got any documentation or promotion love from me, since I was much too busy building notify.me and using simpleMQ as its messaging backbone. Over the last couple of years, simpleMQ has served us incredibly well at notify.me, passing what must be billions of messages by now. But it does have warts and I have a backlog of fixes/features I've been meaning to implement.

Beanstalkd: simple & fast workqueue

Rather than invest more time in simpleMQ, I've been watching other message/work queues to see whether there is another product I could use instead. I've yet to find a product that I truly like, but Beanstalkd is my favorite of the bunch. Very simple, fast and with optional persistence, it addresses most of my needs.

Beanstalkd's protocol is inspired by memcached. It uses a persistent TCP connection, but relies on connection state only to determine which "tube" (read: workqueue) you are using and to act as a work timeout safeguard. The protocol is ASCII verbs with binary payloads and uses yaml for structured responses.
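To give a feel for the wire format, the put command is an ASCII line of the form put <pri> <delay> <ttr> <bytes>, followed by the raw payload, with each part terminated by \r\n. A minimal sketch of building such a frame follows; BeanstalkWire and BuildPut are hypothetical helper names, not part of any client library.

```csharp
using System;
using System.Text;

public static class BeanstalkWire {
    // Builds "put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n" as raw bytes.
    public static byte[] BuildPut(uint priority, uint delay, uint ttr, byte[] payload) {
        var header = Encoding.ASCII.GetBytes(
            string.Format("put {0} {1} {2} {3}\r\n", priority, delay, ttr, payload.Length));
        var frame = new byte[header.Length + payload.Length + 2];
        Buffer.BlockCopy(header, 0, frame, 0, header.Length);
        Buffer.BlockCopy(payload, 0, frame, header.Length, payload.Length);
        // the payload is binary, so its length comes from the header, but it is still followed by CRLF
        frame[frame.Length - 2] = (byte)'\r';
        frame[frame.Length - 1] = (byte)'\n';
        return frame;
    }
}
```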

Tubes are created on demand and destroyed once empty. By default beanstalkd is in-memory only, but can use a binary log to store items and recover the in-memory state by log playback. I had briefly looked at zeromq, but after finding out that its speed relies on no persistence, I decided to give it a skip. zeromq might be web scale, but I prefer a queue that doesn't degrade to behaving like /dev/null :) Maybe my transactional RDBMS roots are showing, but I have a soft spot for at least journaling data to disk.

One concept of beanstalkd that I'm still conflicted about is that work is given a processing time-out (time-to-run) by the producer of the work, rather than having the consumer of the work declare its intended responsiveness. Since the producer doesn't know how soon the work gets picked up, i.e. time-to-run is not a measure of work item staleness, I don't see a great benefit in having the producer dictate the work terms.

The other aspect of work distribution beanstalkd lacks for my taste is the idea of being able to produce work in one instance and have it forwarded to another instance when that instance is available, i.e. store-and-forward queues. I like to keep my queues on the current host so I can produce work without having to rely on the uptime of the consumer or some central facility. However, store-and-forward is an implementation detail I can easily fake with a daemon on each machine that acts as a local consumer and distributor of work items, so it's not something I hold against beanstalkd.
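The core loop of such a forwarding daemon might look roughly like the sketch below. IWorkQueue, Forwarder and InMemoryQueue are hypothetical names invented for this illustration and are not the libBeanstalk.NET API. The key idea is to delete the local job only after the remote queue has accepted it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal queue abstraction; not the libBeanstalk.NET API.
public interface IWorkQueue {
    void Put(byte[] data);
    bool TryReserve(out long jobId, out byte[] data);
    void Delete(long jobId);
    void Release(long jobId);
}

public static class Forwarder {
    // Moves at most one job from local to remote; returns true if a job was forwarded.
    public static bool ForwardOne(IWorkQueue local, IWorkQueue remote) {
        long jobId;
        byte[] data;
        if(!local.TryReserve(out jobId, out data)) return false;
        try {
            remote.Put(data);
        } catch(Exception) {
            // remote unavailable: hand the job back so it stays queued locally
            local.Release(jobId);
            return false;
        }
        // delete only after the remote accepted the job (at-least-once delivery)
        local.Delete(jobId);
        return true;
    }
}

// In-memory stand-in so the sketch can be exercised without a server.
public class InMemoryQueue : IWorkQueue {
    private readonly Queue<KeyValuePair<long, byte[]>> _ready = new Queue<KeyValuePair<long, byte[]>>();
    private readonly Dictionary<long, byte[]> _reserved = new Dictionary<long, byte[]>();
    private long _nextId = 1;
    public bool Available = true;
    public int ReadyCount { get { return _ready.Count; } }

    public void Put(byte[] data) {
        if(!Available) throw new InvalidOperationException("queue unavailable");
        _ready.Enqueue(new KeyValuePair<long, byte[]>(_nextId++, data));
    }
    public bool TryReserve(out long jobId, out byte[] data) {
        jobId = 0; data = null;
        if(_ready.Count == 0) return false;
        var job = _ready.Dequeue();
        _reserved[job.Key] = job.Value;
        jobId = job.Key; data = job.Value;
        return true;
    }
    public void Delete(long jobId) { _reserved.Remove(jobId); }
    public void Release(long jobId) {
        byte[] data;
        if(_reserved.TryGetValue(jobId, out data)) {
            _reserved.Remove(jobId);
            _ready.Enqueue(new KeyValuePair<long, byte[]>(jobId, data));
        }
    }
}
```

Because the delete happens after the remote put, a crash between the two steps would deliver the job twice, so consumers in such a setup would need to tolerate duplicates.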

libBeanstalk.NET

Notify.me being a mix of perl and C#, I needed a .NET client. Since a protocol-complete one did not exist, and given the simplicity of the Beanstalkd protocol, I opted to write my own and have released it under Apache 2.0 on github.

I've not put DLLs up for download, since the API is still somewhat in flux as I continue to add features, but the current release supports the entire 1.3 protocol. By default it considers all payloads as binary streams, but I've included extension methods to handle simple string payloads:

  // connect to beanstalkd
  using(var client = new BeanstalkClient(host, port)) {

    // put some data
    var put = client.Put("foo");
 
    // reserve data from queue
    var reserve = client.Reserve();
    Console.WriteLine("data: {0}", reserve.Data);
    
    // delete reserved data from queue
    client.Delete(reserve.JobId);
  }

The binary surface is just as simple:

  // connect to beanstalkd
  using(var client = new BeanstalkClient(host, port)) {

    // put some data
    var put = client.Put(100, 0, 120, data, data.Length);
 
    // reserve data from queue
    var reserve = client.Reserve();
    
    // delete reserved data from queue
    client.Delete(reserve.JobId);
  }

I've tried to keep the interface IBeanstalkClient as close as possible to the protocol verb signatures and rely on extension methods to create simpler versions on top of that interface. To facilitate extensions that provide smart defaults, the client also has an instance of a Defaults member that can be used to initialize those values.

The main deviation from the protocol is how I handle producer and consumer tubes. Rather than have a separate getter and setter for the tube that put will enter work into, I simply have a settable property CurrentTube. And rather than surfacing watch, ignore and listing of consumer tubes, the client includes a special collection, WatchedTubes, with the following interface:

interface IWatchedTubeCollection : IEnumerable<string> {
    int Count { get; }
    void Add(string tube);
    bool Remove(string tube);
    bool Contains(string tube);
    void CopyTo(string[] array, int arrayIndex);
    void Refresh();
}

I was originally going to use ICollection<string>, but Clear() did not make sense and I wanted to have a manual method to reload the list from the server, which is exposed via Refresh(). Under the hood, watched tubes is a hashset, so adding the same tube multiple times has no effect, nor is the order of tubes in the collection guaranteed.

Future work

The client is functional and can do everything that Beanstalkd offers, but it's really just a wire protocol, akin to dealing with files as streams. To make this a useful API, it really needs to take the 90% use cases and remove any friction and repetition they would encounter.

Connection pooling

BeanstalkClient isn't, nor is it meant to be, thread-safe. It assumes you create a client when you need it and govern access to it, rather than sharing a single instance. This was motivated by Beanstalkd's behavior of storing tube state as part of the connection. Given that I encourage clients to be created on the fly to enqueue work, it makes sense that under the hood clients should use a connection pool both to re-use existing connections rather than constantly open and close sockets and to limit the maximum sockets a single process tries to open to Beanstalkd. Pooling wouldn't mean sharing of a connection by clients, but handing off connections to new clients and putting them in a pool to be closed on idle timeout once the client is disposed.
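The hand-off idea described above can be sketched with a small generic pool. This is not the code on the branch mentioned below; ConnectionPool is a hypothetical name, the connection type is left generic, and the idle-timeout cleanup is omitted to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical pool: hands out exclusive connections and caps how many exist at once.
public class ConnectionPool<T> where T : class {
    private readonly Func<T> _create;
    private readonly Stack<T> _idle = new Stack<T>();
    private readonly SemaphoreSlim _slots;

    public ConnectionPool(Func<T> create, int maxConnections) {
        _create = create;
        _slots = new SemaphoreSlim(maxConnections, maxConnections);
    }

    // Blocks while maxConnections are already handed out.
    public T Acquire() {
        _slots.Wait();
        lock(_idle) {
            if(_idle.Count > 0) return _idle.Pop();
        }
        try {
            return _create();
        } catch {
            // creation failed, so give the slot back before rethrowing
            _slots.Release();
            throw;
        }
    }

    // Called when the owning client is disposed; the connection becomes reusable.
    public void Release(T connection) {
        lock(_idle) {
            _idle.Push(connection);
        }
        _slots.Release();
    }
}
```

Since Beanstalkd keeps tube state on the connection, a client picking up a reused connection would also have to reset or re-apply its used and watched tubes, which the sketch leaves out.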

Most of this work is complete and on a branch, but I want to put it through some more testing before merging it back to master, especially since it will introduce client API changes.

Distributed servers

The Beanstalkd FAQ has this to say about distribution:

Does beanstalk inherently support distributed servers?

Yes, although this is handled by the clients, just as with memcached. The beanstalkd server doesn’t know anything about other beanstalkd instances that are running.

I need to take a look at the clients that do implement this and determine what that means for me. I.e. do they use some kind of consistent hashing to determine which node to use for a particular tube, etc. But I do want to have parity with other clients on this.
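One simple, memcached-style way a client could pick a server is to hash the tube name and take it modulo the server count. The sketch below shows that approach as a possibility only; it is not a description of what existing Beanstalkd clients do, and TubeRouter and the server strings are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class TubeRouter {
    // Maps a tube name to one of the servers using a 32-bit FNV-1a hash.
    // A stable hash is used because string.GetHashCode may differ between processes.
    public static string ServerFor(string tube, IList<string> servers) {
        if(servers == null || servers.Count == 0) {
            throw new ArgumentException("at least one server is required");
        }
        uint hash = 2166136261;
        foreach(var b in Encoding.UTF8.GetBytes(tube)) {
            unchecked {
                hash ^= b;
                hash *= 16777619;
            }
        }
        return servers[(int)(hash % (uint)servers.Count)];
    }
}
```

Plain modulo hashing remaps most tubes whenever a server is added or removed, which is the problem consistent hashing is meant to reduce.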

POCO Producers and Consumers

For me, the 90% use case for a work queue is to produce work on some threads/processes/machines and consume that work on a number of workers. Generally that item will have some structured fields describing the work to be done and producers and consumers will use designated tubes for specific types of work. These use cases imply that producers and consumers are separate user stories, that they are tied to specific tubes and deal with structured data. My current plan is to address these user stories with two new interfaces that will look similar to these:

public interface IBeanstalkProducer<T> {
  BeanstalkProducerDefaults Defaults { get; }
  PutResponse Put(T item);
}

public interface IBeanstalkConsumer<T> {
  BeanstalkConsumerDefaults Defaults { get; }
  Job<T> Reserve();
  bool Delete(Job<T> job);
  bool Release(Job<T> job); // return type assumed here, mirroring Delete
}

The idea with each is that it's tied to a tube (or tubes for the consumer) at construction time and that the implementation will have a simple way of associating a serializer for the entity T (will provide protobuf and MetSys.Little support out of the box).

Rx support via IObservable<Job<T>>

Once there is the concept of a Job<T>, it makes sense that reservation of jobs should be exposed as a stream of work that can be processed via LINQ. However, since items should only be reserved when the subscriber accepts the work, it should probably be encapsulated in something like this:

public interface Event<T> {
  Job<T> Take();
  Job<T> Job { get; }
}

This way, multiple subscribers can try to reserve an item and items not reserved by anyone are released automatically.
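The "first subscriber to take it wins, otherwise release" behavior could be sketched as follows. JobOffer, TryTake and ReleaseIfUnclaimed are hypothetical names for illustration and deliberately differ from the Event<T> interface above, which is still only a plan.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper around a reserved job that at most one subscriber can claim.
public class JobOffer<T> {
    private readonly T _job;
    private readonly Action<T> _release;
    private int _state; // 0 = offered, 1 = taken, 2 = released

    public JobOffer(T job, Action<T> release) {
        _job = job;
        _release = release;
    }

    // Returns true for exactly one caller, even if subscribers race on different threads.
    public bool TryTake(out T job) {
        if(Interlocked.CompareExchange(ref _state, 1, 0) == 0) {
            job = _job;
            return true;
        }
        job = default(T);
        return false;
    }

    // Called by the publisher after all subscribers have seen the offer.
    public void ReleaseIfUnclaimed() {
        if(Interlocked.CompareExchange(ref _state, 2, 0) == 0) {
            _release(_job);
        }
    }
}
```

A publisher would wrap each reserved job in an offer, push it to all subscribers, and then call ReleaseIfUnclaimed so the job returns to the queue when nobody wanted it.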

As I work on the future work items, I will also use the library in production so I can get better educated about the real-world behavior of Beanstalkd and what uncovered scenarios the client runs into. There is OK test coverage over the provided behavior, but I certainly want to increase that significantly as I keep working on it.

For the time being, I hope the library proves useful to other .NET developers and would love to get feedback, contributions and issues you may encounter.

By arne on | .net, geek, mono | 3 comments