My inability to cleanly insert an intercepting actor to spawn new actors on demand discussed in my previous post about Calculon illustrated tunnelvision in my expression based messages. While these are a great way to express the message contract, they are finally just one way of defining message payloads. The pipeline should be ignorant of message payloads and be able to route any payload based on meta-data alone.
I had to stop hiding the IMessage format under hood and just accept that ExpressionTransport really represents a convenience extension for generating message payloads. This change not only makes it possible for any actor to accept any kind of message, but also to makes the routing a lot simpler and flexible. Now there is only a single, much simpler ITransport:
public interface ITransport {
ActorAddress Sender { get; }
void Send(IMessage message);
}
I've also attached ther sender onto the transport, which means there's one less dependency to inject. Given this, ExpressionTransport and MessageTransport simply becomes extension methods on ITransport:
public static class TransportEx {
public static void Send<TRecipient>(
this ITransport transport,
Expression<Action<TRecipient>> message
) {
transport.Send(new ExpressionActionMessage<TRecipient>(
transport.Sender,
ActorAddress.Create<TRecipient>(), message
));
}
// etc.
}
This provides a lot more flexibility for messaging, since new payloads can be created by implementing IMessage. It does mean that dynamic dispatch would be nice for dispatching the different payloads against their respective receivers. Oh well, we can work around that.
Now it's possible for an actor not of type TRecipient to receive an ExpressionMessage<TRecipient>, perform some action and re-dispatch it via Send(). By default routing is done by Id, but missing a direct receiver we can now insert additional pattern matching to determine a suitable recipient. For this I've created an interface to let actors expose their accept criteria:
public interface IPatternMatchingActor {
Expression<Func<MessageMeta, bool>> AcceptCriteria { get; }
}
While this could have been done by convention, I couldn't think of a reason other than current fashion to not use an interface contract to expose this actor capability, so an interface it was. The reason it's an Expression and operates on the MessageMeta rather than the expression is to facilitate serialization of the criteria for delivery across the process boundary so messages can be qualified for acceptance before attempting to cross that boundary.
I now have everything I need to switch the notify.me bots to Calculon. Once that is done, and working reliably, it'll be time to improve the dispatch plumbing to improve speed and reduce concurrency bottlenecks in dispatch.
I'm currently extending functionality in the notify.me bots and in order to make this easier, I'm refactoring the adhoc actor-like message processing system I built into one a bit more flexible for adding features quickly. Right now message senders and receivers are hard-coupled and use blocking dictionary lookups for dispatch. They also act on instances of each other, which allowed some insidious calls to sneak in during moments of weakness.
As I embarked on my refactor, I wanted to make sure the replacing infrastructure removed assumptions about the entities communicating among each other but also wanted to avoid the pitfall of designing something overly generic. For that I had to first define what it was I needed to be able to do, so I'd only build what I need. At the same time, I decided to pull the replacement into its own Assembly, so that implementation specific coupling wouldn't leak back into the plumbing again. The resulting system has been named in honor of the greatest of all acting robots, Calculon, and is available in its present work-in-progress form on github.
The bot is responsible for dispatching messages to users and receiving user messages and presence status. The bot passes messages for a user on to the user's UserAgent actor and receives messages to send to the user from the UserAgent. For distribution and maintenance simplicity, each bot and its related actors was implemented as a separate process.
UserAgents keep the state of the user, such as presence, including all resources (different clients) connected and queues up messages coming from the message queue until the user is in a state to receive messages. It has its own persistence layer, allowing idle users to expire and be recreated as incoming traffic from either the bot or the message queue requires it.
The message queue is a client to our store-and-forward queueing system. Messages from users are pushed into the this actor via long-polling and user data/actions that affect other notify.me systems (such as analytics) are pushed into the appropriate queues as they are handed to the MessageQueue by other actors (generally UserAgents).
At the root of the system, exists the Stage, which exposes the ActorBuilder:
_stage.AddActor<IXmppAgent>().WithId("bob@foo.com").Build();
The assumption is that actors may require a transport and their own address at construction time and that they are completely isolated, i.e. no reference is ever exposed. The builder will inject these framework owned dependencies if detected in a constructors signature. In order to allow for more flexible construction and the ability to have some kind of IoC container act as a factory, the builder exposes hooks like this:
_stage.AddActor<IXmppAgent>().WithId("bod@foo.com").BuildWithExpressionTransport(
(transport,address) => container.Resolve<IXmppAgent>(transport,address)
);
The above assumes a container such as Autofac which can resolve a type and be provided typed parameters to optionally inject.
This is the root of the dispatch system. I need to be able to send the message without a reference to receiver and let the transport worry about immediate delivery, queueing for later or routing it to some controller that will bring the recipient into existence. None of those concerns should be visible to the sender. Using semantics introduced in "Type-safe actor messaging approaches", and slightly tweaked by implementation, provides me with a way of asynchronously calling methods on unknown recipients:
public interface IExpressionTransport {
void Send<TRecipient>(Expression<Action<TRecipient>> message);
void Send<TRecipient>(Expression<Action<TRecipient, MessageMeta>> message);
Result SendAndReceive<TRecipient>(Expression<Action<TRecipient>> message);
Result SendAndReceive<TRecipient>(Expression<Action<TRecipient, MessageMeta>> message);
Result<TResponse> SendAndReceive<TRecipient, TResponse>(
Expression<Func<TRecipient, TResponse>> message
);
Result<TResponse> SendAndReceive<TRecipient, TResponse>(
Expression<Func<TRecipient, MessageMeta, TResponse>> message
);
IAddressedExpressionTransport<TRecipient> For<TRecipient>(string id);
}
The main addition is the ability to inject MessageMeta, a class containing meta information such as Sender and Recipient into the receiver without the Sender having to specify this data.
For UserAgent messages, there are thousands of actors each with a unique Id. While currently that Id is a Jid I don't want to tie the internals to Xmpp specific details, so Id should be an plain string and let the transport worry about the meaning and routing implications of that string.
The ability to send by Id is provided by IExpressionTransport.For<TRecipient>(string id). The returned interface IAddressedExpressionTransport<TRecipient> mirrors IExpressionTransport, representing a intermediate storage of the receiver id, thus providing a fluent interface that permits the following calling convention:
_transport.For<Recipient>(id).SendAndReceive(x => x.Notify("hey", "how'd you like that?"));
If I stay with the process-per-bot for the bot and messagequeue actors, there would be a single instance for these actors and I can address them directly by Type. The semantics for these message are already expressed by IExpressionTransport.
Of course, dealing with unkown recipients begs the question where do these recipients come from? I need to be able intercept messages for Id's that are not yet in the system and spawn those recipients on the fly. Wanting to stay with actors for anything but the base plumbing, this facility should be handled by actors that can receive these messages and tell the plumbing to instantiate a new actor.
The same interface to access the ActorBuilder exposed by the stage is encapsulated by the IDirector:
public interface IDirector {
ActorBuilder<TActor, IDirector> AddActor<TActor>();
void R(ActorAddress address);
}
The director being a framework owned actor can of course be called via messaging, allowing a new actor to be registered with:
_transport.Send<IDirector>(
x => x.AddActor<IXmppAgent>().WithId("bob@foo.com").Build()
);
That leaves the ability to intercept messages that don't have a recipient, and redispatch those messages once the interceptor was able to spawn the actor. Both of those are not compatible with Expression based messages since they are coupled to a pretty specific contract. This is the one piece I don't have in Calculon at the time of this writing and the problem is discussed below.
When a UserAgent sits idle for a while, it should be possible to remove it from the actor pool. Since the actor instance doesn't know that anything about the framework that owns it, there needs to be a message that can be sent to the actors mailbox that shuts it down, ideally disposing and IDisposable actors.
The interface IDirector introduced above includes a method for just that:
_transport.Send<IDirector>(
(x,m) => x.RetireActor(_address, m);
);
This could be send by an actor itself, or by a governing actor that is responsible for a number of actors in a pool. Under the hood, this is where un-typed messages come into play, since they can be sent without a matching method on the recipient, and therefore could have special meaning to the mailbox that manages the recipient. I.e. sending the retire message to the director, simply causes it to send an untyped retire message to the actors mailbox, which will then shut itself down and dispose the actor. The interface for untyped messages (providing a more traditional Receive(msg) actor messaging model) is provided by this interface:
public interface IMessageTransport {
void Send<TMessageData>(TMessageData messageData);
Result<TOut> SendAndReceive<TIn, TOut>(TIn messageData);
IAddressedMessageTransport For(string id);
}
Rather than force an interface on the receiving actor, messages not simply swallowed by the mailbox are delivered to the actor by convention, looking for a Receive method with appropriate TMessageData.
The above is the basic requirements to provide the same functionality already present in the notify.me bot daemons, but using generalized plumbing. It's certainly sufficient to get the code underway, but in itself doesn't provide a lot more than the status-quo other than simplifying the extensibility and maintainability of UserAgents.
To expand on the present feature set and move other parts of the notify.me system to this actor infrastructure I have the following additional design goals for Calculon:
One of my lead design goals was not to force any interface or baseclass requirements on actors, i.e. it should be possible to author actors as plain old C# objects (POCO). Actors should exist as their own isolated execution environment and their functionality testable without any part of Calculon in play. Dependencies such as transport and address are completely optional and injected by signature.
Another aspect I would like to see is the erlang-style let it crash philosophy. It should be possible for an actor to subscribe to another actor to monitor its health. I'm not sure what "crash" should mean at this time, since using Result as the completion handle already captures exceptions and marshalls them to the caller.
My plan is to let these semantics emerge from use cases, as I put Calculon into production.
One of primary benefits of actors for concurrency is that it cleanly decouples the pieces from each other and lets you move these pieces around for scalability. Being able to serialize the messages would allow dispatchers to send messages across the wire to other nodes in the actor network. For this, I need to determine a format for serializing the LINQ expressions used in ExpressionMessage. That means that any value captured by the expression needs to be serializable itself. Unfortunately checking whether an expression can be serialzied will be a runtime rather than compile time check.
Serializable messages are desirable even for local operation to enforce the share nothing philosophy. As it stands right now, shared object references could be used as message arguments, which defeats the purpose of this system. However, for performance reasons, I will likely employ Subzero to avoid unnecessary copying.
Once there exists remote actor capability, it is possible to traverse AppDomain boundaries easily. That means that we could launch actors in different AppDomain. Conceivably, we should be able to drop a new implementation dll into a directory, load it up and have a control actor shutdown existing actors and and subsume their capabilties with its own implementation. Since we're serializing messages, changes to an actor's implemtation or even interface do not matter, as long as the method signatures previously published still exist.
The "needed" capabilities, except for message intercept and re-dispatch, are currently implemented, although the infrastructure is a very simple implementation with lock contention issues under load. However those limitations are no more severe than my current setup, so it's good enough to migrate to, letting me improve and expand the plumbing against a working system.
The main stumbling block is dealing with interception. Right now delivery is done by Id and Type, and for expression based messages Type is fairly binding, at least to use the message. Of course the if the primary reason to intercept messages is to create the missing recipient, the interceptor would not need to be able to unwrap the message, just re-dispatch.
The simple way to implement this is to make interceptors hook into the dispatch framework, rather than actors in their own right. They could then be tied to internals and simply be part of the mailbox matching code and spawn and insert a new mailbox when triggered. However, I would rather stick with actors for everything and make the framework as invisible as possible, which also means that capture and re-dispatch should also be possible without exposing the internals of the framework. I.e. right now nobody outside of the plumbing ever even sees an expression message instance and I'd like to keep it that way.
Since I already know that I want to have actors that can choose to accept messages based on the sender, rather than a recipient id, it's clear that I need better pattern matching capabilities for actors to expose that let them indicate their interest in accepting a message and that I need some neutral payload format that can be re-addressed and re-dispatched. So that's still one part of the puzzle I have to solve.
Posted a new episode of the Concurrent Podcast over on the MindTouch developer blog. This time Steve and I delve into Coroutines, a programming pattern we use extensively in MindTouch 2009 and one that i’m also trying out as an alternative to my actor based Xmpp code in Notify.me.
Since there isn’t a native coroutine framework in C#, we’re using the one provided by MindTouch Dream. It’s built on top of the .NET iterator pattern (i.e. IEnumerable and yield) and makes the assumption that all Coroutines are asynchronous methods using Dream’s Result<T> object for coordinating the producer and consumer of a return values. Steve’s previously blogged about Result. Since those posts there’s also been a lot of performance improvements and capability improvements to Result committed to trunk, primarily providing robust cancellation with resource cleanup callbacks. For background on coroutines, you can also check out previous posts I’vee written.
The cool thing about asynchronous coroutines compared to an actor model is that call/response based actions can be written as a single linear block of code, rather than separate message handlers whose contiguous flow can only be determined by examining the message dispatcher. With a message dispatcher that can correlate message responses with suspended coroutines, sending and waiting for a message in a coroutine can be made to look like a method call without blocking the thread, which, especially with message passing concurrency, is vital, since a response isnn’t in any way guaranteed to happen.
I’m due to write another post on how to use Dream’s coroutine framework, but in the meantime i highly recommend checking out Dream from mindtouch’s svn. Lot’s of cool concurrency stuff in there. trunkis under heavy development, as we work towards Dream profile 2.0, but 1.7.0 is stable and production proven.
As usual, I’ve been blogging over on the MindTouch Developer blog, and since the topics i post about over there have a pretty strong overlap with what I’d post here, I figured i might as well start cross-posting about it here.
Aside from various technical posts, Steve Bjork and I have started recording a Podcast about concurrent programming. It’s currently 2 episodes strong, with a third one coming soon. Information on past and future posts can always be found here.
Today’s post on the MindTouch dev blog is about the producer/consumer pattern and how i moved from using dedicated workers with a blocking queue to using Dream’s new ElasticThreaPool to dispatch work.