I was writing some code this afternoon to better understand WCF extension, and had a classic producer-consumer problem. In this case the producer was a networking library, and the consumer was the WCF client application calling Receive or a similar method. On the client side, a user of WCF could call Receive, and if a message had already come in and been queued up, I wanted to return it immediately. Otherwise it would wait for the next message to arrive, then return it.
My initial approach was to keep a queue of messages, and signal an event whenever a new message arrived on the wire. Then the Receive method could check the queue, and if it was empty, block on the event. After a few hundred lines of C# code, this pattern was working for 3-4 different methods, including their BeginFoo/EndFoo asynchronous versions. Thankfully, I then realized there was a much simpler way - invert the problem, by keeping the waiting methods in the queue rather than the messages.
Not only was this approach more efficient, since I did not need to keep any threads around to wait on the event, it was also much simpler, which will probably significantly reduce the number of threading issues. It probably wouldn't make much difference if I only had synchronous methods, but since I was also implementing asynchronous versions, it made a significant difference.
The interesting methods for the queued message version looked something like this:
public void Produce(string message)
{
lock (protect)
{
messages.Enqueue(message);
messageWaitingEvent.Set();
}
}
public string Consume(TimeSpan timeout)
{
DateTime absoluteTimeout = DateTime.Now + timeout;
do
{
if (messageWaitingEvent.WaitOne(timeout))
{
lock (protect)
{
try
{
return messages.Dequeue();
}
catch (InvalidOperationException)
{
// the queue is actually empty
messageWaitingEvent.Reset();
}
}
}
timeout = absoluteTimeout - DateTime.Now;
} while (timeout > TimeSpan.Zero);
throw new TimeoutException();
}
Most of the complexity is in the consumer; the producer is very simple in this approach. The asynchronous methods were implemented with a delegate on the synchronous Consume method. Very simple, but not especially elegant:
private Func<TimeSpan, string> asyncConsume = new Func<TimeSpan, string>(Consume);
public IAsyncResult BeginConsume(TimeSpan timeout, AsyncCallback callback, object state)
{
return asyncConsume.BeginInvoke(timeout, callback, state);
}
public string EndConsume(IAsyncResult result)
{
return asyncConsume.EndInvoke(result);
}
I cheated slightly above to keep the sample short - the initialization of asyncConsume must be handled in the constructor, not a field initializer. I'll post the method-queuing solution next.
-randy
Comments