In this 3rd part of the series, we’ll continue our journey to explore multi-threaded pipeline pattern implementations in C#. Let’s do a quick recap of the previous parts:

  • In Part 1 we defined the pipeline pattern and talked of the different implementation types. Then, we saw some hardcore C# trickery to create a very nice pipeline builder with BlockingCollection.

  • In Part 2 I showed how to create similar pipeline implementations with the excellent TPL Dataflow library. It turned out that the TPL Dataflow implementation was much cleaner and easier.

  • In both parts we were able to achieve all our goals, which were:

    • Create the pipeline with a nice functional syntax.
    • Add steps with lambda functions.
    • Use custom input and output types so that the output of each step is the input of the next without the need for casting.
    • Execute each step on a dedicated thread
    • Control the degree of parallelism for each step (number of threads).
    • Set max bound capacity for each step.
    • Be able to await the result of an item entered into the pipeline.

    The usage turned out to be something like this:

var pipeline = new MyPipeline<string, bool>()
    .AddStep<string, string>(sentence => FindMostCommon(sentence))
    .AddStep<string, int>(word => word.Length)
    .AddStep<int, bool>(length => length % 2 == 1)
    .CreatePipeline();

// usage:
string input = "The pipeline pattern is the best pattern";
bool result = await pipeline.Execute(input); 
// Returns 'True' because 'pattern' is the most common, 
// it has 7 characters and it's an odd number

This follows the pipeline example from the previous parts, which was:

  1. Take a string sentence
  2. Find out the most common word
  3. Count number of characters in the word
  4. Return “True” if the number is odd and “False” if it’s even

In This Part

In this article, we’re going to do a couple of things.

I got several comments about adding support for asynchronous steps. Basically, we’d like to have the ability to write something like:

pipeline.AddStep(async (input) => await DoSomethingAsync(input));

// or 
pipeline.AddStep(DoSomethingAsync);
// ...
public async Task<int> DoSomethingAsync(string input)
{
  // ...
}

This is a pretty important request. Even though our pipeline has dedicated thread(s) for each step, we don’t really want to “waste” a thread when there’s a long I/O-bound operation. So the first thing we’re going to see is creating a pipeline implementation that supports async steps.

The second thing we’ll see in this article is another pipeline implementation with the Disruptor-net library. Olivier Coanet told me about Disruptor and even created a pull request with a fully working implementation (he is one of the top maintainers of Disruptor-net ). So kudos to you Olivier, and we’ll see the full power of the Disruptor library.

Adding Asynchronous Steps

Let’s start with one of the cleanest pipeline implementations we saw so far with TPL Dataflow:

public class TPLDataflowSteppedSimple<TIn, TOut>
{
    private List<IDataflowBlock> _steps = new List<IDataflowBlock>();
    public void AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)
    {
        var step = new TransformBlock<TLocalIn, TLocalOut>((input) => 
            stepFunc(input));
        if (_steps.Count > 0)
        {
            var lastStep = _steps.Last();
            var targetBlock = (lastStep as ISourceBlock<TLocalIn>);
            targetBlock.LinkTo(step, new DataflowLinkOptions());
        }
        _steps.Add(step);
    }

    public void CreatePipeline(Action<TOut> resultCallback)
    {
        var callBackStep = new ActionBlock<TOut>(resultCallback);
        var lastStep = _steps.Last();
        var targetBlock = (lastStep as ISourceBlock<TOut>);
        targetBlock.LinkTo(callBackStep);
    }

    public void Execute(TIn input)
    {
        var firstStep = _steps[0] as ITargetBlock<TIn>;
        firstStep.SendAsync(input);
    }
}

Usage:

var pipeline = new TPLDataflowSteppedSimple<string, bool>();
pipeline.AddStep<string, string>(input => FindMostCommon(input));
pipeline.AddStep<string, int>(input => CountChars(input));
pipeline.AddStep<int, bool>(input => IsOdd(input));
pipeline.CreatePipeline(resultCallback:res => Console.WriteLine(res));

pipeline.Execute("The pipeline pattern is the best pattern");

