In Part 1 and Part 2 we went over what Job Queues are, why they are so important, and how to implement them in several ways. Some of those implementations used thread pools, BlockingCollection, Reactive Extensions, and System.Threading.Channels.

Job Queues are also referred to as the producer-consumer problem. We’ll be adding jobs to the queue (producing) and handling them (consuming) in First-In-First-Out (FIFO) order, with some variations.

Let’s talk about those variations for a moment. Software development is versatile (thank god), otherwise there wouldn’t be so many of us. Each project is different and requires customization. Some common Job Queue variations might be:

  • Prioritizing jobs
  • Having different handlers for different types of jobs (publisher/subscriber)
  • Handling jobs in multiple threads
  • Limiting Job Queue capacity
  • Storing the queue in an external system like Kafka or RabbitMQ.

In this article, we’ll see how to implement Job Queues with TPL Dataflow, including implementations of several of these variations. Along the way we’ll dive into the Dataflow mindset and figure out this awesome library.

And we’re also going to talk about the important concepts of failure handling and the poison queue. This is the part where we decide and implement what’s going to happen when our jobs fail for some reason.

Starting with TPL Dataflow

The TPL Dataflow library lets you create components that communicate with each other. Each component (Block) can send and receive messages from other components. You can control how these messages are sent, including parallelism levels and bounding capacity. Each component has a mind of its own, so TPL Dataflow behaves like an actor-based programming model.
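
For a first taste, here’s a tiny sketch (inside an async method, using the System.Threading.Tasks.Dataflow package introduced in the next section) of two linked blocks passing messages to each other:

// Two simple components: a TransformBlock produces messages, an ActionBlock consumes them
var multiplyBlock = new TransformBlock<int, int>(x => x * 2);
var printBlock = new ActionBlock<int>(x => Console.WriteLine(x));

// Messages flow from one block to the other through the link
multiplyBlock.LinkTo(printBlock, new DataflowLinkOptions() { PropagateCompletion = true });

multiplyBlock.Post(1);
multiplyBlock.Post(2);

multiplyBlock.Complete();
await printBlock.Completion; // prints 2 and 4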

Simple Job Queue

Let’s start with a simple example. A simple job queue where you can Enqueue jobs and handle them in a single thread. For simplicity’s sake, our jobs will be strings and our handler will write them to Console.

To start with TPL Dataflow, you will need to add the System.Threading.Tasks.Dataflow NuGet package. Once that’s done, here is our first Job Queue implementation:

public class TPLDataflowQueue
{
    private ActionBlock<string> _jobs;

    public TPLDataflowQueue()
    {
        _jobs = new ActionBlock<string>((job) =>
        {
            Console.WriteLine(job);
        });
    }

    public void Enqueue(string job)
    {
        _jobs.Post(job);
    }
}

As you can see, this is as simple as it gets. In fact, the little wrapper class can be removed entirely. A single ActionBlock naturally acts like a full-blown Job Queue that handles jobs one at a time.

An ActionBlock is one kind of Block in TPL Dataflow. It acts as an ITargetBlock, so you can send messages to it, but not as an ISourceBlock, so it can’t propagate messages to other blocks. For each data element it receives, it invokes the delegate you give it.
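
To make that concrete, here’s a minimal sketch (inside an async method, with made-up job names) that posts a few messages to an ActionBlock and then waits for it to drain:

var block = new ActionBlock<string>(job => Console.WriteLine($"handling {job}"));

// ActionBlock is an ITargetBlock<string>, so we can post messages into it...
block.Post("job 1");
block.Post("job 2");

// ...but it's not an ISourceBlock, so there's nothing to LinkTo from it.
// Signal that no more messages are coming and wait for queued jobs to finish.
block.Complete();
await block.Completion;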

Multiple dedicated Threads

By default, ActionBlock will execute on a single thread. For performance reasons, however, you might want several threads handling jobs. With TPL Dataflow, that takes a single line of code:

public class TPLDataflowMultipleHandlers
{
    private ActionBlock<string> _jobs;

    public TPLDataflowMultipleHandlers()
    {
        var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 2, 
        };

        _jobs = new ActionBlock<string>((job) =>
        {
            Thread.Sleep(10);
            // following is just for example's sake
            Console.WriteLine(
                $"job:{job}, thread: {Thread.CurrentThread.ManagedThreadId}");
        }, executionDataflowBlockOptions);
    }

    public void Enqueue(string job)
    {
        _jobs.Post(job);
    }
}

Usage:

var q = new TPLDataflowMultipleHandlers();
var numbers = Enumerable.Range(1, 10);
foreach (var num in numbers)
{
    q.Enqueue(num.ToString());
}

The output will be something like (exact thread IDs and ordering may vary):

job:1, thread: 3
job:2, thread: 5
job:3, thread: 3
job:4, thread: 5
job:5, thread: 3
job:6, thread: 5
job:7, thread: 3
job:8, thread: 5
job:9, thread: 3
job:10, thread: 5

As you can see, by changing the MaxDegreeOfParallelism property there are now 2 threads handling jobs.

Multiple Handlers for the same Job

Sometimes, we might want several handlers for the same job. For example, when implementing a logger that both prints messages to the Console and logs them to a file. In this case, each job needs to be posted to two different handlers. For that, we can use TPL Dataflow’s BroadcastBlock, which sends every message it receives to all the blocks it is linked to. Here’s the code:

public class TPLDataflowBroadcast
{
    private BroadcastBlock<string> _jobs;

    public TPLDataflowBroadcast()
    {
        // The delegate 'job => job' lets you transform the job, like Select in LINQ
        _jobs = new BroadcastBlock<string>(job => job);

        var act1 = new ActionBlock<string>((job) =>
        {
            Console.WriteLine(job);
        });
        var act2 = new ActionBlock<string>((job) =>
        {
            LogToFile(job);
        });
        _jobs.LinkTo(act1);
        _jobs.LinkTo(act2);
    }

    private void LogToFile(string job)
    {
        //...
    }

    public void Enqueue(string job)
    {
        _jobs.Post(job);
    }
}

Each time a job is added to the BroadcastBlock, it is passed on to both ActionBlocks, which run their respective delegates – write to console and log to file.

Now that we are using another type of Block, I’m guessing you are starting to see the awesome power of Dataflow. Keep reading and you’ll become a believer.

Thread behavior in TPL Dataflow ActionBlock

In Dataflow, each ActionBlock handler is executed on at least 1 thread. One thread is the default, but by changing the MaxDegreeOfParallelism property it can be more than one. For example:

var act = new ActionBlock<T>(job => {/*..*/ }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 });

The maximum number of threads handling jobs is the sum of the MaxDegreeOfParallelism values of all the ActionBlocks in play. In the last implementation we had 2 ActionBlocks, each with the default MaxDegreeOfParallelism of 1, so the Job Queue uses 2 thread-pool threads. If each of those ActionBlocks had a MaxDegreeOfParallelism of 3, the Job Queue would use up to 6 threads.

The thread behavior of ActionBlock relies on the default task scheduler, which simply uses thread-pool threads. You can change this behavior by implementing your own custom TaskScheduler (the ActionBlock constructor accepts it in the options). For example, you might want all jobs to execute on a dedicated thread in FIFO order. Sounds familiar? You got it – you’d need to implement a Job Queue for that.
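
As a rough sketch of that idea (not a full dedicated-thread scheduler, just a demonstration of where the scheduler plugs in), you can pass the framework’s ConcurrentExclusiveSchedulerPair through ExecutionDataflowBlockOptions so that handler tasks are serialized on a single scheduler:

public class SingleSchedulerQueue
{
    private readonly ActionBlock<string> _jobs;

    public SingleSchedulerQueue()
    {
        // The ExclusiveScheduler runs at most one task at a time,
        // so handlers execute one after another in FIFO order
        // (still on thread-pool threads, not a dedicated thread).
        var schedulerPair = new ConcurrentExclusiveSchedulerPair();

        _jobs = new ActionBlock<string>(
            job => Console.WriteLine(job),
            new ExecutionDataflowBlockOptions
            {
                TaskScheduler = schedulerPair.ExclusiveScheduler
            });
    }

    public void Enqueue(string job) => _jobs.Post(job);
}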

Different Handlers for different Job Types (publisher/subscriber)

A common pattern is for handlers to subscribe to a specific type of job. For example, in an emergency 911 call center, we might want criminal calls handled by the police department and fire calls handled by the fire department. Here’s the implementation:

public interface IJob
{
}

public class TPLDataflowSubscribers
{
    private BroadcastBlock<IJob> _jobs;

    public TPLDataflowSubscribers()
    {
        _jobs = new BroadcastBlock<IJob>(job => job);
    }

    public void RegisterHandler<T>(Action<T> handleAction) where T : IJob
    {
        // We have to have a wrapper to work with IJob instead of T
        Action<IJob> actionWrapper = (job) => handleAction((T)job);

        // create the action block that executes the handler wrapper
        var actionBlock = new ActionBlock<IJob>((job) => actionWrapper(job));

        // Link with a predicate - only if a job is of type T
        _jobs.LinkTo(actionBlock, predicate: (job) => job is T);
    }

    public async Task Enqueue(IJob job)
    {
        await _jobs.SendAsync(job);
    }
}

Usage example:

class CriminalCall : IJob
{
    //...
}
class FireCall : IJob
{
    //...
}

public async Task Start()
{
    var q = new TPLDataflowSubscribers();

    q.RegisterHandler<CriminalCall>(j => SendToPolice(j));
    q.RegisterHandler<FireCall>(j => SendToFireDpt(j));

    await q.Enqueue(new CriminalCall());
    await q.Enqueue(new CriminalCall());
    await q.Enqueue(new FireCall());
    await q.Enqueue(new CriminalCall());
    await q.Enqueue(new FireCall());
}

As you can see, I had to use a wrapper around the handler action (in a similar manner to the Rx pub/sub implementation in Part 2). Other than that, I’d say the TPL Dataflow solution is pretty elegant.

The BroadcastBlock used in this solution sends messages to all linked blocks. This means you can have several handlers for one type of job, all of which will execute. If I were to change BroadcastBlock to BufferBlock, only one handler would execute for each job.
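
For illustration, here’s a small sketch of that BufferBlock variant (the class and handler names are made up). Note the BoundedCapacity of 1 on each ActionBlock; without it, the first linked block would greedily buffer every message and the second would never get any:

public class TPLDataflowLoadBalancer
{
    private BufferBlock<string> _jobs = new BufferBlock<string>();

    public TPLDataflowLoadBalancer()
    {
        // Each handler accepts only one message at a time, so the BufferBlock
        // offers waiting jobs to whichever linked handler is free.
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };

        var handler1 = new ActionBlock<string>(job => Console.WriteLine($"handler1: {job}"), options);
        var handler2 = new ActionBlock<string>(job => Console.WriteLine($"handler2: {job}"), options);

        // Unlike BroadcastBlock, each job goes to exactly one of the linked handlers
        _jobs.LinkTo(handler1);
        _jobs.LinkTo(handler2);
    }

    public void Enqueue(string job)
    {
        _jobs.Post(job);
    }
}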

TPL Dataflow completion

For simplicity, up to now I avoided completing the Dataflow Blocks. A good practice is to call .Complete() on all your Blocks once you are done with the Job Queue (e.g. actionBlock.Complete()). Completing a block means it will no longer accept or produce messages.

One easy way to do that is to mark all your links as propagating completion:

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions() {PropagateCompletion = true});

This way, when you .Complete() on the sourceBlock, the targetBlock is also going to complete.
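
For example, a minimal sketch (inside an async method, with made-up block names) of shutting down such a pipeline might look like this:

var source = new BufferBlock<string>();
var target = new ActionBlock<string>(job => Console.WriteLine(job));
source.LinkTo(target, new DataflowLinkOptions() { PropagateCompletion = true });

source.Post("last job");
source.Complete();        // the source stops accepting new messages
await target.Completion;  // completes once all already-queued jobs were handled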

If your Job Queue is going to be active throughout the entire lifetime of your application, then don’t worry about completion.

Priority Queue with TPL Dataflow

Sometimes, you will want to prioritize your jobs. For example, in our call center, medical calls should always be treated first, then criminal calls, and fire calls last. TPL Dataflow is not very well suited for priority queues, but it can be done.

If you have a fixed number of priority levels, then you can implement it relatively easily with multiple BufferBlock instances.

  1. In this GitHub project TPLDataflowHelpers_PriorityBufferBlock there’s an implementation of such a block.
  2. svick offered a similar solution here.

If you want an unlimited number of priorities, then my suggestion is to use BlockingCollection. It can accept an IProducerConsumerCollection object, which can implement a priority queue any way you want. Here’s the implementation:

public class TPLDataflowPriority
{
    private ActionBlock<string> _actionBlock;
    private BlockingCollection<string> _jobs;

    public TPLDataflowPriority()
    {
        _actionBlock = new ActionBlock<string>(
            (job) => Console.WriteLine(job),
            // BoundedCapacity must be 1
            new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

        _jobs = new BlockingCollection<string>(GetPriorityQueue());

        Task.Run(async () =>
        {
            foreach (var job in _jobs.GetConsumingEnumerable())
            {
                await _actionBlock.SendAsync(job);
            }
        });
    }

    private IProducerConsumerCollection<string> GetPriorityQueue()
    {
        // your priority queue here (see the sketch below)
        throw new NotImplementedException();
    }

    public void Enqueue(string job)
    {
        _jobs.Add(job);
    }
}

Note that BoundedCapacity has to be 1. Otherwise, the jobs will immediately move from the BlockingCollection to the ActionBlock, disabling any prioritization.
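
To make GetPriorityQueue() concrete, here’s a minimal sketch (not production-grade, class name made up) of an IProducerConsumerCollection<string> that BlockingCollection can wrap. The priority rule is an arbitrary assumption for this example: jobs starting with "!" are taken before everything else.

// requires System, System.Collections, System.Collections.Generic,
// System.Collections.Concurrent and System.Linq
public class SimplePriorityCollection : IProducerConsumerCollection<string>
{
    private readonly object _lock = new object();
    private readonly Queue<string> _high = new Queue<string>();
    private readonly Queue<string> _normal = new Queue<string>();

    public bool TryAdd(string item)
    {
        lock (_lock)
        {
            if (item.StartsWith("!")) _high.Enqueue(item);
            else _normal.Enqueue(item);
            return true;
        }
    }

    public bool TryTake(out string item)
    {
        lock (_lock)
        {
            if (_high.Count > 0) { item = _high.Dequeue(); return true; }
            if (_normal.Count > 0) { item = _normal.Dequeue(); return true; }
            item = null;
            return false;
        }
    }

    public int Count { get { lock (_lock) return _high.Count + _normal.Count; } }
    public bool IsSynchronized => true;
    public object SyncRoot => _lock;

    public string[] ToArray() { lock (_lock) return _high.Concat(_normal).ToArray(); }
    public void CopyTo(string[] array, int index) => ToArray().CopyTo(array, index);
    public void CopyTo(Array array, int index) => ToArray().CopyTo(array, index);
    public IEnumerator<string> GetEnumerator() => ((IEnumerable<string>)ToArray()).GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

A real implementation would probably derive the priority from a richer job type rather than a string prefix, but the shape of the collection stays the same.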

You might be asking why even use TPL Dataflow instead of the BlockingCollection solutions (shown in Part 1). Well, you can combine the prioritization with other Dataflow features like controlling the parallelism level and publisher/subscriber patterns.

Verdict on TPL Dataflow

I honestly think Dataflow is one of the best solutions presented in this series. It combines the beauty of Reactive Extensions (Rx) with the intuitive manner of the other approaches. It’s also one of the most versatile solutions of everything we’ve seen so far. In fact, it innately supports most of the common Job Queue customizations presented in this series.

My only concern is the overhead of adding this library and its performance implications. I suspect Dataflow will be slower than the other solutions, but suspicion only goes so far. I plan to write one more special part to this series where I’ll benchmark all the Job Queue implementations and we’ll see the real performance results (and possibly memory footprint).

Failure handling and the Poison Queue

Let’s consider a scenario where a job handler fails with an exception. Depending on your specific needs, you can go about it in several ways:

  • Retry
  • Do nothing (move to the next message)
  • Return to the queue to handle later

And you can probably guess that issues can arise from each of those choices.

  1. If you decided to retry, what happens if the retry fails indefinitely? The entire queue becomes stuck.
  2. If you decided to move the job back to the queue, to a later position, then after multiple failing jobs you can get to the point where the queue is handling only failing jobs. Again, the queue becomes stuck.
  3. If the job is part of a Transaction, then you might want to Abort the entire transaction. That means that each job will have to have some kind of AbortTransaction() method.
  4. If the job is part of a Batch of jobs, you might want to have an AbortBatch() method. In that case, you might want to handle the entire batch again.

All these problems lead to the concept of poison messages (aka poison jobs). The idea is that, according to some rules, you can mark messages as poison messages and move them to a separate dedicated queue called the poison queue. Here are some example policies for what to do when a job fails:

  • Move the job to the poison queue
  • Retry 3 times and, if it still fails, move it to the poison queue
  • Move it back to the queue, to the 100th position, and increase a retry counter. When the retry counter reaches 5, move it to the poison queue.
  • Move it back to the queue to be executed after 30 minutes. After 3 retries, discard it entirely (without a poison queue). This requires a sophisticated queue that can produce jobs after a given time.

Since this post is dedicated to TPL Dataflow, let’s see an example with it. The native exception handling in TPL Dataflow is not well suited for our needs, because once an exception happens, the Block moves to a Faulted state. When faulted, a block can no longer receive messages and our Job Queue is effectively dead. Our goal instead is to move the failing job to the poison queue.
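
To see that faulting behavior in isolation, here’s a small sketch (inside an async method, with made-up job names) showing that after an unhandled exception the block rejects further messages and its Completion task faults:

var block = new ActionBlock<string>(job =>
{
    throw new InvalidOperationException($"failed to handle {job}");
});

block.Post("job 1");      // accepted, but the handler throws and faults the block
await Task.Delay(100);    // give the block a moment to process and fault

Console.WriteLine(block.Post("job 2")); // False - a faulted block rejects new messages

try
{
    await block.Completion; // throws the exception that faulted the block
}
catch (Exception e)
{
    Console.WriteLine($"Queue is dead: {e.Message}");
}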

We can make use of the Polly NuGet package to help enforce our rules. It’s a very popular fault-handling library that helps create retry mechanisms, fallbacks, and the like.

Let’s see an example of how to implement the rule (aka policy) “On failure, retry 3 times and move to the poison queue if all retries failed”:

public interface IJobQueue<T>
{
    void Enqueue(T job);
}

class MyPoisonQueue : IJobQueue<string>
{
    public void Enqueue(string str)
    {
        // do something
    }
}

public class TPLDataflowWithErrorHandling : IJobQueue<string>
{
    private ActionBlock<string> _jobs;

    public TPLDataflowWithErrorHandling(IJobQueue<string> poisonQueue)
    {
        var policy = 
            Policy.Handle<Exception>() // on any exception
            .Retry(3); // retry 3 times

        _jobs = new ActionBlock<string>((job) =>
        {
            try
            {
                policy.Execute(() =>
                {
                    var customer = GetCustomerById(job); // possibly throws exception
                    Console.WriteLine(customer.Name);
                });
            }
            catch (Exception e)
            {
                // If the policy failed (after 3 retries), move the job to the poison queue
                poisonQueue.Enqueue(job);
            }
        });
    }

    public void Enqueue(string job)
    {
        _jobs.Post(job);
    }
}

This is a regular Job Queue that executes jobs on a single thread, except that it uses Polly to retry each job 3 times if it fails. So if GetCustomerById() throws an exception, it will be executed up to three more times. If it fails on all retries, we add the job to the poison queue.

This raises the question of what to do with the jobs in the poison queue. Some options are:

  • Debug each one to find out what the problem is
  • Monitor how many jobs are added to the poison queue
  • Save them in some backup location until the end of time

Hangfire

A lot of people commented about Hangfire as a solution for Job Queues, so I feel obligated to mention it.

Although I never used it myself, I heard good things both from the comments and from colleagues.

Hangfire implements a Job Queue for you, and it includes a ton of functionality which you’ll be happy is already developed. Some of that functionality includes:

  • Job persistence in some kind of storage (they really support a lot of databases)
  • Single process or Multiple processes
  • Batch Jobs
  • Automatic retries
  • Job Continuations
  • Monitoring UI

Some of that stuff, like persistence in a database, would require a ton of work if you wanted to implement it yourself.

It’s an open-source project with a free plan for the core functionality and a paid plan for more advanced features.

Summary

This is the 3rd and final part of the series, but as I mentioned, there will be another special part where I compare all the implementations. I’ll definitely do performance benchmarks, compare customization options, and maybe memory footprint as well, so stay tuned.

I hope you liked the series, I really enjoyed writing it. It’s interesting how C# has so many different libraries and options to implement Job Queues. It’s pretty fun to work in a programming space where you always have new things to discover, whether it’s language features or awesome libraries.

I can’t really choose which implementation I like best. They’re all great for different purposes. Besides, it’s like choosing a favorite child. If you have a clear winner in mind though, do share it in the comments section. Happy coding.