One of the most powerful tools in programming is the Job Queue. It’s a simple concept that stands in the core of many software solutions. It’s also a pretty interesting programming challenge, especially in a versatile language like C#.

Sometimes referred to as the Producer/Consumer pattern, the Job Queue means placing a Job of some kind in a Queue, which will be executed asynchronously in a First-In-First-Out (FIFO) order. The jobs will be Queued (produced) by one entity (thread, process, service), and executed (consumed) by another entity (thread, process, service).

For example, when a customer subscribes to your site, you might want to send them an email. Instead of sending an email during the “Subscribe” request, you can place it in a Queue to be executed at a later time and resolve the request immediately. Meanwhile, this Queue is processed by another thread in the background.

We’ll see a bunch of Job Queue implementations, including usages of .NET 4 thread-safe collections, Reactive Extensions, and TPL Dataflow. We’ll also see when and why you should use an asynchronous job queue, and common customizations to such a Queue.

When to consider using a Job Queue?

Some of the more common reasons are:

  • When you need to level out peak time pressure. For example, when you have launched a new product and have tons of orders placed in a short time. With a queue, you can process them at a slower pace (that means without crashing your server).
  • When your jobs need to be locked. In other words, only one job of this type can be executed at a single time.
  • When the jobs need to be executed in a specific order. It can be a customized order with priorities of some kind.
  • When you want to back up the jobs in case your program crashes or hangs.
  • When you want to spread your jobs across several entities (threads, processes, services).

As for when not to use a Job Queue: Whenever you need the job to execute synchronously. For example, if you need the result of the job’s execution.

Using Message Brokers

Instead of managing the Queue yourself, you can use a 3rd party message broker that will store and handle your queue. Some of the more popular ones are:

  • Azure Service Bus – Azure’s message broker solution. It has all of the expected advanced features like Publishers and Subscribers, Topics, Message Sessions, Auto-Forwarding and more.
  • Azure Queue – A simple Queue implementation on Aure based on Azure Storage.
  • Apache Kafka – A famous open source message broker with advanced pub/sub message queue.
  • Rabbit MQ – A very popular open source message broker. It combines user-friendly UI, good documentation, highly efficient and features packed.

These should be considered instead of an in-process queue if:

  • Your queue takes a lot of memory and it’s not reasonable to keep it in your process memory.
  • The queue is used by more than one process.
  • You don’t want to lose the queued jobs in case your process crashes.
  • You want some kind of advanced queue functionality these 3rd party queues provide.

In this article, we’ll mostly focus on creating an effective in-process job queue in C#.

Creating your own Job Queue in C#

Let’s go over the requirements again:

  1. We need a Queue-Job object that implements just the method Enqueue.
  2. Each job will execute as soon as possible
  3. The jobs will execute one after the other in a FIFO order
  4. The jobs will execute in a background thread

For example’s sake, our Job Queue will just write strings to Console.

Let’s start with the simplest most naive implementation:

public class NaiveQueue
{
    private List<string> _jobs = new List<string>();

    public NaiveQueue()
    {
        Task.Run(() => { OnStart(); });
    }

    public void Enqueue(object job)
    {
        _jobs.Add(job);
    }

    private void OnStart()
    {
        while (true)
        {
            if (_jobs.Count > 0)
            {
                var job = _jobs.First();
                _jobs.RemoveAt(0);
                Console.WriteLine(job);
            }
        }
    }
}</string></string>

As you can see, this is a simple Queue that writes to Console text payloads (our jobs) as they come. This implementation has some good things going for it, but it also has several problems.

Let’s talk about the good stuff first. One good thing I can say about this is that it does what we wanted it to do. It will run on its own Thread, pulling jobs in a FIFO order and executing them (writing to console in our case).

Now for the problems:

  1. This queue is not thread-safe. That’s because we’re using List<T>, which is not a thread-safe collection. Since we’re using at least 2 threads (to Enqueue and to Dequeue), bad things will happen.
  2. The List<T> collection will provide terrible performance for this usage. It’s using a vector under the hood, which is essentially a dynamic size array. An array is great for direct access operations, but not so great for adding and removing items.
  3. We are using a thread-pool thread (with Task.Run) for a thread that’s supposed to be alive during entire application lifecycle. The rule of thumb is to use a regular Thread for long-running threads and pooled threads (thread-pool threads) for short running threads. Alternatively, we can change the Task’s creation options to TaskCreationOptions.LongRunning .