This is actually a new variation of the previous implementations , but it’s perfect for what’s coming next.

What we want here is the ability to add asynchronous steps in the following manner:

pipeline.AddStepAsync<string, string>(async input => await SomethingAsync(input));

The thing is that AddStep and AddStepAsync will have to have different signatures:

public void AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)

public void AddStepAsync<TLocalIn, TLocalOut>(Func<TLocalIn, Task<TLocalOut>> stepFunc)

This forces several changes to our initial implementation:

  1. The simple step var step = new TransformBlock<TLocalIn, TLocalOut>(stepFunc) is not suited for asynchronous steps. It should be something like this:
var step = new TransformBlock<TLocalIn, Task<TLocalOut>>(async (input) => await stepFunc(input));
  1. If the previous step was also an async-step, then we should be actually doing something like this:
var step = new TransformBlock<Task<TLocalIn>, Task<TLocalOut>>(async (input) => await stepFunc(await input)); 

Note that the TransformBlock now accepts Task<TLocalIn> as input, so we have to write stepFunc(await input).

This leads to the need to mark each step as asynchronous or not and to a bit cumbersome implementation:

public class TPLDataflowSteppedAsyncFinal<TIn, TOut>
{
    private List<(IDataflowBlock Block, bool IsAsync)> _steps = 
        new List<(IDataflowBlock Block, bool IsAsync)>();

    public void AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)
    {
        if (_steps.Count == 0)
        {
            var step = new TransformBlock<TLocalIn, TLocalOut>(stepFunc);
            _steps.Add((step, IsAsync: false));
        }
        else
        {

            var lastStep = _steps.Last();
            if (!lastStep.IsAsync)
            {
                var step = new TransformBlock<TLocalIn, TLocalOut>(stepFunc);
                var targetBlock = (lastStep.Block as ISourceBlock<TLocalIn>);
                targetBlock.LinkTo(step, new DataflowLinkOptions());
                _steps.Add((step, IsAsync: false));
            }
            else
            {
                var step = new TransformBlock<Task<TLocalIn>, TLocalOut>
                    (async (input) => stepFunc(await input));
                var targetBlock = (lastStep.Block as ISourceBlock<Task<TLocalIn>>);
                targetBlock.LinkTo(step, new DataflowLinkOptions());
                _steps.Add((step, IsAsync: false));
            }
        }

    }

    public void AddStepAsync<TLocalIn, TLocalOut>
        (Func<TLocalIn, Task<TLocalOut>> stepFunc)
    {
        if (_steps.Count == 0)
        {
            var step = new TransformBlock<TLocalIn, Task<TLocalOut>>
                (async (input) => await stepFunc(input));
            _steps.Add((step, IsAsync: true));
        }
        else
        {
            var lastStep = _steps.Last();
            if (lastStep.IsAsync)
            {
                var step = new TransformBlock<Task<TLocalIn>, Task<TLocalOut>>
                    (async (input) => await stepFunc(await input));
                var targetBlock = (lastStep.Block as ISourceBlock<Task<TLocalIn>>);
                targetBlock.LinkTo(step, new DataflowLinkOptions());
                _steps.Add((step, IsAsync: true));
            }
            else
            {
                var step = new TransformBlock<TLocalIn, Task<TLocalOut>>
                    (async (input) => await stepFunc(input));
                var targetBlock = (lastStep.Block as ISourceBlock<TLocalIn>);
                targetBlock.LinkTo(step, new DataflowLinkOptions());
                _steps.Add((step, IsAsync: true));
            }
        }
    }

    public async Task CreatePipeline(Action<TOut> resultCallback)
    {
        var lastStep = _steps.Last();
        if (lastStep.IsAsync)
        {
            var targetBlock = (lastStep.Block as ISourceBlock<Task<TOut>>);
            var callBackStep = new ActionBlock<Task<TOut>>
                (async t => resultCallback(await t));
            targetBlock.LinkTo(callBackStep);
        }
        else
        {
            var callBackStep = new ActionBlock<TOut>(t => resultCallback(t));
            var targetBlock = (lastStep.Block as ISourceBlock<TOut>);
            targetBlock.LinkTo(callBackStep);
        }
    }

    public void Execute(TIn input)
    {
        var firstStep = _steps[0].Block as ITargetBlock<TIn>;
        firstStep.SendAsync(input);
    }
}

