The actor model of computation has gained in reputation over the past decade as engineers have learned to grapple with concurrent and distributed systems to achieve scalability and availability.
I was in the zone about a year ago developing a job scheduling framework for an application called Nebula, which we use to manage keywords and SEM campaigns. A simple actor library with durable mailboxes implemented over Apache ZooKeeper seemed like a good fit at the time. Looking back, I can say that the solution has worked well. At the time, we had some good reasons (e.g., special throttling requirements) to develop our own framework instead of customizing an existing one. Faced with the same decision today, I would seriously consider building something around Akka.
We developed two primary patterns to make actors highly reliable. The first is to model critical actor state and mailboxes in ZooKeeper. The second is to use the ZooKeeper watch mechanism for actor supervision and recovery.
Example: job type limiter
To explain how these patterns work, let's look at the design of a specific actor, the job type limiter. Each job is of a given type, and type limits are set by an administrator to prevent too many jobs of a certain type from running simultaneously in the grid. The type limiter is a singleton in the compute grid and ensures that job type limits are enforced.
For example, let's assume we have jobs that invoke third-party services to update the maximum bid for keyword ads. The call capacity is limited, so while the grid has enough slots to run hundreds of jobs, let's say we can run only 10 concurrent jobs against a partner API. The type limiter ensures that the 11th job waits until one of the 10 that are running has completed.
For each job type, the type limiter keeps track of the running count and the limit. When the running count hits the limit, the gate is closed until a job of the corresponding type terminates. So how do we ensure this critical state is maintained correctly even if you cut the power cord to the system on which the type limiter is running? More importantly, how do we eliminate drift to prevent our running counts from being off by one? If a job host crashes, how do we guarantee that the type limiter receives a message indicating the job has ended?
Actors, mailboxes, queues, and watches
An actor receives and acts upon messages sent to its mailbox. Implementing an actor is quite easy because there are no locks to contend with – you get a message, handle it, and optionally send new messages or start new actors as a side-effect. But the mailbox, the limits, and the counter values must be durable and highly available for the type limiter to accurately and efficiently keep track of running counts in a fault-tolerant manner.
To model actor mailboxes, I started with a simple queue in ZooKeeper. A set of persistent sequential znodes represent messages, and their parent represents the queue. In order to wake up an actor when new messages arrive in the queue, I prepare another znode called a touch-pad. A sending actor creates the message znode, and then invokes setData() on the touch-pad to trigger a watch. The receiving actor establishes a data watch on the touch-pad and, when the watch is triggered, invokes getChildren() on the queue to collect messages.
After retrieval from ZooKeeper, the messages are ordered by sequence number and entered into a local queue for processing by the target actor. The actor keeps track of messages it has received by writing the message sequence number back to the queue znode. It does so by calling the asynchronous setData() before actually handling the message. The reason for writing ahead is that messages are assumed to be idempotent; if processing led to an error that prevented the actor from writing the current message sequence number back to the queue, the actor may end up in an infinite recovery cycle: get-message, handle-message-fail, read-recover, get-message, handle-message-fail,….
Though writing back the sequence number ahead of processing prevents the infinite cycle, it's possible that the message isn't properly handled at all. What if the actor crashes right after the sequence number has been written, but before the message has been processed? There are different ways to recover from a situation like this. In our case the scheduling framework holds enough extra state in ZooKeeper to allow actors to recover and eventually self-heal. In some cases, an actor will rely on senders to repeat a message; in others, it will check its current state against expected values and take corrective measures if necessary. If an actor hasn't received any messages for some time, it might run an admin cycle where it checks its state against expected values in its environment, or publishes its assumptions (sends a message) so other actors can confirm.
More concretely, the type limiter doesn't just keep a running count for each job type. It also records the job handle for each job that it lets out of the gate. In fact, it records the handle in ZooKeeper before it lets the job out of the gate. The job handle encodes a reference to a job's state (e.g., command) and a run ID. Each time a job is scheduled, its run ID is incremented, so the handle for each run is unique. The framework also serializes runs for the same job (in case of reruns or retries).
Other actors, called job monitors – one per host – are responsible for monitoring jobs running locally. They know whether a job process is still alive and progressing. Job monitors can confirm whether a job handle recorded by the type limiter refers to a job that is actually running. They also emit messages when they're idle to indicate no jobs are running so actors like the type limiter can validate their state. If the whole cluster is idle, the type limiter verifies that there are no job handles in its records. In this manner, the type limiter may from time to time suffer a temporary glitch where its count might be off by one, but it quickly recovers and makes adjustments as it verifies its assumptions with help from other actors.
There are aspects to programming actors that turn out to be non-trivial, especially if the application relies on consistent state to make decisions. Asynchronous message exchanges among many actors across a grid can easily lead to inconsistent views (or views that are eventually consistent, but consistent too late). While ZooKeeper guarantees ordered processing, and messages are delivered to actors in sequence by the library (you could say we are deviating from the pure actor model here), an actor can easily fall out of synch if it doesn't carefully maintain extra, durable state for recovery and healing, as the type limiter demonstrates.
An actor may expect an acknowledgment after sending a message. But if it doesn't receive any, it gives up after a timeout, and assumes the receiver or the network is unavailable. In such circumstances it is tempting to retry immediately, but better to back off for a while before retrying. In a larger cluster with a lot of asynchronous communication between actors, congestion or delays in processing are quickly ameliorated if actors back off immediately and try again after a random period of time. Throughput suffers a little, but overall stability and fault tolerance are improved – not to mention the quality of uninterrupted sleep for certain humans.
What happens when the host on which the type limiter is running shuts down or crashes? A supervisor takes note and starts a new actor on another system. The new type limiter instance reads the current message sequence number, loads all job handles, calculates the running counts by job type (i.e., refreshes its caches), reads and deletes messages with sequence numbers less than or equal to the current sequence number, and then starts processing new messages – writing back the sequence number before it handles each message. Got it; but how does it really work? What if the supervisor dies?
Before diving in further, this is a good place to give a shout-out to the folks who designed, developed and maintain ZooKeeper. I think it's one of the most useful open-source contributions for distributed computing of the past decade. Most of the fault tolerance in Nebula can be reduced to simple patterns that make use of the ZooKeeper model (e.g., ephemeral, session-bound znodes), its serialized atomic updates, and the associated watch mechanism.
Ephemeral znodes, master election, and actor supervision
Now let's examine actor supervision, the second of the two patterns, and one that relies on ZooKeeper's ephemeral znodes and watch mechanism. We start with master election, where any number of participants compete to attain mastership for a certain role. You can think of these participants as little actors that come to life under prescribed circumstances – for example, when a system starts up. We require that each election participant run on a different host in the grid.
Let's consider the role that gives the master the privilege of starting a type limiter. Each role is uniquely identified by a well-known path in ZooKeeper where ephemeral sequential znodes are created by election participants. The winner is the participant who creates the znode with the lowest sequence number. This participant is elected master and acts as a local watchdog responsible for the life cycle of the main actor, in this case the type limiter. If the system shuts down cleanly, the type limiter terminates normally, and the master deletes its znode in ZooKeeper.
The deletion triggers a watch that's established by another participant – we call it the slave. The slave is the participant who creates the znode with the second-lowest sequence number during the election. Instead of walking away, the slave establishes a watch on the master's znode and acts like a supervisor for the master and its main actor. When the watch is triggered, the slave is notified, and it immediately takes over the master role by starting a new instance of the type limiter. In other words, the slave mutates into a master and now acts as a watchdog over the type limiter.
If a master and its main actor die abruptly because their host crashes, their ZooKeeper session eventually times out, and ZooKeeper automatically deletes the corresponding ephemeral znode. The watch is triggered, and the slave takes over as described above. Except for a minor delay due to the session timeout, and the possibility of incomplete message handling by a crashing actor, the fail-over and recovery are the same. Bring on the chaos monkeys and see if you can tell anything unusual is happening in the grid – peeking at the logs is cheating.
We're not done yet. There’s another actor called a grid monitor, which comes to life at random intervals on each host in the grid. It checks the health of local actors and then participates in an election to run a grid-level scan. One of the critical health checks is to ensure every actor has at least one active master and one active slave. The grid monitor doesn't communicate with other hosts; it just scans ZooKeeper to see if the expected master and slave znodes exist for each role. If it detects a missing slave (not uncommon after a host goes down and associated slaves take over master roles), it starts up a new participant, which can take on the slave's supervisor role (unless the master for the same role is running on the local host). With the grid monitor's help, we can ensure that most fail-overs are handled immediately. Even in the rare case where a slave and then its master are terminated in short order, grid monitor instances will bring them back to life on other hosts within a few seconds.
Actor supervision in Nebula is thus quite similar in principle to the supervisor hierarchies defined in Erlang OTP. The master is the immediate local supervisor acting as a watchdog, the slave operates at the next level and runs on a different host to support rapid fail-over, and the grid monitor acts like a meta-supervisor across the whole grid. One of the most useful properties of the Nebula grid is that all hosts are created equal. There is no single point of failure. An actor can run on any host, and the grid monitor instances ensure the load is balanced across the grid.
Sequential znode suffix limits
Let's look at one more detail regarding durable mailboxes in ZooKeeper. If you've used ZooKeeper, you know that the names of sequential znodes have a numeric 10-digit suffix. A counter value stored in the parent is incremented each time a new znode is created. If you rely on ZooKeeper sequence numbers to order messages in a queue, it's easy to see that you may eventually exhaust the 10-digit integer limit (2^31-1) in a busy queue. You can deal with this limitation in different ways.
For example, if the grid runs idle from time to time, an actor can delete and recreate its mailbox after the sequence number reaches a certain threshold. This will reset the sequence number to zero. Another solution would be to use different queues and a signal that indicates to senders which one is active. One might also consider implementing a mailbox as a fixed-size ring buffer with no dependency on sequence numbers; however, before sending a new message, senders would have to determine which znodes are free and which have yet to be processed.
In any case, setting an upper bound on the number of messages in a mailbox is a good idea. Otherwise, a group of senders could easily overwhelm a receiver. One way to do this is to have the message dispatcher invoke getData() on the queue’s znode before sending a message. The returned value could specify the queue's message limit, and the dispatcher could retrieve the number of messages via getNumChildren() on the znode’s stat object and compare it with the limit. If the number of children had already reached the limit, the dispatcher would back off, and the sender would try again after some delay. Again, we trade off a little throughput for higher resilience with a simple backpressure mechanism.
Optimizing for short messages
We haven’t discussed how to store the message payload, and though it may be obvious that each znode in the queue can have data representing payload, I should highlight an optimization that works for very small messages. If your message can be encoded in a short string, say less than 50 characters, you can store it as a prefix in the znode name. On the receiving end, the messages can then be retrieved with a single call to getChildren() without extra calls to getData() on each child.
By now you've probably concluded that ZooKeeper doesn't really provide an optimal foundation for messaging in general. I readily admit that your assessment is correct. If you’re dealing with a large grid where, say, tens of millions of messages are exchanged per day, or the message payloads are large, look for another solution. Also keep in mind that ZooKeeper doesn't scale horizontally. It was designed as a highly available coordination service with very specific guarantees that limit its scalability. So if you're building a massive grid, consider partitioning into autonomous cells and coordinate locally as much as possible. Global coordination, especially across data centers, requires special treatment.
The Nebula grid is relatively small (about 100 virtual hosts), and we usually have no more than 10 concurrent jobs running on any given host, so we are coordinating the progress of at most 1000 jobs at a time. For a grid of this size and kind of work load, our actor-based scheduling framework performs very well. The overhead in scheduling usually amounts to less than 5% of overall processing. We run ZooKeeper out of the box on an ensemble of five basic servers, and we use a database as an archive, where the state of completed jobs can be looked up if necessary. A simple REST service is provided for job submission and monitoring.