Friday, 25 April 2008

A C# BlockingQueue

OK, things didn't go as I planned, so I'll stop promising and start delivering. This time I'll share some code that I've used before.

The problem is quite simple and is not new: exchange messages between threads. One possible solution is message queuing, but it's too bulky to use inside a single process. The simplest way to make threads communicate is through shared variables, but it's obviously too simple.

So, what kind of feature was I looking for? To begin with, it needed to be fast and light. Secondly, it had to work in a fire-and-forget fashion, i.e., I wanted a thread to be able to send a message (or task, job, whatever) to another thread and go on. Obviously, for such a thing to work, it would have to use some sort of internal buffer. So, it looked like what I needed was a thread-safe FIFO queue.

All right, I could simply use a Queue<T> and wrap its calls in a lock statement. Or, even easier, use a non-generic Queue and transform it by calling the Queue.Synchronized() method. Either way, we'd get a simply synchronized thread-safe queue.

Is that all? Of course not. Suppose we have a producer-consumer problem to solve. Usually, when the buffer is empty, the consumer is supposed to idly wait for a new item. How could we accomplish that using a synched queue? If we try to dequeue from an empty Queue, an exception is thrown. We could check the item count before dequeueing, but that takes two separate steps, and another thread could empty the queue between them, so it wouldn't be thread-safe even if the Queue instance is synched. Another possibility is catching the queue-is-empty exception, which is a very lousy trick. Either way, we wouldn't be able to avoid writing synchronization code in the client, and this is not what I wanted.
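To make the two-step problem concrete, here is a minimal sketch of a hand-rolled wrapper. The name LockedQueue<T> is made up for this illustration and is not part of my library. The point is that the emptiness check and the dequeue must happen under the same lock:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper for illustration only.
public class LockedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _sync = new object();

    public void Enqueue(T item)
    {
        lock (_sync) { _items.Enqueue(item); }
    }

    // The count check and the dequeue share one lock, so no other
    // thread can take the item between the two steps.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }
}
```

Note that this still doesn't wait for anything: a consumer that gets false back has to retry on its own, which is exactly the code I didn't want to write in every client.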

So, now that you've been introduced to the problem, let me show the wish list that I gathered, in the form of an interface:

public interface IBlockingQueue
{
    /// <summary>Max number of items in the queue (-1 means there's no limit)</summary>
    int MaxQueueDepth { get; }

    /// <summary>Returns false when there's something wrong, e.g., queue full</summary>
    bool TryEnqueue(object item);

    /// <summary>Returns false when there's no message to read</summary>
    bool TryDequeue(out object item);

    /// <summary>Returns false when a timeout or an abort happens</summary>
    bool WaitEnqueue(object item, int millisecondsTimeout, FuncGetVolatileFlag ExternalAbortSignaled);

    /// <summary>Returns false when a timeout or an abort happens</summary>
    bool WaitDequeue(out object item, int millisecondsTimeout, FuncGetVolatileFlag ExternalAbortSignaled);
}

public delegate bool FuncGetVolatileFlag();

TryDequeue() and TryEnqueue() are two simply synchronized methods to access the queue. The other two, WaitDequeue() and WaitEnqueue(), are much more interesting. WaitDequeue() will try to dequeue and, if the queue is empty, will wait for a new item. WaitEnqueue() will try to enqueue and, if the queue is full, will wait until there's room for the new item.

But that's still not enough. A common problem in a multithreaded application is that you need to close it while some threads are still running. There are several possible approaches to stopping them. The safest choice might be to wait for every thread to end, but that might take too long. One alternative is to terminate all of them, but that's very risky. I think the best pick is to somehow ask the threads to stop and wait until they do so. With my blocking queue, you can choose any of the three.

Let me explain how. As you can see, I've added two additional parameters to the Wait methods. Both have the same purpose: cancel the action and return execution to the client code. While the first is obviously a timeout, one might wonder what the other parameter is. It's a delegate of a type that accepts functions that look like this: bool func(). Its name is ExternalAbortSignaled and its type name is FuncGetVolatileFlag, so I hope it's not hard to recognize how it's used.

In order to eliminate any doubts, I'll describe it. The idea is that, while waiting, the ExternalAbortSignaled delegate will be called at short intervals. If it returns true at any time, the wait ends, no item is enqueued or dequeued, and the method returns false. That way, you can pass the queue an anonymous method that merely returns the value of a boolean flag. Whenever the flag is set to true, the wait method returns promptly and the application as a whole is able to respond faster.
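On the client side, the flag idea can be sketched like this. Everything here except the FuncGetVolatileFlag delegate is made up for illustration: PollUntil is only a stand-in for the waiting part of the Wait methods (it uses Thread.Sleep, which is not how the real queue waits), and the 10 ms interval and the _stopRequested field are assumptions.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public delegate bool FuncGetVolatileFlag();

public static class AbortFlagDemo
{
    // Hypothetical shutdown flag. It is volatile so that readers see
    // the latest value without taking any lock.
    private static volatile bool _stopRequested;

    // Stand-in for a waiting method: returns true once ready() is true,
    // false on timeout or when the abort delegate returns true.
    public static bool PollUntil(Func<bool> ready, int millisecondsTimeout,
                                 FuncGetVolatileFlag externalAbortSignaled)
    {
        const int sliceMs = 10; // assumed polling interval
        Stopwatch clock = Stopwatch.StartNew();
        while (!ready())
        {
            if (externalAbortSignaled != null && externalAbortSignaled())
                return false; // abort requested
            if (clock.ElapsedMilliseconds >= millisecondsTimeout)
                return false; // timed out
            Thread.Sleep(sliceMs);
        }
        return true;
    }

    // Waits on a condition that never becomes true; another thread
    // raises the flag after about 50 ms, which ends the wait early.
    public static bool Run()
    {
        _stopRequested = false;
        Thread stopper = new Thread(() => { Thread.Sleep(50); _stopRequested = true; });
        stopper.Start();
        bool result = PollUntil(() => false, 5000, () => _stopRequested);
        stopper.Join();
        return result;
    }
}
```

The delegate passed in is just () => _stopRequested. It takes no lock, so calling it repeatedly while waiting is cheap, and the wait gives up long before the 5 second timeout.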

As you may know, Java has a built-in BlockingQueue already. Sadly, Java has no direct support for delegates. So, I think it would be hard to code a feature like an abort flag in an elegant way there.

Now, if you want to know the best way to implement the waiting synchronization code, I'll leave the explanation to Joseph, who is much more eloquent than I am. It might be a good idea to take a look at my source code too. The technique I chose is based on Wait and Pulse (the Monitor.Wait and Monitor.Pulse methods).
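For readers who just want the general shape, here is a minimal sketch of a dequeue that waits with Monitor.Wait and checks an abort delegate between short waits. It is not the code from the download: the class name SketchBlockingQueue, the 20 ms interval, and the use of PulseAll instead of Pulse are assumptions made for this example, WaitEnqueue() is left out, and an infinite timeout is not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public delegate bool FuncGetVolatileFlag();

// Hypothetical sketch for illustration only.
public class SketchBlockingQueue
{
    private const int PollSliceMs = 20; // assumed interval between abort checks
    private readonly Queue<object> _items = new Queue<object>();
    private readonly object _sync = new object();
    private readonly int _maxDepth; // -1 means no limit

    public SketchBlockingQueue(int maxQueueDepth) { _maxDepth = maxQueueDepth; }

    public bool TryEnqueue(object item)
    {
        lock (_sync)
        {
            if (_maxDepth >= 0 && _items.Count >= _maxDepth) return false; // full
            _items.Enqueue(item);
            Monitor.PulseAll(_sync); // wake any consumers blocked in Wait
            return true;
        }
    }

    public bool WaitDequeue(out object item, int millisecondsTimeout,
                            FuncGetVolatileFlag externalAbortSignaled)
    {
        Stopwatch clock = Stopwatch.StartNew();
        lock (_sync)
        {
            while (_items.Count == 0)
            {
                long remaining = millisecondsTimeout - clock.ElapsedMilliseconds;
                bool aborted = externalAbortSignaled != null && externalAbortSignaled();
                if (aborted || remaining <= 0)
                {
                    item = null;
                    return false; // abort or timeout
                }
                // Wait releases the lock while blocked and reacquires it
                // when pulsed or when the short slice elapses.
                Monitor.Wait(_sync, (int)Math.Min(remaining, PollSliceMs));
            }
            item = _items.Dequeue();
            return true;
        }
    }
}
```

Because Monitor.Wait releases the lock while it blocks, producers can still enqueue while a consumer is waiting. The short wait slices are what let the abort delegate be checked regularly even when nobody pulses.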

Some ideas for future enhancements that I've thought of are:
- TryPeek() and WaitPeek()
- Transactions

Source code download link at google code

[Updated at 2008-04-28]


Leandro Fernandez said...

Great post JP!
Don't forget your readers and keep delighting us with your knowledge.


jpbochi said...


but no worshipping, please :P

Anonymous said...

jp: Have you considered what would happen if someone had hundreds of your blocking queues in an app, each executing calls to FuncGetVolatileFlag? It would be a disaster. You should think this through ... having a blocked thread poll for an event is never ever a good idea. Look up code samples for blocking queues that don't have this behavior.

And btw, I was able to discern the design flaw because of order and reason and logic in the first cause.

jpbochi said...

I'm not sure if I understood your question.

First, different BlockingQueues would not interfere with each other. So there's no way one BlockingQueue would lock a thread that was using another BlockingQueue.

Second, the whole idea of FuncGetVolatileFlag is the volatile part. It should be a function that has no locks. If you use it correctly, no thread would be blocked by calling this function. I've thought about this and that's why the delegate has this name. I don't see it as a design flaw. If you have a [better] alternative solution, I'd be glad to see it.

Third, having hundreds of threads accessing the same shared resource (such as a BlockingQueue) is certainly a design issue with the user code. I've tried to make my BlockingQueue block for as short a time as possible, and I don't see a way to reduce it.

By the way, I think you should be more careful when saying another developer's code is wrong or badly designed.

jpbochi said...

By the way, I have no idea what you meant by "I was able to discern the design flaw because of order and reason and logic in the first cause."