Usage:

pipeline.AddStepAsync<string, string>(async input => await FindMostCommonAsync(input));
pipeline.AddStep<string, int>(input => CountChars(input));
pipeline.AddStepAsync<int, bool>(async input => await IsOddAsync(input));

pipeline.CreatePipeline(res => Console.WriteLine(res));

pipeline.Execute("The pipeline pattern is the best pattern");

This is a lot of code, but pretty straightforward. Here’s the explanation and some important notes:

  • The _steps collection is now a Tuple that stores both the Block and whether it’s an async block.
  • In both AddStep and AddStepAsync we have a somewhat similar code. We find out which type was the previous step (async or not) and then create a TransformBlock accordingly. This is not pretty, but necessary because of the different castings to ISourceBlock<TLocalIn> and ISourceBlock<Task<TLocalIn>>.
  • We can use overloading and call both AddStep and AddStepAsync as AddStep. This is a better API, but maybe less clear in a tutorial. In fact, the is exactly what TPL Dataflow does with the TharnsformBlock constructor. That’s why we can write both variations of synchronous and asynchronous methods:
// We can write either this (synchronous):
new TransformBlock<TLocalIn, Task<TLocalOut>>((input) =>  stepFunc(input));

// or this: (asynchronous)
new TransformBlock<TLocalIn, Task<TLocalOut>>(async (input) => await stepFuncAsync(input));

Transform block  overloads

Summary on Asynchronous steps

So this is it for async steps in TPL Dataflow. Nothing fancy, just a simple strategy. It’s a good thing TransformBlock supports asynchronous operations, otherwise, we wouldn’t be able to make everything truly asynchronous.

Pipeline implementations with Disruptor-net

Disruptor-net is a very interesting library I recently discovered thanks to Olivier Coanet. It’s originally a port from the Java library LMAX Disruptor .

The general idea is kind of like TPL Dataflow, but not exactly. The library allows you to create Job Queues (consumers). The library then lets you move messages (events) from one Job-Queue consumer to another. You can multi-cast events to multiple consumers. You can set a dependency graph, specify thread-count to each consumer and specify event receive (wait) strategy.

If you think it all sounds kind of like TPL Dataflow, I would agree. TPL Dataflow solves much of the problems Disruptor already solved (and more), but with a friendlier API (in my opinion). There are differences though, especially around performance and event wait strategies. We’ll talk about them later.

There’s an interesting discussion from 2011 where Stephen Toub addresses some of the implementation differences between TPL Dataflow and Disruptor-net.

Before drawing any conclusions, let’s see Disruptor-net in action with the simplest pipeline implementation:

public interface ISimplePipeline
{
    void Execute(object data);
}

public class DisruptorSimple : ISimplePipeline
{
    // Disruptor must have the same event payload to all steps
    // That's why we won't be able to avoid casting
    private class StepPayload
    {
        public object Value { get; set; }
    }

    private class DelegateHandler : IWorkHandler<StepPayload>
    {
        private readonly Func<object, object> _stepFunc;

        public DelegateHandler(Func<object, object> stepFunc)
        {
            _stepFunc = stepFunc;
        }

        public void OnEvent(StepPayload payload)
        {
            // Here the step actually executes
            payload.Value = _stepFunc(payload.Value);
        }
    }


    private Disruptor<StepPayload> _disruptor;
    private List<DelegateHandler> _steps = new List<DelegateHandler>();

