libBeanstalk.NET, a Beanstalkd client for .NET/mono

Image courtesy of jepeters74

A couple of years back I wrote a store-and-forward message queue called simpleMQ for Vmix. A year later, Vmix was kind enough to let me open source it and I put it up on SourceForge ('cause that was the place back in the day). But it never got any documentation or promotion love from me, since I was much too busy building notify.me and using simpleMQ as its messaging backbone. Over the last couple of years, simpleMQ has served us incredibly well at notify.me, passing what must be billions of messages by now. But it does have warts and I have a backlog of fixes/features I've been meaning to implement.

Beanstalkd: simple & fast workqueue

Rather than invest more time in simpleMQ, I've been watching other message/work queues to see whether there is another product I could use instead. I've yet to find a product that I truly like, but Beanstalkd is my favorite of the bunch. Very simple, fast and with optional persistence, it addresses most of my needs.

Beanstalkd's protocol is inspired by memcached. It uses a persistent TCP connection, but relies on connection state only to determine which "tube" (read: workqueue) you are using and to act as a work timeout safeguard. The protocol consists of ASCII verbs with binary payloads and uses YAML for structured responses.
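To make "ASCII verbs with binary payloads" concrete, here is a minimal sketch of how a put request is framed on the wire according to the beanstalkd protocol document: an ASCII command line ending in CRLF, followed by the raw payload bytes and another CRLF. The BeanstalkFraming class and BuildPut method are hypothetical names used for illustration only; they are not part of libBeanstalk.NET.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of libBeanstalk.NET.
public static class BeanstalkFraming {

  // Frames a "put" request: "put <pri> <delay> <ttr> <bytes>\r\n",
  // then the raw payload bytes, then a closing "\r\n".
  public static byte[] BuildPut(uint priority, uint delay, uint timeToRun, byte[] payload) {
    var header = Encoding.ASCII.GetBytes(
      string.Format("put {0} {1} {2} {3}\r\n", priority, delay, timeToRun, payload.Length));
    var frame = new byte[header.Length + payload.Length + 2];
    Buffer.BlockCopy(header, 0, frame, 0, header.Length);
    Buffer.BlockCopy(payload, 0, frame, header.Length, payload.Length);
    frame[frame.Length - 2] = (byte)'\r';
    frame[frame.Length - 1] = (byte)'\n';
    return frame;
  }
}
```

The payload is never inspected or escaped, which is what lets the protocol carry arbitrary binary data: the byte count in the command line tells the server exactly how much to read.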

Tubes are created on demand and destroyed once empty. By default beanstalkd is in-memory only, but it can use a binary log to store items and recover the in-memory state by log playback. I had briefly looked at zeromq, but after finding out that its speed relies on no persistence, I decided to give it a skip. zeromq might be web scale, but I prefer a queue that doesn't degrade to behaving like /dev/null :) Maybe my transactional RDBMS roots are showing, but I have a soft spot for at least journaling data to disk.

One concept of beanstalkd that I'm still conflicted about is that work is given a processing time-out (time-to-run) by the producer of the work, rather than having the consumer of the work declare its intended responsiveness. Since the producer doesn't know how soon the work gets picked up, i.e. time-to-run is not a measure of work item staleness, I don't see a great benefit in having the producer dictate the work terms.

The other aspect of work distribution that beanstalkd lacks for my taste is the ability to produce work in one instance and have it forwarded to another instance when that instance is available, i.e. store-and-forward queues. I like to keep my queues on the current host so I can produce work without having to rely on the uptime of the consumer or some central facility. However, store-and-forward is an implementation detail I can easily fake with a daemon on each machine that acts as a local consumer and distributor of work items, so it's not something I hold against beanstalkd.

libBeanstalk.NET

Notify.me being a mix of Perl and C#, I needed a .NET client. Since no protocol-complete client existed and the Beanstalkd protocol is so simple, I opted to write my own and have released it under Apache 2.0 on github.

I've not put DLLs up for download, since the API is still somewhat in flux as I continue to add features, but the current release supports the entire 1.3 protocol. By default it considers all payloads as binary streams, but I've included extension methods to handle simple string payloads:

  // connect to beanstalkd
  using(var client = new BeanstalkClient(host, port)) {

    // put some data
    var put = client.Put("foo");

    // reserve data from queue
    var reserve = client.Reserve();
    Console.WriteLine("data: {0}", reserve.Data);

    // delete reserved data from queue
    client.Delete(reserve.JobId);
  }

The binary surface is just as simple:

  // connect to beanstalkd
  using(var client = new BeanstalkClient(host, port)) {

    // put some data
    var put = client.Put(100, 0, 120, data, data.Length);

    // reserve data from queue
    var reserve = client.Reserve();

    // delete reserved data from queue
    client.Delete(reserve.JobId);
  }

I've tried to keep the interface IBeanstalkClient as close as possible to the protocol verb signatures and rely on extension methods to create simpler versions on top of that interface. To facilitate extensions that provide smart defaults, the client also has an instance of a Defaults member that can be used to initialize those values.

The main deviation from the protocol is how I handle producer and consumer tubes. Rather than have a separate getter and setter for the tube that put will enter work into, I simply have a settable property CurrentTube. And rather than surfacing watch, ignore and listing of consumer tubes, the client includes a special collection, WatchedTubes, with the following interface:

interface IWatchedTubeCollection : IEnumerable<string> {
    int Count { get; }
    void Add(string tube);
    bool Remove(string tube);
    bool Contains(string tube);
    void CopyTo(string[] array, int arrayIndex);
    void Refresh();
}

I was originally going to use ICollection<string>, but Clear() did not make sense and I wanted to have a manual method to reload the list from the server, which is exposed via Refresh(). Under the hood, watched tubes is a hashset, so adding the same tube multiple times has no effect, nor is the order of tubes in the collection guaranteed.
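As a rough illustration of those set semantics, here is a minimal in-memory stand-in. It is not the library's implementation: SketchWatchedTubes is a hypothetical class that records the protocol commands it would send (watch and ignore are real beanstalkd verbs, and a fresh connection watches the "default" tube) instead of talking to a server, and whether the real client skips the command for a duplicate Add is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Illustrative stand-in, not the library's implementation: it records the
// protocol commands it would send instead of talking to a server.
public class SketchWatchedTubes {
  // A new beanstalkd connection starts out watching the "default" tube.
  private readonly HashSet<string> _tubes = new HashSet<string> { "default" };
  public readonly List<string> Sent = new List<string>();

  public int Count { get { return _tubes.Count; } }
  public bool Contains(string tube) { return _tubes.Contains(tube); }

  public void Add(string tube) {
    // HashSet.Add returns false for a duplicate, so a repeated Add is a no-op.
    if (_tubes.Add(tube)) Sent.Add("watch " + tube + "\r\n");
  }

  public bool Remove(string tube) {
    if (!_tubes.Remove(tube)) return false;
    Sent.Add("ignore " + tube + "\r\n");
    return true;
  }
}
```

Adding "emails" twice leaves the collection at two tubes ("default" and "emails") and queues a single watch command.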

Future work

The client is functional and can do everything that Beanstalkd offers, but it's really just a wire protocol, akin to dealing with files as streams. To make this a useful API, it really needs to take the 90% use cases and remove any friction and repetition they would encounter.

Connection pooling

BeanstalkClient isn't, nor is it meant to be, thread-safe. It assumes you create a client when you need it and govern access to it, rather than sharing a single instance. This was motivated by Beanstalkd's behavior of storing tube state as part of the connection. Given that I encourage clients to be created on the fly to enqueue work, it makes sense that under the hood clients should use a connection pool both to re-use existing connections rather than constantly open and close sockets and to limit the maximum sockets a single process tries to open to Beanstalkd. Pooling wouldn't mean sharing of a connection by clients, but handing off connections to new clients and putting them in a pool to be closed on idle timeout once the client is disposed.

Most of this work is complete and on a branch, but I want to put it through some more testing before merging it back to master, especially since it will introduce client API changes.
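The hand-off idea can be sketched roughly as follows. This is not the code on the branch: SketchPool and its members are hypothetical, and the connection type is left generic so the example stays self-contained. A connection is only ever held by one client at a time, it returns to an idle list when released, and idle connections are closed once they have sat unused past a timeout.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the pooling branch of libBeanstalk.NET may work differently.
public class SketchPool<T> where T : class, IDisposable {
  private readonly Func<T> _create;
  private readonly int _max;
  private readonly TimeSpan _idleTimeout;
  private readonly List<KeyValuePair<DateTime, T>> _idle = new List<KeyValuePair<DateTime, T>>();
  private readonly object _lock = new object();
  private int _leased;

  public SketchPool(Func<T> create, int max, TimeSpan idleTimeout) {
    _create = create;
    _max = max;
    _idleTimeout = idleTimeout;
  }

  // Hands a connection to exactly one caller: an idle one if available,
  // otherwise a new one, as long as the cap on leased connections allows it.
  public T Acquire() {
    lock (_lock) {
      if (_leased >= _max) throw new InvalidOperationException("pool exhausted");
      _leased++;
      if (_idle.Count > 0) {
        var conn = _idle[_idle.Count - 1].Value;
        _idle.RemoveAt(_idle.Count - 1);
        return conn;
      }
    }
    try {
      return _create();
    } catch {
      lock (_lock) { _leased--; }
      throw;
    }
  }

  // Called when a client is disposed: the connection stays open but idle.
  public void Release(T connection) {
    lock (_lock) {
      _leased--;
      _idle.Add(new KeyValuePair<DateTime, T>(DateTime.UtcNow, connection));
    }
  }

  // Closes connections idle for at least the timeout; returns how many were closed.
  public int CloseIdle(DateTime now) {
    var expired = new List<T>();
    lock (_lock) {
      _idle.RemoveAll(entry => {
        if (now - entry.Key < _idleTimeout) return false;
        expired.Add(entry.Value);
        return true;
      });
    }
    foreach (var connection in expired) connection.Dispose();
    return expired.Count;
  }
}
```

Because Beanstalkd keeps tube state on the connection, a real pool would also have to reset the used and watched tubes before handing a recycled connection to a new client; the sketch leaves that out.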

Distributed servers

The Beanstalkd FAQ has this to say about distribution:

Does beanstalk inherently support distributed servers?

Yes, although this is handled by the clients, just as with memcached. The beanstalkd server doesn't know anything about other beanstalkd instances that are running.

I need to take a look at the clients that do implement this and determine what that means for me. For example, do they use some kind of consistent hashing to determine which node to use for a particular tube? Either way, I do want to have parity with other clients on this.
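I have not verified what other clients actually do, so the following only sketches the simplest memcached-style option: hash the tube name and take it modulo the server count. Note that this is plain modulo hashing, not consistent hashing, so adding or removing a server remaps most tubes. TubeRouter and its methods are hypothetical names.

```csharp
using System;
using System.Text;

// Hypothetical sketch of client-side distribution; not taken from any existing client.
public static class TubeRouter {

  // FNV-1a, 32-bit: stable across processes, unlike string.GetHashCode().
  public static uint Fnv1a(string value) {
    unchecked {
      uint hash = 2166136261;
      foreach (var b in Encoding.UTF8.GetBytes(value)) {
        hash ^= b;
        hash *= 16777619;
      }
      return hash;
    }
  }

  // Maps a tube name to one of the configured servers by modulo hashing.
  public static string PickServer(string tube, string[] servers) {
    if (servers == null || servers.Length == 0) throw new ArgumentException("no servers", "servers");
    return servers[Fnv1a(tube) % (uint)servers.Length];
  }
}
```

Every producer and consumer that shares the same server list will agree on which instance holds a given tube, without the servers knowing about each other.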

POCO Producers and Consumers

For me, the 90% use case for a work queue is to produce work on some threads/processes/machines and consume that work on a number of workers. Generally that item will have some structured fields describing the work to be done and producers and consumers will use designated tubes for specific types of work. These use cases imply that producers and consumers are separate user stories, that they are tied to specific tubes and deal with structured data. My current plan is to address these user stories with two new interfaces that will look similar to these:

public interface IBeanstalkProducer<T> {
  BeanstalkProducerDefaults Defaults { get; }
  PutResponse Put(T item);
}

public interface IBeanstalkConsumer<T> {
  BeanstalkConsumerDefaults Defaults { get; }
  Job<T> Reserve();
  bool Delete(Job<T> job);
  bool Release(Job<T> job);  // return type assumed to mirror Delete
}

The idea with each is that it's tied to a tube (or tubes for the consumer) at construction time and that the implementation will have a simple way of associating a serializer for the entity T (I plan to provide protobuf and MetSys.Little support out of the box).

Rx support via IObservable<Job<T>>

Once there is the concept of a Job<T>, it makes sense that reservation of jobs should be exposed as a stream of work that can be processed via LINQ. However, since items should only be reserved when the subscriber accepts the work, it should probably be encapsulated in something like this:

public interface Event<T> {
  Job<T> Take();
  Job<T> Job { get; }
}

This way, multiple subscribers can try to reserve an item and items not reserved by anyone are released automatically.
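One way this could behave, purely as a sketch: the offer is shown to each subscriber, the first call to Take() claims the job, later calls get nothing, and an unclaimed job is handed to a release callback. JobOffer, JobDispatcher and the release callback are hypothetical names and assumptions, not the planned API; the job type is left generic to keep the example self-contained.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the "take" idea; names and behavior are assumptions.
public class JobOffer<T> where T : class {
  private readonly T _job;
  private int _taken; // 0 = still on offer, 1 = claimed

  public JobOffer(T job) { _job = job; }

  // The job being offered, for inspection before deciding to take it.
  public T Job { get { return _job; } }

  // Returns the job to the first caller only; later callers get null.
  public T Take() {
    return Interlocked.CompareExchange(ref _taken, 1, 0) == 0 ? _job : null;
  }

  public bool WasTaken { get { return Volatile.Read(ref _taken) == 1; } }
}

public static class JobDispatcher {
  // Offers the job to each subscriber in turn, then calls release if nobody took it.
  public static bool Dispatch<T>(T job, Action<JobOffer<T>>[] subscribers, Action<T> release) where T : class {
    var offer = new JobOffer<T>(job);
    foreach (var subscriber in subscribers) subscriber(offer);
    if (!offer.WasTaken) release(job);
    return offer.WasTaken;
  }
}
```

Using an atomic compare-and-swap for the claim means the same offer could be shown to subscribers on different threads and still be taken by only one of them.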

As I work on the future work items, I will also use the library in production so I can get better educated about the real-world behavior of Beanstalkd and what uncovered scenarios the client runs into. There is OK test coverage of the provided behavior, but I certainly want to increase that significantly as I keep working on it.

For the time being, I hope the library proves useful to other .NET developers, and I would love to get feedback, contributions and reports of any issues you encounter.