Let’s try to solve all 3 of those problems in the next implementation.

A bit better implementation

Learning from the problems in the previous implementation, here’s the new one:

public class BitBetterQueue
{
    private ConcurrentQueue<object> _jobs = new ConcurrentQueue<object>();

    public BitBetterQueue()
    {
        var thread = new Thread(new ThreadStart(OnStart));
        thread.IsBackground = true;
        thread.Start();
    }

    public void Enqueue(object job)
    {
        _jobs.Enqueue(job);
    }

    private void OnStart()
    {
        while (true)
        {
            if (_jobs.TryDequeue(out object result))
            {
                Console.WriteLine(result);
            }
        }
    }
}</object></object>

.NET Framework 4 introduced ConcurrentQueue, which is exactly the data structure we need. It’s thread-safe and also optimized for Queue’s Enqueue and Dequeue operations.

Let’s try to make this a bit better though.

BlockingCollection for the win

We can make that last implementation even nicer by utilizing another class introduced along with ConcurrentQueue – BlockingCollection. This class is optimized for the Producer/Consumer pattern:

public class BlockingCollectionQueue
{
    private BlockingCollection<object> _jobs = new BlockingCollection<object>();

    public BlockingCollectionQueue()
    {
        var thread = new Thread(new ThreadStart(OnStart));
        thread.IsBackground = true;
        thread.Start();
    }

    public void Enqueue(object job)
    {
        _jobs.Add(job);
    }

    private void OnStart()
    {
        foreach (var job in _jobs.GetConsumingEnumerable(CancellationToken.None))
        {
            Console.WriteLine(job);
        }
    }
}</object></object>

BlockingCollection provides blocking and upper-bound capabilities for IProducerConsumer collections.

Let’s break that sentence down, starting with IProducerConsumer collections. This interface includes TryAdd and TryTake methods and is meant to implement collections like Queue or Stack, which mostly Produce and Consume items.ConcurrentQueue implements IProducerConsumer and is also the default implementation of BlockingCollection. This means that the following are the same:

BlockingCollection<object> _jobs = new BlockingCollection<object>();
    BlockingCollection<object> _jobs = new BlockingCollection<object>(new ConcurrentQueue<object>());</object></object></object></object></object>

The next thing BlockingCollection does is provide Blocking capabilities. This is represented in the GetConsumingEnumerable method. When called, it will either Take the next item in the collection or Block until such an item exists. In other words, it will stop the thread until a new item is added to the collection. With this method, we don’t have to write that annoying infinite loop while(true){}.

Finally, BlockingCollection can provide an upper-bound to the collection. This simply means in our case that we can limit the queue to a certain amount of maximum items.

Implementing a Job Queue without a dedicated Thread

There’s a chance that something was nagging at you with all the above implementations. Why does a Queue need a dedicated thread? In some scenarios, the Queue will be empty most of the time so it might make more sense to use a temporary pooled thread.

You can do exactly that with this implementation:

public class NoDedicatedThreadQueue
{
    private Queue<string> _jobs = new Queue<string>();
    private bool _delegateQueuedOrRunning = false;

    public void Enqueue(string job)
    {
        lock (_jobs)
        {
            _jobs.Enqueue(job);
            if (!_delegateQueuedOrRunning)
            {
                _delegateQueuedOrRunning = true;
                ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
            }
        }
    }

    private void ProcessQueuedItems(object ignored)
    {
        while (true)
        {
            string item;
            lock (_jobs)
            {
                if (_jobs.Count == 0)
                {
                    _delegateQueuedOrRunning = false;
                    break;
                }

                item = _jobs.Dequeue();
            }

            try
            {
                //do job
                Console.WriteLine(item);
            }
            catch
            {
                ThreadPool.UnsafeQueueUserWorkItem(ProcessQueuedItems, null);
                throw;
            }
        }
    }
}</string></string>

This is actually a simplified implementation from Stephen Toub’s article .
Here’s how it works:

When first queuing a job, a pooled thread is created. It will iterate over all jobs (just 1 at first) and execute them in order.

When additional jobs are queued, they are added to the Queue collection. When the pooled thread finished the first job, it will dequeue the next job and execute them until the queue is empty.