    public DisruptorSimple AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)
    {
        // Note that casting is always required
        _steps.Add(new DelegateHandler((obj) => stepFunc((TLocalIn)obj)));
        return this;
    }

    public ISimplePipeline CreatePipeline()
    {
        _disruptor = new Disruptor<StepPayload>(() => new StepPayload(), 1024, TaskScheduler.Default);
        // first step
        var handlerGroup = _disruptor.HandleEventsWithWorkerPool(_steps.First());

        // the other steps
        for (int i = 1; i < _steps.Count; i++)
        {
            var step = _steps[i];
            var stepAsArray = new IWorkHandler<StepPayload>[] {step};
            handlerGroup = handlerGroup.HandleEventsWithWorkerPool(stepAsArray);
        }
        
        _disruptor.Start();
        return this;
    }


    public void Execute(object data)
    {
        var sequence = _disruptor.RingBuffer.Next();//produce new event with factory method
        var disruptorEvent = _disruptor[sequence];//get the new event
        disruptorEvent.Value = data;
        _disruptor.RingBuffer.Publish(sequence);

    }
}

Usage:

var pipeline = new DisruptorSimple()
    .AddStep<string, string>(FindMostCommon)
    .AddStep<string, int>(CountChars)
    .AddStep<int, bool>(IsOdd)
    // This last step is kind of a result callback
    .AddStep<bool, bool>((res) =>
    {
        Console.WriteLine(res);
        return res;
    })
    .CreatePipeline();
pipeline.Execute("The pipeline pattern is the best pattern");

Some explanation of this code:

  • The Disruptor works by sending messages (events) from consumer to consumer. A consumer is a pipeline step in our case. We have to create the event class when creating a disruptor instance. There can be only one such class, so having a different generic payload for each step is out of the question.
  • The IWorkHandler can be specified for each consumer to handle messages when they come in with the OnEvent method. This is when we can execute the pipeline step.
  • In CreatePipeline we create the disruptor instance. We have to give a factory method ()=> new StepPayload() which creates the event instance. This instance is the message that’s passed around between all the steps.
  • 1024 is the ring-buffer-size. It’s the bound capacity of the consumers. It must be in the power of 2.

Some conclusions

As you can see, the Disruptor-net API is decent enough. Not great, but pretty good. I’d say this code is nicer than creating pipelines with BlockingCollection like in Part 1 , but not as nice as using TPL Dataflow.

The biggest drawback is that there’s a single message type for the disruptor, which makes it impossible to have different inputs and outputs for each pipeline step. Well, without casting that is.

The Disruptor has a lot of customizations to affect performance and behavior. For example, you can customize the so-called ring-buffer wait strategy. That is, determine how consumer threads will wait for messages. This strategy determines if locks are going to be in use and can increase performance. Another big issue is that the Disruptor pre-allocates memory for the events. We already talked about the events being of the same type, which makes this possible. This ability, along with the other performance consideration make the Disruptor (allegedly) an extremely low-latency framework. So your response time is really fast, but this might come in exchange for overall performance as mentioned by Stephen Toub .

Disruptor awaitable implementation

The single event type allows making the disruptor implementation to support await pretty simply. Here’s the improved implementation – the differences are marked with “!!!”.

public interface IPipeline<TOut>
{
    Task<TOut> Execute(string data);
}

// !!! The class is now generic since it returns a Task<TOut>
public class DisruptorSimpleAwaitable<TOut> : IPipeline<TOut>
{
    public class StepPayload<TOut>
    {
        public object Value { get; set; }
        // !!! Added TaskCompletionSource
        public TaskCompletionSource<TOut> TaskCompletionSource { get; set; }
    }

    public class DelegateHandler<TOut> : IWorkHandler<StepPayload<TOut>>
    {
        private readonly Func<object, object> _stepFunc;

        public DelegateHandler(Func<object, object> stepFunc)
        {
            _stepFunc = stepFunc;
        }

        public void OnEvent(StepPayload<TOut> payload)
        {
            try
            {
                // !!! If faulted, do nothing in this step
                if (payload.TaskCompletionSource.Task.IsFaulted)
                    return;
                payload.Value = _stepFunc(payload.Value);
            }
            catch (Exception e)
            {
                // If an exception occurred, set Task as faulted with exception
                payload.TaskCompletionSource.SetException(e);
            }
        }
    }

