Queues are a common part of modern software. Scheduling jobs and actions isn’t just for distributed systems; any system dealing with external events or asynchronous processing needs it. It’s easy to grab a library implementation of a queue, but such libraries are often overkill. Here I show how we efficiently implemented a few of the queues in Fuse using just standard iteration and deferred deletion.

Open-ended iteration

Our library has a lot of messages. It follows a standard subscriber pattern: various bits of code request to be notified when certain events happen. A first implementation simply calls the subscribers the moment the event happens. This leads to chaos, though. There is no way of knowing what these callbacks do, and often they end up recursing back into the code dispatching the event! Such reentrancy is extremely hard to deal with, so avoid it when possible.

Most messages are now dispatched a bit after the event actually happens. We have an AddDeferredAction function that runs some code once the stack has unwound to a reasonable point. We call that point the message loop, borrowing the common term from other systems, even though ours technically isn’t one.

AddDeferredAction is similar to Node.js’s process.nextTick function, which likewise defers a callback until the current call stack has unwound.

The tricky part for a queue here is that deferred actions can be added while we are processing the current list. We guarantee that all deferred actions, including those added during processing, are run before proceeding to the next stage.

This could be handled by juggling a couple of distinct collections and alternating between them, but that’s a bit messy. Instead I just used iteration by index on the collection:

for (var i=0; i < actions.Count; ++i) {
    // Count is re-read on every pass, so actions added meanwhile are still reached
    actions[i].Action();
}

We get so used to using a foreach construct that it’s easy to forget the usefulness of the traditional for loop. The advantage of the above approach is that it doesn’t put any kind of lock, or iteration marker, on the collection. Our AddDeferredAction function can simply add a new action to the list and it will be picked up by this loop.

This part of our code is single-threaded, so we don’t need any synchronization. The same technique can be adapted to a multi-threaded system by guarding access to the list.
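To make the behaviour concrete, here is a minimal sketch of such a queue in C#. Only the AddDeferredAction name comes from the text above; the DeferredQueue class and its ProcessAll method are invented for this illustration and are not the actual Fuse code. For brevity the list holds plain delegates rather than entries with an Action field.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; not the actual Fuse implementation.
public class DeferredQueue
{
    readonly List<Action> actions = new List<Action>();

    // May be called at any time, including from inside a running action.
    public void AddDeferredAction(Action action)
    {
        actions.Add(action);
    }

    // Runs every pending action, including ones added while processing.
    // Assumes single-threaded use and that ProcessAll is not re-entered.
    public void ProcessAll()
    {
        // Count is re-read on every pass, so appended actions are picked up.
        for (var i = 0; i < actions.Count; ++i)
        {
            actions[i]();
        }
        // Everything has run; start the next round with an empty list.
        actions.Clear();
    }
}
```

An action that calls AddDeferredAction while it runs appends to the same list, and ProcessAll reaches the new entry before it returns. A foreach over the same list would not tolerate that modification.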

Deferred deletion

Often a component decides it no longer wishes to have an action performed. It needs a way to delete the action from any pending list, such as the deferred list or our recurring action list. One of our guarantees is that the action will not be called after removal, even if that list is already being processed. This means the queue must be modified immediately.

Removing an item from a collection would of course break the iteration that I’m doing above: the indices would shift and we’d end up skipping items. Instead of actually removing items, I mark them as deleted. This involves storing a small structure in the queue: one field is the actual action, and another is a deleted marker.

void RemoveAction( Action action ) {
    // Mark the entry rather than removing it, so indices stay stable during iteration
    var q = FindAction(action);
    q.deleted = true;
}
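The snippet above relies on a queue entry type and a FindAction helper that aren’t shown. A minimal sketch of what they might look like follows. The QueuedAction and ActionQueue names, the linear search, and the null check for a missing entry are all assumptions made for illustration, not the actual Fuse code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical entry type: the action plus a deleted marker.
// A class rather than a struct, so setting the flag affects the stored entry.
public class QueuedAction
{
    public Action Action;
    public bool deleted;
}

public class ActionQueue
{
    readonly List<QueuedAction> actions = new List<QueuedAction>();

    public void AddAction(Action action)
    {
        actions.Add(new QueuedAction { Action = action });
    }

    // Linear search for the first live entry wrapping this delegate.
    // Returns null when no such entry exists.
    QueuedAction FindAction(Action action)
    {
        foreach (var q in actions)
        {
            if (!q.deleted && q.Action == action) return q;
        }
        return null;
    }

    // Returns true if an entry was found and marked as deleted.
    public bool RemoveAction(Action action)
    {
        var q = FindAction(action);
        if (q == null) return false;
        q.deleted = true;
        return true;
    }
}
```

Note that the caller has to pass the same delegate it originally added; a freshly created lambda with the same body would not compare equal.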

Then I modify the iteration to check this flag:

for (var i=0; i < actions.Count; ++i) {
    // Skip entries that were marked deleted, even if marked during this loop
    if (!actions[i].deleted) {
        actions[i].Action();
    }
}

Once the loop has finished, the deleted items can actually be removed, so the list doesn’t grow endlessly.
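Putting the pieces together, the cleanup step might look like the following sketch. The RecurringActions class and its member names are hypothetical. List<T>.RemoveAll is the standard .NET method, which may or may not have a direct counterpart in your collection library. The sketch also assumes ProcessAll is never re-entered from inside an action, since compacting the list would then disturb the outer loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a queue with deferred deletion; not the actual Fuse code.
public class RecurringActions
{
    class Entry
    {
        public Action Action;
        public bool deleted;
    }

    readonly List<Entry> actions = new List<Entry>();

    public int Count { get { return actions.Count; } }

    public void Add(Action action)
    {
        actions.Add(new Entry { Action = action });
    }

    // Only marks the entry; the list itself is left untouched so that
    // a loop currently running in ProcessAll keeps stable indices.
    public void Remove(Action action)
    {
        foreach (var e in actions)
        {
            if (!e.deleted && e.Action == action)
            {
                e.deleted = true;
                return;
            }
        }
    }

    public void ProcessAll()
    {
        for (var i = 0; i < actions.Count; ++i)
        {
            if (!actions[i].deleted)
            {
                actions[i].Action();
            }
        }
        // Safe to compact now that no iteration is in progress.
        actions.RemoveAll(e => e.deleted);
    }
}
```

If the first action removes the second while the loop is running, the second is skipped in that same pass and is gone from the list by the time ProcessAll returns.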

Simple techniques

These are of course both quite simple techniques; it’s often helpful to consider simplicity instead of going for the complex library approach right away.