When all jobs are finished, the thread exits. On the next Enqueue, a job will be enqueued and if the pooled thread exited, it will spawn up a new one and start the execution.

The lock exists to ensure a pooled thread is created just when needed and that there is a single one at a time.

Note that this is a great implementation, but it’s not necessarily better than the one with BlockingCollection. It’s a matter of your program’s requirements. If your queue is going to be working most of the time anyway, it’s better to create a dedicated thread for it.

Job Queue Customizations

We saw 2 decent Job Queue implementations. One with a dedicated thread using BlockingCollection. The second with a pooled-thread that’s used on demand.

I’m not done with Job Queue implementations yet, but before going forward with those (in Part 2 ), I want to talk about customization.

It’s very likely that those exact implementations won’t fit you and you will need some kind of customized variation of that. Let’s go over some common use cases and see how to approach them.

1. Priority Queue

You might want some more complicated logic to your Queue. For example, when building software for an emergency call center, you might want to give priority to life-threatening situations.

You will need to implement your own Queue. If using BlockingCollection, you’ll need to implement the IProducerConsumer interface and give it as a parameter in BlockingCollection’s constructor:

BlockingCollection<object> _jobs = new BlockingCollection<object>(new MyPriorityQueue<object>());</object></object></object>

In the second implementation, you’ll have to replace the Queue object. This can actually also be an IProducerConsumer implementation.

Here’s an example of a C# priority queue implementation.

2. Interacting with an External Queue / Message Broker

If you choose to have your Queue stored in an external queue like Azure Queue or Rabbit MQ, you can stick to the given implementation as well. Like with a priority queue, you’ll have to implement the IProducerConsumer interface. Something like this:

public class AzureQueueProducerConsumer<t> : IProducerConsumerCollection<t>
{
    public bool TryAdd(T item)
    {
        // go to Azure Queue and add item
    }

        public bool TryTake(out T item)
    {
        // go to Azure Queue and take item
    }
    //...
}</t></t>
BlockingCollection<object> _jobs = new BlockingCollection<object>(new AzureQueueProducerConsumer<object>());</object></object></object>

3. Have your queue handled by more than one thread

For performance reasons, you might want to have several threads to handle the queue. This raises the question whether to use dedicated threads or pooled threads.

When using dedicated threads, BlockingCollection makes this really simple:

public class MultiThreadQueue
{
    BlockingCollection<string> _jobs = new BlockingCollection<string>();

    public MultiThreadQueue(int numThreads)
    {
        for (int i = 0; i < numThreads; i++)
        {
            var thread = new Thread(OnHandlerStart)
                { IsBackground = true };//Mark 'false' if you want to prevent program exit until jobs finish
            thread.Start();
        }
    }

    public void Enqueue(string job)
    {
        if (!_jobs.IsAddingCompleted)
        {
            _jobs.Add(job);
        }
    }

    public void Stop()
    {
        //This will cause '_jobs.GetConsumingEnumerable' to stop blocking and exit when it's empty
        _jobs.CompleteAdding();
    }

    private void OnHandlerStart()
    {
        foreach (var job in _jobs.GetConsumingEnumerable(CancellationToken.None))
        {
            Console.WriteLine(job);
            Thread.Sleep(10);
        }
    }
}</string></string>

As you can see, we can use GetConsumingEnumerable on the same collection from different threads and it works perfectly (Thanks Jon Skeet ).

4. Publisher / Subscriber Queue

A pretty common scenario is a pub/sub system for your Queue. The idea is that each job will have a Type and different handler(s) will be able to subscribe to each job type. So when a job is up for execution, we will look for a handler(s) that registered for the job’s type, and execute them with the job as a parameter.

I’ll probably show an example of this in Part 2.

Summary

We saw a couple of decent in-process Job Queue implementations in C# .NET. One with a dedicated Thread and the other that creates pooled threads as required. I’ll admit that I enjoyed writing them way too much.

In the next part - more stuff about Queues. We’ll walk about some important concepts like persistancy, failure handling, and poison queue. I’ll show you a couple of additional implementations, including one that might surprise you. Subscribe to the blog to get updated on the next part .

If you got some interesting insight about Queues, or the above implementations, leave some feedback in the comments below. Cheers.