    // !!! Adding a special handler that completes the Task
    class SetResultHandler<TOut> : IWorkHandler<StepPayload<TOut>>
    {
        public void OnEvent(StepPayload<TOut> payload)
        {
            if (payload.TaskCompletionSource.Task.IsFaulted)
                return;
            payload.TaskCompletionSource.SetResult((TOut)payload.Value);
        }
    }

    private Disruptor<StepPayload<TOut>> _disruptor;
    private List<DelegateHandler<TOut>> _steps = new List<DelegateHandler<TOut>>();
    public DisruptorSimpleAwaitable<TOut> AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)
    {
        _steps.Add(new DelegateHandler<TOut>((obj) => stepFunc((TLocalIn)obj)));
        return this;
    }

    public IPipeline<TOut> CreatePipeline()
    {
        _disruptor = new Disruptor<StepPayload<TOut>>(() => new StepPayload<TOut>(), 1024, TaskScheduler.Default);
        var handlerGroup = _disruptor.HandleEventsWithWorkerPool(_steps.First());
        for (int i = 1; i < _steps.Count; i++)
        {
            var step = _steps[i];
            var makeStepToArray = new IWorkHandler<StepPayload<TOut>>[] { step };
            handlerGroup = handlerGroup.HandleEventsWithWorkerPool(makeStepToArray);
        }
        // !!! Adding the additional last step to complete the Task
        var setResultHandler = new SetResultHandler<TOut>();
        var setResultHandlerToArray = new IWorkHandler<StepPayload<TOut>>[] { setResultHandler };
        handlerGroup.HandleEventsWithWorkerPool(setResultHandlerToArray);

        _disruptor.Start();
        return this;
    }


    public Task<TOut> Execute(string data)
    {
        var sequence = _disruptor.RingBuffer.Next();
        var disruptorEvent = _disruptor[sequence];
        disruptorEvent.Value = data;
        var tcs = new TaskCompletionSource<TOut>();
        disruptorEvent.TaskCompletionSource = tcs;

        _disruptor.RingBuffer.Publish(sequence);
        return tcs.Task;

    }
}

Usage:

var pipeline = new DisruptorSimpleAwaitable<bool>()
    .AddStep<string, string>(FindMostCommon)
    .AddStep<string, int>(CountChars)
    .AddStep<int, bool>(IsOdd)
    .CreatePipeline();

Console.WriteLine(
    await pipeline.Execute("The pipeline pattern is the best pattern"));

When to use Disruptor

The Disruptor library should be used for multiple consumers (job queues) that run on different threads and pass messages to each other. So in pretty much all cases, you can use TPL Dataflow instead. I’d say in most cases TPL Dataflow is going to be easier to use and more flexible. Performance-wise, it depends. There are cases when Disruptor offers some advantages.

  • Disruptor has low-latency. But it comes with the price of higher CPU usage and longer startup time.
  • The Disruptor framework does no allocations beyond startup. This can be a huge performance benefit but remains to be checked with benchmarks. Note that the above pipeline implementation does have allocations.
  • Disruptor is very configurable and can be tuned to be entirely lock-free.
  • Disruptor doesn’t seem to have the ability to execute async steps as we showed in the beginning with TPL Dataflow.

So the Disruptor pattern can be a perfect fit for long-living pipelines that require very fast response time. This should be taken with a grain of salt until we do some benchmarking (which we will).

Olivier Coanet has done some interesting performance optimizations, including trying to prevent boxing and unboxing for value type payloads. You can read all about it in his blog post on Medium . Also, his implementation is pretty great with additional abilities and nice abstractions (I didn’t show it because it was more difficult to explain). Thanks, Olivier!

Summary

In this part, we saw how to add asynchronous steps to the pipeline with a TPL Dataflow implementation. This turned out to be pretty straightforward, even if not the prettiest code I ever wrote.

We also saw a new implementation with the Disruptor-net library. It turned out to be a good, working solution, even though it didn’t achieve all our set goals (casting was still required on every step).

All the code is available on GitHub here .

The real test is yet to come – performance benchmarks comparing the different implementations. This is crucial since we often need to use pipelines in the first place to deal with performance challenges. So coming next – maybe one more article with additional implementations and a final performance showdown between all of them.