C# Job Queue Implementations in Depth – Part 1

How to implement a Queue in C# .NET

One of the most powerful tools in programming is the Job Queue. It’s a simple concept that sits at the core of many software solutions. It’s also a pretty interesting programming challenge, especially in a versatile language like C#.

Sometimes referred to as the Producer/Consumer pattern, the Job Queue means placing a Job of some kind in a Queue, which will be executed asynchronously in a First-In-First-Out (FIFO) order. The jobs will be Queued (produced) by one entity (thread, process, service), and executed (consumed) by another entity (thread, process, service).

For example, when a customer subscribes to your site, you might want to send them an email. Instead of sending an email during the “Subscribe” request, you can place it in a Queue to be executed at a later time and resolve the request immediately. Meanwhile, this Queue is processed by another thread in the background.

We’ll see a bunch of Job Queue implementations, including usages of .NET 4 thread-safe collections, Reactive Extensions, and TPL Dataflow. We’ll also see when and why you should use an asynchronous job queue, and common customizations to such a Queue.

When to consider using a Job Queue?

Some of the more common reasons are:

  • When you need to level out peak time pressure. For example, when you have launched a new product and have tons of orders placed in a short time. With a queue, you can process them at a slower pace (that means without crashing your server).
  • When your jobs need to be locked. In other words, only one job of this type can be executed at a single time.
  • When the jobs need to be executed in a specific order. It can be a customized order with priorities of some kind.
  • When you want to back up the jobs in case your program crashes or hangs.
  • When you want to spread your jobs across several entities (threads, processes, services).

As for when not to use a Job Queue: Whenever you need the job to execute synchronously. For example, if you need the result of the job’s execution.

Using Message Brokers

Instead of managing the Queue yourself, you can use a 3rd party message broker that will store and handle your queue. Some of the more popular ones are:

  • Azure Service Bus – Azure’s message broker solution. It has all of the expected advanced features like Publishers and Subscribers, Topics, Message Sessions, Auto-Forwarding and more.
  • Azure Queue – A simple Queue implementation on Azure based on Azure Storage.
  • Apache Kafka – A famous open source message broker with advanced pub/sub messaging.
  • RabbitMQ – A very popular open source message broker. It combines a user-friendly UI and good documentation with high efficiency and a rich feature set.

These should be considered instead of an in-process queue if:

  • Your queue takes a lot of memory and it’s not reasonable to keep it in your process memory.
  • The queue is used by more than one process.
  • You don’t want to lose the queued jobs in case your process crashes.
  • You want some kind of advanced queue functionality these 3rd party queues provide.

In this article, we’ll mostly focus on creating an effective in-process job queue in C#.

Creating your own Job Queue in C#

Let’s go over the requirements:

  1. We need a Job Queue object that exposes a single method, Enqueue.
  2. Each job will execute as soon as possible.
  3. The jobs will execute one after the other, in FIFO order.
  4. The jobs will execute on a background thread.

For example’s sake, our Job Queue will just write strings to Console.

Let’s start with the simplest, most naive implementation:
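A minimal sketch of such a queue, assuming a List<string> for storage and a consumer loop started with Task.Run, as the problems listed further down describe. The class name NaiveJobQueue and the short idle sleep are assumptions of this sketch:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Deliberately naive job queue, for illustration only.
public class NaiveJobQueue
{
    // List<T> is NOT thread-safe; it is used here to show the problem.
    private readonly List<string> _jobs = new List<string>();

    public NaiveJobQueue()
    {
        // Runs the consumer loop on a thread-pool thread.
        Task.Run(() => OnStart());
    }

    public void Enqueue(string job)
    {
        _jobs.Add(job);
    }

    private void OnStart()
    {
        while (true)
        {
            if (_jobs.Count > 0)
            {
                // Take the oldest job first (FIFO) and "execute" it.
                string job = _jobs[0];
                _jobs.RemoveAt(0);
                Console.WriteLine(job);
            }
            else
            {
                // Assumption of this sketch: back off briefly while idle.
                Thread.Sleep(10);
            }
        }
    }
}
```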

As you can see, this is a simple Queue that writes text payloads (our jobs) to Console as they come. This implementation has some good things going for it, but it also has several problems.

Let’s talk about the good stuff first. One good thing I can say about this is that it does what we wanted it to do. It will run on its own Thread, pulling jobs in a FIFO order and executing them (writing to console in our case).

Now for the problems:

  1. This queue is not thread-safe. That’s because we’re using List<T>, which is not a thread-safe collection. Since at least 2 threads touch it (one to Enqueue and one to Dequeue), bad things will happen.
  2. The List<T> collection will provide terrible performance for this usage. It’s backed by a dynamically sized array, which is great for direct access by index but not for removing items from the front: every removal of the first item shifts all the remaining items, so each dequeue is O(n).
  3. We are using a thread-pool thread (with Task.Run) for a thread that’s supposed to stay alive for the entire application lifetime. The rule of thumb is to use a regular Thread for long-running work and pooled threads (thread-pool threads) for short-running work. Alternatively, we can set the Task’s creation options to TaskCreationOptions.LongRunning.
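The two alternatives mentioned in the third point can be sketched as follows. The helper class LongRunningWorkers and its method names are hypothetical; Thread, Task.Factory.StartNew and TaskCreationOptions.LongRunning are the standard .NET APIs:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningWorkers
{
    // Option 1: a dedicated background thread.
    public static Thread StartDedicatedThread(Action work)
    {
        var thread = new Thread(() => work()) { IsBackground = true };
        thread.Start();
        return thread;
    }

    // Option 2: a Task hinted as long-running. LongRunning is a hint; the
    // default scheduler responds by running the task on its own thread
    // rather than on a thread-pool thread.
    public static Task StartLongRunningTask(Action work)
    {
        return Task.Factory.StartNew(
            work,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
}
```

Either option keeps the thread pool free for the short-lived work it is designed for.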

Let’s try to solve all 3 of those problems in the next implementation.

A bit better implementation

Learning from the problems in the previous implementation, here’s the new one:
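A minimal sketch, assuming a ConcurrentQueue<string> for storage and a dedicated background Thread for the consumer loop. The class name ConcurrentJobQueue and the idle sleep are assumptions of this sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class ConcurrentJobQueue
{
    // Thread-safe FIFO collection, so no explicit locking is needed.
    private readonly ConcurrentQueue<string> _jobs = new ConcurrentQueue<string>();

    public ConcurrentJobQueue()
    {
        // A dedicated thread instead of a pooled one; IsBackground lets
        // the process exit even though the loop never ends.
        var thread = new Thread(OnStart) { IsBackground = true };
        thread.Start();
    }

    public void Enqueue(string job)
    {
        _jobs.Enqueue(job);
    }

    private void OnStart()
    {
        while (true)
        {
            if (_jobs.TryDequeue(out string job))
            {
                Console.WriteLine(job);
            }
            else
            {
                // Assumption of this sketch: back off briefly while idle.
                Thread.Sleep(10);
            }
        }
    }
}
```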

.NET Framework 4 introduced ConcurrentQueue, which is exactly the data structure we need. It’s thread-safe and also optimized for Queue’s Enqueue and Dequeue operations.

Let’s try to make this a bit better though.

BlockingCollection for the win

We can make that last implementation even nicer by utilizing another class introduced along with ConcurrentQueue – BlockingCollection. This class is optimized for the Producer/Consumer pattern:
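A minimal sketch of the same queue rebuilt on BlockingCollection<string>, with the hypothetical class name BlockingJobQueue:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class BlockingJobQueue
{
    // Backed by a ConcurrentQueue<T> by default, so order stays FIFO.
    private readonly BlockingCollection<string> _jobs = new BlockingCollection<string>();

    public BlockingJobQueue()
    {
        var thread = new Thread(OnStart) { IsBackground = true };
        thread.Start();
    }

    public void Enqueue(string job)
    {
        _jobs.Add(job);
    }

    private void OnStart()
    {
        // Blocks while the collection is empty and yields each job as it
        // arrives, so no polling loop is needed.
        foreach (string job in _jobs.GetConsumingEnumerable())
        {
            Console.WriteLine(job);
        }
    }
}
```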

BlockingCollection provides blocking and upper-bound capabilities for IProducerConsumerCollection<T> collections.

Let’s break that sentence down, starting with IProducerConsumerCollection<T> collections. This interface includes TryAdd and TryTake methods and is meant for collections like Queue or Stack, which mostly Produce and Consume items. ConcurrentQueue implements IProducerConsumerCollection<T> and is also the default backing collection of BlockingCollection. This means that the following are the same:
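In code, the two equivalent constructions look roughly like this (the wrapper class and method names exist only for illustration):

```csharp
using System.Collections.Concurrent;

public static class BlockingCollectionDefaults
{
    // Uses the default backing store, which is a ConcurrentQueue<T>.
    public static BlockingCollection<string> CreateDefault()
    {
        return new BlockingCollection<string>();
    }

    // Passes the backing store explicitly; behaves the same as CreateDefault().
    public static BlockingCollection<string> CreateExplicit()
    {
        return new BlockingCollection<string>(new ConcurrentQueue<string>());
    }
}
```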

The next thing BlockingCollection does is provide Blocking capabilities. This is represented in the GetConsumingEnumerable method. When called, it will either Take the next item in the collection or Block until such an item exists. In other words, it will stop the thread until a new item is added to the collection. With this method, we don’t have to write that annoying infinite loop while(true){}.

Finally, BlockingCollection can put an upper bound on the collection. In our case, this simply means we can cap the queue at a maximum number of items.
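A small sketch of the upper bound in action. The helper BoundedQueueDemo.CountAccepted is hypothetical; the boundedCapacity constructor parameter and TryAdd are part of BlockingCollection<T>:

```csharp
using System.Collections.Concurrent;

public static class BoundedQueueDemo
{
    // Tries to add every job to a queue capped at `capacity` items and
    // returns how many were accepted. Nothing consumes the queue here.
    public static int CountAccepted(int capacity, params string[] jobs)
    {
        using var queue = new BlockingCollection<string>(boundedCapacity: capacity);
        int accepted = 0;
        foreach (string job in jobs)
        {
            // TryAdd returns false immediately when the queue is full;
            // Add would block instead until a consumer frees a slot.
            if (queue.TryAdd(job))
            {
                accepted++;
            }
        }
        return accepted;
    }
}
```

Whether a full queue should block the producer (Add) or reject the job (TryAdd) depends on how you want to handle back-pressure.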

Implementing a Job Queue without a dedicated Thread

There’s a chance that something was nagging at you with all the above implementations. Why does a Queue need a dedicated thread? In some scenarios, the Queue will be empty most of the time so it might make more sense to use a temporary pooled thread.

You can do exactly that with this implementation:
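A minimal sketch of this idea follows. The class and member names (OnDemandJobQueue, _workerRunning) are assumptions of this sketch, and the code may differ in detail from the listing the article describes:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class OnDemandJobQueue
{
    private readonly Queue<string> _jobs = new Queue<string>();

    // True while a pooled thread is draining the queue. Guarded by lock (_jobs).
    private bool _workerRunning;

    public void Enqueue(string job)
    {
        lock (_jobs)
        {
            _jobs.Enqueue(job);
            if (_workerRunning)
            {
                return; // the active worker will pick this job up
            }
            _workerRunning = true;
        }

        // No worker is active, so borrow a thread-pool thread.
        ThreadPool.QueueUserWorkItem(_ => ProcessQueuedJobs());
    }

    private void ProcessQueuedJobs()
    {
        while (true)
        {
            string job;
            lock (_jobs)
            {
                if (_jobs.Count == 0)
                {
                    // Queue drained: give the thread back to the pool.
                    _workerRunning = false;
                    return;
                }
                job = _jobs.Dequeue();
            }

            // Execute outside the lock so producers are never blocked by a job.
            // A real job should be wrapped in try/catch so that a failure
            // does not leave _workerRunning set forever.
            Console.WriteLine(job);
        }
    }
}
```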

This is actually a simplified implementation from Stephen Toub’s article.
Here’s how it works:

When the first job is queued, work is started on a pooled thread. It iterates over all jobs (just 1 at first) and executes them in order.

When additional jobs are queued, they are added to the Queue collection. When the pooled thread finishes the first job, it dequeues the next one, and keeps going until the queue is empty.

When all jobs are finished, the thread exits. The next Enqueue adds its job to the queue and, if the pooled thread has exited, starts a new one to resume execution.

The lock exists to ensure a pooled thread is created just when needed and that there is a single one at a time.

Note that this is a great implementation, but it’s not necessarily better than the one with BlockingCollection. It’s a matter of your program’s requirements. If your queue is going to be working most of the time anyway, it’s better to create a dedicated thread for it.

Job Queue Customizations

We saw 2 decent Job Queue implementations. One with a dedicated thread using BlockingCollection. The second with a pooled-thread that’s used on demand.

I’m not done with Job Queue implementations yet, but before going forward with those (in Part 2), I want to talk about customization.

It’s very likely that those exact implementations won’t fit your needs and you will need some kind of customized variation. Let’s go over some common use cases and see how to approach them.

1. Priority Queue

You might want some more complicated logic to your Queue. For example, when building software for an emergency call center, you might want to give priority to life-threatening situations.

You will need to implement your own Queue. If using BlockingCollection, you’ll need to implement the IProducerConsumerCollection<T> interface and pass it as a parameter to BlockingCollection’s constructor:
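A minimal sketch, assuming .NET 6 or later for the built-in PriorityQueue<TElement, TPriority>. The PriorityJob record, the PriorityJobCollection class and the PriorityQueues factory are hypothetical names. A sequence number is added to the priority so that jobs with equal priority still run in FIFO order:

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical job type for this sketch: a lower Priority value runs first.
public record PriorityJob(int Priority, string Payload);

public class PriorityJobCollection : IProducerConsumerCollection<PriorityJob>
{
    private readonly PriorityQueue<PriorityJob, (int Priority, long Sequence)> _queue = new();
    private readonly object _sync = new();
    private long _sequence; // tie-breaker that keeps equal priorities FIFO

    public bool TryAdd(PriorityJob item)
    {
        lock (_sync) { _queue.Enqueue(item, (item.Priority, _sequence++)); }
        return true;
    }

    public bool TryTake(out PriorityJob item)
    {
        lock (_sync) { return _queue.TryDequeue(out item, out _); }
    }

    public int Count { get { lock (_sync) { return _queue.Count; } } }
    public bool IsSynchronized => false;
    public object SyncRoot => _sync;

    // Snapshot of the current items, in no particular order.
    public PriorityJob[] ToArray()
    {
        lock (_sync)
        {
            var items = new List<PriorityJob>();
            foreach (var (element, _) in _queue.UnorderedItems) items.Add(element);
            return items.ToArray();
        }
    }

    public void CopyTo(PriorityJob[] array, int index) => ToArray().CopyTo(array, index);
    public void CopyTo(Array array, int index) => ToArray().CopyTo(array, index);
    public IEnumerator<PriorityJob> GetEnumerator() => ((IEnumerable<PriorityJob>)ToArray()).GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class PriorityQueues
{
    // The custom collection is handed to BlockingCollection's constructor.
    public static BlockingCollection<PriorityJob> Create()
    {
        return new BlockingCollection<PriorityJob>(new PriorityJobCollection());
    }
}
```

The consumer loop stays unchanged: GetConsumingEnumerable now yields the most urgent job first.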

In the second implementation, you’ll have to replace the Queue object. This can actually also be an IProducerConsumerCollection<T> implementation.

Here’s an example of a C# priority queue implementation.

2. Interacting with an External Queue / Message Broker

If you choose to have your Queue stored in an external queue like Azure Queue or RabbitMQ, you can stick to the given implementation as well. Like with a priority queue, you’ll have to implement the IProducerConsumerCollection<T> interface. Something like this:
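A rough sketch of the shape such a class could take. IExternalQueueClient is a hypothetical abstraction invented for this example, not a real Azure or RabbitMQ API, and InMemoryQueueClient is a stand-in so the sketch runs without a broker:

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical abstraction over a broker client (NOT a real SDK interface).
public interface IExternalQueueClient
{
    void Send(string message);
    bool TryReceive(out string message);
    string[] Peek(); // snapshot of pending messages
}

// In-memory stand-in for a real broker, used only to make the sketch runnable.
public class InMemoryQueueClient : IExternalQueueClient
{
    private readonly ConcurrentQueue<string> _messages = new();
    public void Send(string message) => _messages.Enqueue(message);
    public bool TryReceive(out string message) => _messages.TryDequeue(out message);
    public string[] Peek() => _messages.ToArray();
}

// Adapts the external queue to the interface BlockingCollection expects.
public class ExternalQueueCollection : IProducerConsumerCollection<string>
{
    private readonly IExternalQueueClient _client;

    public ExternalQueueCollection(IExternalQueueClient client) => _client = client;

    // Producing and consuming are forwarded to the external queue.
    public bool TryAdd(string item) { _client.Send(item); return true; }
    public bool TryTake(out string item) => _client.TryReceive(out item);

    public string[] ToArray() => _client.Peek();
    public int Count => ToArray().Length;
    public bool IsSynchronized => false;
    public object SyncRoot { get; } = new object();
    public void CopyTo(string[] array, int index) => ToArray().CopyTo(array, index);
    public void CopyTo(Array array, int index) => ToArray().CopyTo(array, index);
    public IEnumerator<string> GetEnumerator() => ((IEnumerable<string>)ToArray()).GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

One caveat with this approach: BlockingCollection keeps its own count of the items added through it, so messages that another process places in the external queue will not wake up a blocked consumer in this process.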

3. Have your queue handled by more than one thread

For performance reasons, you might want to have several threads to handle the queue. This raises the question whether to use dedicated threads or pooled threads.

When using dedicated threads, BlockingCollection makes this really simple:
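A minimal sketch with a configurable number of dedicated consumer threads. The class name MultiThreadJobQueue and the numThreads parameter are assumptions of this sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class MultiThreadJobQueue
{
    private readonly BlockingCollection<string> _jobs = new BlockingCollection<string>();

    public MultiThreadJobQueue(int numThreads)
    {
        // Every thread runs the same handler loop over the same collection.
        for (int i = 0; i < numThreads; i++)
        {
            var thread = new Thread(OnHandlerStart) { IsBackground = true };
            thread.Start();
        }
    }

    public void Enqueue(string job)
    {
        _jobs.Add(job);
    }

    private void OnHandlerStart()
    {
        // Each job is handed to exactly one of the waiting threads.
        foreach (string job in _jobs.GetConsumingEnumerable())
        {
            Console.WriteLine(job);
        }
    }
}
```

Jobs are still taken from the queue in FIFO order, but with several consumers they can run concurrently and may finish out of order.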

As you can see, we can use GetConsumingEnumerable on the same collection from different threads and it works perfectly (Thanks Jon Skeet).

4. Publisher / Subscriber Queue

A pretty common scenario is a pub/sub system for your Queue. The idea is that each job has a Type, and one or more handlers can subscribe to each job type. When a job is up for execution, we look up the handlers registered for the job’s type and execute them with the job as a parameter.

I’ll probably show an example of this in Part 2.

Summary

We saw a couple of decent in-process Job Queue implementations in C# .NET. One with a dedicated Thread and the other that creates pooled threads as required. I’ll admit that I enjoyed writing them way too much.

In the next part – more stuff about Queues. We’ll talk about some important concepts like persistence, failure handling, and poison queues. I’ll show you a couple of additional implementations, including one that might surprise you. Subscribe to the blog to get updated on the next part.

If you have some interesting insight about Queues, or the above implementations, leave some feedback in the comments below. Cheers.

25 thoughts on “C# Job Queue Implementations in Depth – Part 1”

      1. Hi Michael, I really like the way you explain the job queue. I have a scenario where I implemented a job queue with the class. When a job fails, it keeps retrying over and over, about 4-5 times, until it stops. I would like to find a way to stop any failed job, remove it from the queue right away, and proceed with the next one. I would also like to put a delay on every job so that jobs start a minute or two apart. Can you please help me with this, or does anyone know how to do that?

    1. Sure, it’s because I want to control the number of threads that are handling the jobs.
      Otherwise, for a big queue and long-operation Jobs, you will very quickly flood the thread pool, which will impact your app performance.

  1. An excellent read! I just happen to be implementing a simple non-critical messaging & emailing service in my app, and this is just what I’m looking for. Thanks!

  2. Ehsan Mirsaeedi

    Really good job.
    Here you’ve put the Consumer logic inside your queue implementation. It would be great if you show us how it’s possible to separate the consuming logic and to have the Queue just as a mediator between producer and consumer.

  3. Good post. Maybe in a future part you could cover the new System.Threading.Channels API which is also useful for these patterns?

      1. Channels easily implement the BlockingCollection/Pub-Sub idiom while being fully async (no wasted blocked thread). It’s more performant than DataFlow. ConcurrentQueue doesn’t have an async Read, so I also use Channels instead of the ConcurrentQueue now. Other than this omission, nice article!

  4. >Sometimes referred to as the Producer/Consumer pattern,

    Why sometimes? I think it’s a well-established name for this pattern.

    > Which will be executed asynchronously in a First-In-First-Out (FIFO) order

    Only if the consumer guarantees the order, which is not always the case. It may be part of the contract, or it may not. And being FIFO per se is not crucial to the pattern itself; following the contract is.

  5. Really interesting post. I am currently learning a lot of stuff about microservices, queues (especially with rabbitmq and C#) and this is just what I needed. Thanks for this.

  6. Very nice!
    I have a perhaps unusual question: what if I want to enqueue async functions, e.g. a web request or some other IO job? Is there a way to execute and await async lambdas in OnHandlerStart?

    1. Thanks,
      Sure, your job can be a Task and you can await it, it shouldn’t be a problem.
      It shouldn’t be that difficult to refactor and change OnHandlerStart to be async.
      Alternatively, you don’t have to await the function, depending on your needs. Just start the task and go on to the next job.

      Look at one of my latest articles, “Performance Showdown of Job Queue Implementations”, where the job is a delegate; it’s similar.

  7. Thanks for writing this series of articles. You’ve done a wonderful job of explaining this stuff. Great work fella! 🙂
