Performance Showdown of Producer/Consumer (Job Queues) Implementations in C# .NET


I recently wrote 3 blog posts ([1] [2] [3]) on different Producer/Consumer (Job Queues) implementations. There are a lot of great ways to implement Job Queues in C#, but which should you choose? Which one is better, faster, and more versatile?

In this article, I want to get to the point where you can make a confident decision on which implementation to choose. That means checking performance and comparing customization options.

The implementations we covered were:

  • Blocking collection Queue (Part 1)
  • Thread-pool on demand (aka no-dedicated-thread-queue) (Part 1)
  • System.Threading.Channels (Part 2)
  • Reactive Extensions (Part 2)
  • TPL Dataflow (Part 3)

And we’re going to do the following tests:

  • Compare performance of single job to completion
  • Compare performance of 100,000 jobs-to-completion
  • Compare available customizations

To make matters simple, I’ll use a basic implementation of each type, with a single thread handling the jobs.

The Code

This code is for the simplest implementation of each type:

BlockingCollection Queue:
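A minimal sketch of this implementation (class and member names are my assumptions, in the spirit of Part 1): a single dedicated thread consumes jobs from a BlockingCollection until the queue is completed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class BlockingCollectionQueue
{
    private readonly BlockingCollection<Action> _jobs = new BlockingCollection<Action>();

    public BlockingCollectionQueue()
    {
        var thread = new Thread(OnHandlerStart) { IsBackground = true };
        thread.Start();
    }

    public void Enqueue(Action job) => _jobs.Add(job);

    public void Stop() => _jobs.CompleteAdding();

    private void OnHandlerStart()
    {
        // GetConsumingEnumerable blocks until a job is available
        // and ends once CompleteAdding has been called.
        foreach (var job in _jobs.GetConsumingEnumerable())
        {
            job.Invoke();
        }
    }
}
```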

Thread-pool on demand (aka no-dedicated-thread-queue):
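A sketch of the no-dedicated-thread approach (names assumed): a thread-pool delegate is queued only when there is work, and it releases the pool thread once the queue drains.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class NoDedicatedThreadQueue
{
    private readonly Queue<Action> _jobs = new Queue<Action>();
    private bool _delegateQueuedOrRunning;

    public void Enqueue(Action job)
    {
        lock (_jobs)
        {
            _jobs.Enqueue(job);
            // Schedule a drainer only if none is queued or running.
            if (!_delegateQueuedOrRunning)
            {
                _delegateQueuedOrRunning = true;
                ThreadPool.QueueUserWorkItem(ProcessQueuedItems);
            }
        }
    }

    private void ProcessQueuedItems(object state)
    {
        while (true)
        {
            Action job;
            lock (_jobs)
            {
                if (_jobs.Count == 0)
                {
                    _delegateQueuedOrRunning = false;
                    return; // nothing left; give the pool thread back
                }
                job = _jobs.Dequeue();
            }
            job.Invoke();
        }
    }
}
```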

Reactive Extensions (Rx):
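A sketch using the System.Reactive NuGet package (names assumed): a Subject<Action> is the producer side, and jobs are observed one at a time on a single EventLoopScheduler thread.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class RxQueue : IDisposable
{
    private readonly Subject<Action> _jobs = new Subject<Action>();
    private readonly IDisposable _subscription;

    public RxQueue()
    {
        _subscription = _jobs
            .ObserveOn(new EventLoopScheduler()) // one dedicated worker thread
            .Subscribe(job => job.Invoke());
    }

    public void Enqueue(Action job) => _jobs.OnNext(job);

    public void Dispose()
    {
        _jobs.OnCompleted();
        _subscription.Dispose();
    }
}
```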

System.Threading.Channels Queue:
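A sketch of the Channels version (names assumed), incorporating the SingleReader and TryRead corrections credited below: one reader task drains an unbounded channel.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class ChannelsQueue
{
    private readonly ChannelWriter<Action> _writer;

    public ChannelsQueue()
    {
        var channel = Channel.CreateUnbounded<Action>(
            new UnboundedChannelOptions { SingleReader = true });
        var reader = channel.Reader;
        _writer = channel.Writer;

        Task.Run(async () =>
        {
            // WaitToReadAsync completes with false once the writer is completed.
            while (await reader.WaitToReadAsync())
            {
                while (reader.TryRead(out var job))
                {
                    job.Invoke();
                }
            }
        });
    }

    public void Enqueue(Action job) => _writer.TryWrite(job);

    public void Stop() => _writer.Complete();
}
```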

TPL Dataflow Queue:
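A sketch using the System.Threading.Tasks.Dataflow package (names assumed): an ActionBlock is itself a job queue, here constrained to a single worker.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public class TPLDataflowQueue
{
    private readonly ActionBlock<Action> _jobs;

    public TPLDataflowQueue()
    {
        // MaxDegreeOfParallelism = 1 makes this a single-consumer queue.
        _jobs = new ActionBlock<Action>(job => job.Invoke(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    }

    public void Enqueue(Action job) => _jobs.Post(job);

    public void Stop() => _jobs.Complete();
}
```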

First Benchmark: Time to getting a single job done

The first thing I want to measure is initializing the Job Queue, enqueuing one job, waiting for it to finish, and completing the queue. It’s easy to do with the following code:
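A sketch of what that benchmark can look like with BenchmarkDotNet (the IJobQueue interface and method names are my assumptions, not necessarily the original code):

```csharp
using System;
using System.Threading;
using BenchmarkDotNet.Attributes;

// Assumed common interface over the five implementations.
public interface IJobQueue
{
    void Enqueue(Action job);
    void Stop();
}

public class SingleJobBenchmarks
{
    // One [Benchmark] method per implementation, e.g.:
    // [Benchmark] public void BlockingCollectionQueue() => DoOneJob(new BlockingCollectionQueue());

    public static void DoOneJob(IJobQueue jobQueue)
    {
        var autoResetEvent = new AutoResetEvent(false);
        jobQueue.Enqueue(() => autoResetEvent.Set());
        autoResetEvent.WaitOne(); // block until the single job has run
        jobQueue.Stop();
    }
}
```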

For all Benchmarks, I use the excellent BenchmarkDotNet library. My PC is: Intel Core i7-7700HQ CPU 2.80GHz (Kaby Lake), 1 CPU, 8 logical and 4 physical cores. The host is .NET Framework 4.7.2 (CLR 4.0.30319.42000), 32bit LegacyJIT-v4.8.3745.0.

The last method, DoOneJob, is the interesting one. I use an AutoResetEvent to signal that the job was done and to stop the job queue.

The results are:

                 Method |       Mean |     Error |    StdDev |
----------------------- |-----------:|----------:|----------:|
BlockingCollectionQueue | 215.295 us | 4.1643 us | 5.4148 us |
 NoDedicatedThreadQueue |   7.536 us | 0.1458 us | 0.1432 us |
                RxQueue | 204.700 us | 4.0370 us | 5.6594 us |
          ChannelsQueue |  18.655 us | 2.0949 us | 1.8571 us |
       TPLDataflowQueue |  18.773 us | 0.4318 us | 1.2730 us |
The measuring unit ‘us’ stands for microseconds. 1000 us = 1 millisecond
Thanks to Azik and rendlelabs for correcting my System.Threading.Channels implementation.

As you can see, NoDedicatedThreadQueue is the fastest, which is no wonder because it does the bare minimum.

The second and third fastest are ChannelsQueue and TPLDataflowQueue, about 11 times faster than the remaining implementations.

The most important thing to note here is that creating new Job Queues usually happens rarely, maybe once in an application lifespan, so 200 microseconds (1/5 of one millisecond) is not much.

Second Benchmark: Getting 100,000 jobs done

Initialization can happen only once, so the real test is to see if there’s any substantial difference when dealing with high-frequency jobs.

Testing this benchmark can be done in a similar manner as before with the following code:
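A sketch of that benchmark (again with assumed names, including the IJobQueue interface): enqueue 100,000 empty jobs and let the last one signal completion, so the measurement covers the whole batch.

```csharp
using System;
using System.Threading;
using BenchmarkDotNet.Attributes;

// Assumed common interface (repeated here for completeness).
public interface IJobQueue
{
    void Enqueue(Action job);
    void Stop();
}

public class ManyJobsBenchmarks
{
    private const int JobsCount = 100_000;

    // One [Benchmark] method per implementation, e.g.:
    // [Benchmark] public void ChannelsQueue() => DoManyJobs(new ChannelsQueue());

    public static void DoManyJobs(IJobQueue jobQueue)
    {
        var autoResetEvent = new AutoResetEvent(false);
        for (int i = 0; i < JobsCount - 1; i++)
        {
            jobQueue.Enqueue(() => { });
        }
        jobQueue.Enqueue(() => autoResetEvent.Set()); // the last job signals done
        autoResetEvent.WaitOne();
        jobQueue.Stop();
    }
}
```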

The results for 100,000 jobs were:

                 Method |      Mean |     Error |    StdDev |
----------------------- |----------:|----------:|----------:|
BlockingCollectionQueue | 23.045 ms | 0.5046 ms | 0.4473 ms |
 NoDedicatedThreadQueue |  7.770 ms | 0.1553 ms | 0.1964 ms |
                RxQueue | 10.478 ms | 0.2053 ms | 0.3430 ms |
          ChannelsQueue |  5.661 ms | 0.9099 ms | 2.6687 ms |
       TPLDataflowQueue |  6.924 ms | 0.1334 ms | 0.1310 ms |

System.Threading.Channels is in first place with 5.7 milliseconds. TPL Dataflow is (surprisingly) in second place with 6.9 milliseconds, beating No-Dedicated-Thread-Queue (7.8 ms) by about 11%.

BlockingCollection is the slowest with 23 milliseconds, about 4 times slower than Channels.

In many cases, these performance differences will not matter because the Job Queue time will be negligible in comparison to the job execution time. However, this can be important when you’re dealing with high-frequency short execution jobs.

Showdown Summary

Summing things up from the benchmarks, here’s a visualization:

The fastest overall implementations turned out to be System.Threading.Channels, no-dedicated-thread-queue, and TPL Dataflow.

Performance is not always the most important factor, though. Perhaps more important than speed is whether an implementation natively supports (with relative ease) the customizations you might want for your specific application. Here are some common Job Queue variations:

  • Handling jobs in multiple threads, instead of just one thread
  • Prioritizing jobs
  • Having different handlers for different types of job (publisher/subscriber)
  • Limiting Job Queue capacity (Bound capacity)
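As an example of the first variation, multi-threaded handling is just a configuration option in some implementations. A sketch with TPL Dataflow (the class name and shape are mine, not from the original posts):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public class ParallelJobQueue
{
    private readonly ActionBlock<Action> _jobs;

    public ParallelJobQueue(int maxParallelism)
    {
        // The only change from the single-threaded version is this option:
        // MaxDegreeOfParallelism controls how many jobs run concurrently.
        _jobs = new ActionBlock<Action>(job => job.Invoke(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
    }

    public void Enqueue(Action job) => _jobs.Post(job);

    public void Stop() => _jobs.Complete();
}
```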

No single implementation supports every customization. Not with reasonable effort, anyway. That’s why choosing an implementation will always have to be done according to your needs. Here’s a summary of which supports what:

Producer consumer customization table
  • * Priority Queue is possible by combining with BlockingCollection or by having a finite number of priority levels.
  • ** Publisher/Subscriber is possible by adding a casting wrapper around each Job.

To see how I constructed this table, you can read the original articles (Part 1, Part 2, and Part 3).

As you can see, there’s no clear winner when it comes to customization. So the decision on which producer/consumer implementation to choose is always “It depends”.

This is it for my Job Queue series, hope you enjoyed it. Any feedback in the comments section is welcome. I’ll probably write similar posts with other patterns like the Pipeline pattern in the near future, so stay tuned. Cheers.



7 thoughts on “Performance Showdown of Producer/Consumer (Job Queues) Implementations in C# .NET”

  1. There are multiple issues with your benchmark but the most important one is that the TPL Dataflow’s ActionBlock isn’t faster than Threading.Channels, it’s your test that is wrong:

    The ChannelsQueue’s code:

    1. public async void Enqueue(Action job). Why did you put async in there? If you simply remove it, it will double performance.

    2. You should be doing this:
    while (_reader.TryRead(out var job))
    instead of var job = await reader.ReadAsync();

    3. There is no need to use the LongRunning option. In fact, it adds a 500ms delay and affects your metrics because the IJobQueue implementation’s creation is inside your benchmark method. It should be created just once to allow the warmup to actually warm up.

    4. You need to pass new UnboundedChannelOptions() { SingleReader = true } to your channel creation, as this is what TPL Dataflow does with MaxDegreeOfParallelism set to 1 by default.

    5. Your test won’t work with MaxDegreeOfParallelism greater than 1 because you rely on processing order. You should be using a CountdownEvent, or simply inc/dec a value and poll it, to make it work.

    6. Why are you using the 32-bit LegacyJIT? Why not run the tests on different versions at least?

    Hope this helps.

  2. And in your “OneJob” test you aren’t measuring latency but object creation time, hence the thread-pool-based ones take nothing compared to the ones that need to create a thread (~200us overhead in your case)

    Rx is genuinely slow though. 🙂 You just need to add Buffer with some big number to ensure it actually queues messages rather than simply passing them through for fairness, but then it will be even slower.

  3. Maybe you can also add Hangfire to this test?
    I wanted to do so, but I’m struggling with it; it gets stuck at “OverheadJitting”

Comments are closed.