Friday 25 April 2008

A C# BlockingQueue

Ok, things didn't go as I planned, so I'll stop promising and start delivering. This time I'll share some code I've used before.

The problem is quite simple and is not new: exchanging messages between threads. One possible solution is message queuing, but it's too bulky to use inside a single process. The simplest way to make threads communicate is through shared variables, but that is too primitive for passing a stream of messages.

So, what kind of feature was I looking for? To begin with, it needed to be fast and light. Secondly, it had to work in a fire-and-forget fashion, i.e., I wanted a thread to be able to send a message (or task, job, whatever) to another thread and go on. Obviously, for such a thing to work, it would have to use some sort of internal buffer. So, it looked like what I needed was a thread-safe FIFO queue.

All right, I could simply use a Queue<T> and wrap its calls in a lock statement. Or, even easier, use a non-generic Queue and transform it by calling the Queue.Synchronized() method. Either way, we'd get a simply synchronized thread-safe queue.
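
As a rough illustration of the first option, here is a minimal sketch of a Queue<T> wrapped in lock statements. The class name LockedQueue and its members are made up for this example and are not part of my source code.

```csharp
using System.Collections.Generic;

// Hypothetical wrapper (illustration only): every call to the inner
// Queue<T> is made while holding the same private lock object.
public class LockedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    public void Enqueue(T item)
    {
        lock (_sync) { _items.Enqueue(item); }
    }

    // Throws InvalidOperationException when the queue is empty,
    // just like Queue<T>.Dequeue() does.
    public T Dequeue()
    {
        lock (_sync) { return _items.Dequeue(); }
    }

    public int Count
    {
        get { lock (_sync) { return _items.Count; } }
    }
}
```

Each call is safe on its own, but nothing here lets a consumer wait for an item.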

Is that all? Of course not. Suppose we have a producer-consumer problem to solve. Usually, when the buffer is empty, the consumer is supposed to wait idly for a new item. How could we accomplish that using a synched queue? If we try to dequeue from an empty Queue, an exception is thrown. We could check the item count before dequeueing, but that takes two separate steps, and the pair isn't thread-safe even if the Queue instance is synched: another consumer may take the last item between the check and the dequeue. Another possibility is catching the queue-is-empty exception, which is a very lousy trick. Either way, we wouldn't be able to avoid writing synchronization code, and this is not what I wanted.
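
To make the two-step problem concrete, here is a small sketch, assuming a queue created with Queue.Synchronized(). The helper name SynchedQueueHelper is hypothetical. It turns "check the count, then dequeue" into one atomic step by holding the queue's SyncRoot around both calls, which is exactly the kind of extra synchronization code I wanted to avoid writing everywhere.

```csharp
using System.Collections;

public static class SynchedQueueHelper
{
    // Hypothetical helper (illustration only). Without the outer lock,
    // another thread could dequeue the last item between the Count
    // check and the Dequeue call, and Dequeue would then throw.
    public static bool TryDequeue(Queue synched, out object item)
    {
        lock (synched.SyncRoot)
        {
            if (synched.Count > 0)
            {
                item = synched.Dequeue();
                return true;
            }
        }
        item = null;
        return false;
    }
}
```

Even with this helper, an empty queue only gives the consumer a false return value; it still has no way to sleep until an item arrives.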

So, now that you have been introduced to the problem, let me show the wish list that I gathered, in the form of an interface:


public interface IBlockingQueue
{
    /// <summary>Max number of items in the queue (-1 means there's no limit)</summary>
    int MaxQueueDepth { get; }

    /// <summary>Returns false when there's something wrong, e.g., queue full</summary>
    bool TryEnqueue(object item);

    /// <summary>Returns false when there's no message to read</summary>
    bool TryDequeue(out object item);

    /// <summary>Returns false when a timeout or an abort happens</summary>
    bool WaitEnqueue(object item, int millisecondsTimeout, FuncGetVolatileFlag ExternalAbortSignaled);

    /// <summary>Returns false when a timeout or an abort happens</summary>
    bool WaitDequeue(out object item, int millisecondsTimeout, FuncGetVolatileFlag ExternalAbortSignaled);
}

public delegate bool FuncGetVolatileFlag();

TryDequeue() and TryEnqueue() are two simply synchronized methods to access the queue. The other two, WaitDequeue() and WaitEnqueue(), are much more interesting. The former will try to dequeue and, if the queue is empty, will wait for a new item. The latter will try to enqueue and, if the queue is full, will wait until there's room for the new item.

But that's still not enough. A common problem in a multithreaded application is when you need to close it and there are some threads still running. There are several possible approaches to closing them. The safest choice might be to wait for every thread to end, but that might take too long. One alternative is to terminate them all, but that's very risky. I think the best pick is to somehow ask the threads to stop and wait until they do so. With my blocking queue, you can choose any of the three.

Let me explain how. As you can see, I've put two additional parameters on the wait methods. Both have the same purpose: cancel the action and return execution to the client code. Whilst the first is obviously a timeout, one might wonder what the other parameter is. It's a delegate that accepts functions shaped like this: bool func(). Its name is ExternalAbortSignaled and its type name is FuncGetVolatileFlag, so I hope it's not hard to recognize how it's used.

To eliminate any doubt, I'll describe it. The idea is that, while waiting, the queue calls the ExternalAbortSignaled delegate at short intervals. If it returns true at any time, the wait ends, no item is enqueued or dequeued, and the method returns false. That way, you can hand the queue an anonymous method that merely returns the value of a boolean flag. Whenever the flag is set to true, the wait method returns promptly and the application as a whole can respond faster.
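
Here is a minimal sketch of the caller's side of that contract. The Worker class and the AbortableWait.WaitFor helper are hypothetical stand-ins invented for this example (WaitFor only imitates the waiting part of WaitDequeue/WaitEnqueue), the 20 ms polling interval is an arbitrary assumption, and the timeout is assumed to be non-negative.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Repeated here so the sketch compiles on its own.
public delegate bool FuncGetVolatileFlag();

public class Worker
{
    // volatile, so a write from another thread is seen by the polling loop
    private volatile bool _stopRequested;

    public void RequestStop() { _stopRequested = true; }

    // The delegate to hand to the wait methods: it merely reads the flag.
    public FuncGetVolatileFlag AbortSignal
    {
        get { return delegate { return _stopRequested; }; }
    }
}

public static class AbortableWait
{
    // Hypothetical stand-in for a wait method: returns true once ready()
    // is true, false when the abort delegate fires or the timeout expires.
    public static bool WaitFor(FuncGetVolatileFlag ready, int millisecondsTimeout,
                               FuncGetVolatileFlag externalAbortSignaled)
    {
        const int pollIntervalMs = 20; // assumed interval between abort checks
        Stopwatch elapsed = Stopwatch.StartNew();
        while (!ready())
        {
            if (externalAbortSignaled != null && externalAbortSignaled())
                return false; // abort requested
            long remaining = millisecondsTimeout - elapsed.ElapsedMilliseconds;
            if (remaining <= 0)
                return false; // timed out
            Thread.Sleep((int)Math.Min(pollIntervalMs, remaining));
        }
        return true;
    }
}
```

When the application shuts down, it calls RequestStop() once and every pending wait that was given AbortSignal returns false within roughly one polling interval.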

As you may know, Java already has a built-in BlockingQueue. Sadly, Java has no direct support for delegates, so I think it would be hard to code a feature like the abort flag in an elegant way.

Now, if you want to know the best way to implement the waiting synchronization code, I'll leave the explanation to Joseph, who is much more eloquent than I am. It might be a good idea to take a look at my source code too. The technique I've chosen is Wait and Pulse (Monitor.Wait and Monitor.Pulse).
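
To give a feel for the Wait and Pulse technique, here is a simplified sketch of the two wait methods. It is not the downloadable source code: the class name SketchBlockingQueue, the 50 ms polling interval and the use of PulseAll instead of Pulse are assumptions made for this example, and the timeout is assumed to be non-negative.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Repeated here so the sketch compiles on its own.
public delegate bool FuncGetVolatileFlag();

public class SketchBlockingQueue
{
    private const int PollIntervalMs = 50; // assumed interval between abort checks
    private readonly Queue<object> _items = new Queue<object>();
    private readonly object _sync = new object();
    private readonly int _maxQueueDepth;   // -1 means there's no limit

    public SketchBlockingQueue(int maxQueueDepth) { _maxQueueDepth = maxQueueDepth; }

    private bool IsFull
    {
        get { return _maxQueueDepth >= 0 && _items.Count >= _maxQueueDepth; }
    }

    public bool WaitEnqueue(object item, int millisecondsTimeout, FuncGetVolatileFlag abort)
    {
        Stopwatch elapsed = Stopwatch.StartNew();
        lock (_sync)
        {
            while (IsFull)
                if (!WaitSlice(elapsed, millisecondsTimeout, abort)) return false;
            _items.Enqueue(item);
            Monitor.PulseAll(_sync); // wake any consumer waiting for an item
            return true;
        }
    }

    public bool WaitDequeue(out object item, int millisecondsTimeout, FuncGetVolatileFlag abort)
    {
        item = null;
        Stopwatch elapsed = Stopwatch.StartNew();
        lock (_sync)
        {
            while (_items.Count == 0)
                if (!WaitSlice(elapsed, millisecondsTimeout, abort)) return false;
            item = _items.Dequeue();
            Monitor.PulseAll(_sync); // wake any producer waiting for room
            return true;
        }
    }

    // Must be called with _sync held. Returns false on abort or timeout;
    // otherwise releases the lock and waits for a pulse or one poll interval.
    private bool WaitSlice(Stopwatch elapsed, int millisecondsTimeout, FuncGetVolatileFlag abort)
    {
        if (abort != null && abort()) return false;
        long remaining = millisecondsTimeout - elapsed.ElapsedMilliseconds;
        if (remaining <= 0) return false;
        Monitor.Wait(_sync, (int)Math.Min(PollIntervalMs, remaining));
        return true;
    }
}
```

Monitor.Wait releases the lock while the thread sleeps, so the other side can get in, change the queue and pulse. The condition is re-checked in a while loop after every wake-up because a pulse only means "something changed", not "your item is there". I used PulseAll in the sketch because producers and consumers wait on the same lock object, and a single Pulse could wake a thread of the wrong kind.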

Some ideas for future enhancements that I've thought of are:
- TryPeek() and WaitPeek()
- Transactions

Source code download link at Google Code

[Updated at 2008-04-28]