The Pipeline pattern, also known as the Pipes and Filters design pattern is a powerful tool in programming. The idea is to chain a group of functions in a way that the output of each function is the input the next one. The concept is pretty similar to an assembly line where each step manipulates and prepares the product for the next step.

We might have a pipeline that accepts a string, finds the most common word, counts its number of characters, and checks if that number is Odd. Something like this:

string input = "The pipeline pattern is the best pattern";

var pipeline = CreatePipeline();
Console.Write(pipeline.Execute(input)); 
// Returns 'True' because 'pattern' is the most common, 
// it has 7 characters and it's an odd number

In this article series, we’re going to see different implementations of multi-threaded pipeline in C#. This first part is going to show implementations using BlockingCollections. In the next parts, I’ll show you implementations with TPL Dataflow, Channels, Rx and maybe some 3rd party libraries.

Types of Pipeline Implementations

There are 3 basic types of pipeline implementations:

  1. Synchronous execution – In its most simple form, the pipeline object will synchronously execute all of its steps and return the result. Basically, a regular method call.
  2. Asynchronous execution (producer/consumer) – We might want to have our pipe executions work in some background thread, which we can add jobs to from other threads. This is a subset of the producer/consumer pattern (Job Queues) where each job is a pipeline. This means that the entire pipeline steps will be executed in a single thread for a specific item.
  3. Multi-Threaded pipeline – With this approach, each step of the pipeline is executed on a separate thread (or threads). There’s a buffer (Queue) between each pipeline element to store step results while the next element is still not able to receive them. The buffers might be limited in size.

Pipeline pattern csharp uml

  1. Multi-Process pipeline and Multi-Machine pipeline – Much like Multi-Threaded pipeline, we can have pipeline steps across multiple processes or even machines. This might be useful for performance or security measures. For example, just one machine (or process) can access a specific resource (like a database).

You might be wondering what’s the advantage of Multi-Threaded pipeline over asynchronous execution. Both execute asynchronously and you can just increase the number of dedicated threads to improve performance. There are actually some pretty good reasons to choose the Multi-Threaded pipeline approach:

  • Having a different number of threads for different operation allows better optimization.
  • By having one thread execute the same pipeline operation, you can save whatever warmup leading to the operation.
  • Since each thread executes just one operation, the CPU cache for memory and execution code is always in use.
  • Some resources can be used by just 1 thread at a time. You can dedicate a single thread element to work with that resource. This is better than locking that resource and having threads wait for each other.

As mentioned, this article is going to concentrate on Multi-Threaded pipeline, which I think is the more interesting challenge. For Synchronous execution, I suggest reading this article . For asynchronous execution (producer/consumer), I suggest reading my series on Job Queue Implementations .

Building the Pipeline

We’re going to see a bunch of different ways to implement pipelines, but the usage is always going to be the same:

pipeline.Execute(input);
// or if the pipeline implementation allows waiting for result
var result = await pipeline.Execute(input);

The question is how this pipeline object is to be created? We can go in several different ways here:

  1. Derive from an abstract class
public class MyPipeline : Pipeline
{
    public override IList<IPipelineStep> 
    {
        get {
            return new List<IPipelineStep>() {
                new MostCommonWordStep(),
                new CountCharactersStep(),
                new IsOddStep(),
            }
        }
    }
}

// ...
var pipeline = new MyPipeline();

This is a decent approach but it has several problems. For one thing, a user will have to create a class for each pipeline. Besides that, the bigger challenge is to have different TInput and TOutput for each step. This means that for 3 steps our base class signature will have to look like this:

public abstract class Pipeline<TIn1, TOut1, TIn2, TOut2, TIn3, TOut3>
  1. Use a Builder object with delegates for each step
var builder = CreatePipelineBuilder();
builder.AddStep(str => FindMostCommonWord(str));
builder.AddStep(mostCommonWord => mostCommonWord.Length);
builder.AddStep(length => /*isOdd */ length % 2 == 1);

var pipeline = builder.GetPipeline();

This is a much nicer syntax now. You don’t have to create a class for each pipeline. Instead, just pass delegates to an AddStep method. Let’s stick with this solution.

You might notice that this doesn’t solve the per-step TInput and TOutput problem. We still want each step to have its own input type and output type. There’s actually a very interesting way to solve that issue which I’ll show you further on.

Now that we know more about what we need, let’s clearly define the requirements.

Requirements

Here’s what our Multi-Threaded pipeline implementation should do:

  • Allow to add steps with a Builder object and support anonymous functions.
  • Have each pipeline element execute on a dedicated thread.
  • Have a buffer (queue) between the pipeline elements for items that finished with one step and waiting for the next step.
  • (Optionally) Allow a different TInput and TOutput for each step without casting.
  • (Optionally) Allow setting a degree of parallelism for each step. That is the number of maximum dedicated threads.
  • (Optionally) Set maximum capacity to each buffer
  • (Optionally) Allow to await the result of an item entered into the pipeline.

Now that we got everything straight, let’s continue to our first Multi-Threaded pipeline implementation.

Implementation 1: Using BlockingCollection

The BlockingCollection was introduced in .NET framework 4 to solve the producer-consumer problem . It allows to produce and handle jobs from multiple threads. It supports a maximum capacity of items. It also allows to block when we have no items to handle or when we reached its full capacity. It’s completely thread-safe. The default implementation acts as a Queue. This makes BlockingCollection perfect to act as the buffer between our pipeline steps. Here’s the basic implementation plan:

  • Each pipeline step will have a dedicated thread (or several threads)
  • Each thread will have a dedicated BlockingCollection input source. The thread will Take items (inputs) from the BlockingCollection, invoke them with that step’s delegate, and place them into the next buffer.
  • On the last step, an event will fire with the result. This is not optimal, but we will improve it later by awaiting the result.

So with this plan in mind, here’s the first implementation:

public interface IPipeline
{
    void Execute(object input);
    event Action<object> Finished;
}

public class CastingPipelineBuilder : IPipeline
{
    List<Func<object, object>> _pipelineSteps = new List<Func<object, object>>();
    BlockingCollection<object>[] _buffers;

    public event Action<object> Finished;

    public void AddStep(Func<object, object> stepFunc)
    {
        _pipelineSteps.Add(stepFunc);
    }

    public void Execute(object input)
    {
        var first = _buffers[0];
        first.Add(input);
    }

    public IPipeline GetPipeline()
    {
        _buffers = _pipelineSteps // Create buffers
            .Select(step => new BlockingCollection<object>())
            .ToArray(); 

        int bufferIndex = 0;
        foreach (var pipelineStep in _pipelineSteps)
        {
            var bufferIndexLocal = bufferIndex; // so it remains the same in each thread
            Task.Run(() =>
            {
                // 'GetConsumingEnumerable' is blocking when the collection is empty
                foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable())
                {
                    var output = pipelineStep.Invoke(input);

                    bool isLastStep = bufferIndexLocal == _pipelineSteps.Count - 1;
                    if (isLastStep)
                    {
                        // This is dangerous as the invocation is added to the last step
                        // Alternatively, you can utilize 'BeginInvoke' like here: https://stackoverflow.com/a/16336361/1229063
                        Finished?.Invoke(output);
                    }
                    else
                    {
                        var next = _buffers[bufferIndexLocal + 1];
                        next.Add(output); // output will be stored as object
                    }
                }
            });
            bufferIndex++;
        }
        return this;
    }
}

Usage:

var builder = new CastingPipelineBuilder();

//casting from object is needed on each step
builder.AddStep(input => FindMostCommon(input as string));
builder.AddStep(input => (input as string).Length);
builder.AddStep(input => ((int)input) % 2 == 1);

var pipeline = builder.GetPipeline();

pipeline.Finished += res => Console.WriteLine(res); 
pipeline.Execute("The pipeline pattern is the best pattern");
// 'True' is printed because 'pattern' is the most common with 7 chars and it's an odd number
// ...
private static string FindMostCommon(string input)
{
    return input.Split(' ')
        .GroupBy(word => word)
        .OrderBy(group => group.Count())
        .Last()
        .Key;
}

What happens here is pretty much the initial plan in code:

  • Each step’s delegate is stored in the _pipelineSteps list.
  • Each step has a BlockingCollection<object> buffer as its input which is stored in the _buffers array.
  • When GetPipeline is called, the following happens:
    • The _buffers array is created.
    • A thread is opened for each step by Task.Run
    • Each thread takes items from its corresponding buffer (BlockingCollection) with foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable()). This method will block (wait) when the BlockingCollection is empty until a new item has been added.
    • When an item is taken from the BlockingCollection, the relevant step’s delegate is invoked. If it’s the last step, the event is invoked. If not, the output is added to the buffer of the next step.

This code works well enough, but it has some disadvantages:

  1. Each delegate in the pipeline gets an object as its input. This requires casting to be made. Instead of writing builder.AddStep(input => input.Length) we have to write builder.AddStep(input => (input as string).Length);. Besides the syntactic inconvenience, it introduces a performance problem. Casting is required on each step. For value types, each step will perform a boxing and an unboxing .
  2. A dedicated thread remains active for each step, even when empty. If we were to dedicate multiple threads for each step (I’ll show this later), they would all remain working even when there’s no work. We won’t be able to solve this with BlockingCollection implementations.
  3. We can’t actually await a pipeline job. Optimally, I would like the option to write var result = await pipeline.Execute("The pipeline pattern is the best pattern").

Let’s try to get over some of those problems.

Making BlockingCollection Implementation Better

The first issue I want to address is the casting problem. We can get over the syntax problem in the usage rather easily by playing a bit with generics. Simply replace the public void AddStep(Func<object, object> stepFunc) method with this:

public void AddStep<TStepIn, TStepOut>(Func<TStepIn, TStepOut> stepFunc)
{
    _pipelineSteps.Add(objInput => 
        stepFunc.Invoke((TStepIn)(object)objInput));
}

This will make the usage much nicer:

var builder = new InnerPipelineBuilder();
builder.AddStep<string, string>(input => FindMostCommon(input));
builder.AddStep<string, int>(input => CountChars(input));
builder.AddStep<int, bool>(input => IsOdd(input));
var pipeline = builder.GetPipeline();

pipeline.Finished += res => Console.WriteLine(res);
pipeline.Execute("The pipeline pattern is the best pattern");

This mostly solves the syntax problems. You’ll still have to specify the <TInput, TOutput> with each step, but the delegate itself won’t require casting.

The performance problem remains though. In fact, the performance with this change is even worst. We still have castings, but now we added an additional wrapper delegate for each. Another delegate wrapper means more allocations, which means worst performance.

Luckily, I was able to overcome the casting problems with some C# trickery and some inspiration from this post by Jeremy Davis.

BlockingCollection Implementation without Castings

The following code cost me some sweat and tears to write:

public interface IPipelineStep<TStepIn>
{
    BlockingCollection<TStepIn> Buffer { get; set; }
}

public class GenericBCPipelineStep<TStepIn, TStepOut> : IPipelineStep<TStepIn>
{
    public BlockingCollection<TStepIn> Buffer { get; set; } = new BlockingCollection<TStepIn>();
    public Func<TStepIn, TStepOut> StepAction { get; set; }
}

public static class GenericBCPipelineExtensions
{
    public static TOutput Step<TInput, TOutput, TInputOuter, TOutputOuter>
        (this TInput inputType, 
        GenericBCPipeline<TInputOuter, TOutputOuter> pipelineBuilder, 
        Func<TInput, TOutput> step)
    {
        var pipelineStep = pipelineBuilder.GenerateStep<TInput, TOutput>();
        pipelineStep.StepAction = step;
        return default(TOutput);
    }
}

public class GenericBCPipeline<TPipeIn, TPipeOut>
{
    List<object> _pipelineSteps = new List<object>();
    
    public event Action<TPipeOut> Finished;

    public GenericBCPipeline(Func<TPipeIn, GenericBCPipeline<TPipeIn, TPipeOut>, TPipeOut> steps)
    {
        steps.Invoke(default(TPipeIn), this);//Invoke just once to build blocking collections
    }
    
    public void Execute(TPipeIn input)
    {
        var first = _pipelineSteps[0] as IPipelineStep<TPipeIn>;
        first.Buffer.Add(input);
    }

    public GenericBCPipelineStep<TStepIn, TStepOut> GenerateStep<TStepIn, TStepOut>()
    {
        var pipelineStep = new GenericBCPipelineStep<TStepIn, TStepOut>();
        var stepIndex = _pipelineSteps.Count;

        Task.Run(() =>
        {
            IPipelineStep<TStepOut> nextPipelineStep = null;

            foreach (var input in pipelineStep.Buffer.GetConsumingEnumerable())
            {
                bool isLastStep = stepIndex == _pipelineSteps.Count - 1;
                var output = pipelineStep.StepAction(input);
                if (isLastStep)
                {
                    // This is dangerous as the invocation is added to the last step
                    // Alternatively, you can utilize BeginInvoke like here: https://stackoverflow.com/a/16336361/1229063
                    Finished?.Invoke((TPipeOut)(object)output);
                }
                else
                {
                    nextPipelineStep = nextPipelineStep // no need to evaluate more than once
                        ?? (isLastStep ? null : _pipelineSteps[stepIndex + 1] as IPipelineStep<TStepOut>);
                    nextPipelineStep.Buffer.Add(output);
                }
            }
        });
        
        _pipelineSteps.Add(pipelineStep);
        return pipelineStep;
    }
}

Usage:

var pipeline = new GenericBCPipeline<string, bool>((inputFirst, builder) =>
    inputFirst.Step(builder, input => FindMostCommon(input))
        .Step(builder, input => input.Length)
        .Step(builder, input => input % 2 == 1));

pipeline.Finished += res => Console.WriteLine(res); 
pipeline.Execute("The pipeline pattern is the best pattern");
// 'True' is printed

That’s not the most readable code, I admit, but the result is very effective. As you can see the usage has no castings at all. It also doesn’t require to write types <TInput, TOutput> on each step. Moreover, the implementation itself doesn’t do any internal casting beyond the initialization.

Here’s the explanation of the code:

  • GenericBCPipelineStep represents a pipeline step. It contains the BlockingCollection input buffer, and the delegate to invoke.
  • IPipelineStep<TStepIn> is required for a step to add the output to the input of the next step. That’s because each step knows just its own input and output types. It doesn’t know the next step’s output, but it does know its input type, which is its own output.
  • The extension method Step is where the magic happens. This allows to add a step for any input type and output type without needing to specify the types in advance. It’s called just on startup to initialize all the threads and BlockingCollection buffers. Since it’s a static class, it requires the Pipeline builder itself, hence the need ot pass the builder in each step.
  • List<object> _pipelineSteps represents all the steps. They have to be objects since we don’t know the generic types in advance. They will be cast to GenericBCPipelineStep<TStepIn, TStepOut> later, but just once for each step.
  • The constructor is the one to call all the extension methods. These generate the threads and the buffer.
  • GenerateStep is called for each step. It creates a new GenericBCPipelineStep with the blocking collection buffer. It then creates a new thread which reads from that buffer, invokes the step’s delegate and places the output in the next step’s buffer.

This implementation makes the BlockingCollection a pretty reasonable choice I think.

Customizations and Optional Requirements

Going back to the requirements, we talked of some optional features. Specifically:

  • Allow setting a degree of parallelism for each step. That is the number of maximum dedicated threads.
  • Set maximum capacity to each buffer
  • Allow to await the result of an item entered into the pipeline.

Let’s solve all of them.

Set a degree of parallelism for each step

With BlockingCollection, we can easily have several handling threads for each step. It’s just a matter of adding more threads with the same code. For simplicity’s sake, I’ll change the first implementation (the one with casting) to have degrees of parallelism:

The differences are marked with the !!! comments
public class CastingPipelineWithParallelism : IPipeline
{
    class Step
    {
        public Func<object, object> Func { get; set; }
        public int DegreeOfParallelism { get; set; }
    }

    List<Step> _pipelineSteps = new List<Step>();
    BlockingCollection<object>[] _buffers;

    public event Action<object> Finished;

    public void AddStep(Func<object, object> stepFunc, int degreeOfParallelism)
    {
        // !!! Save the degree of parallelism
        _pipelineSteps.Add(new Step() { Func = stepFunc, DegreeOfParallelism = degreeOfParallelism });
    }

    public void Execute(object input)
    {
        var first = _buffers[0];
        first.Add(input);
    }

    public IPipeline GetPipeline()
    {
        _buffers = _pipelineSteps.Select(step => new BlockingCollection<object>()).ToArray();

        int bufferIndex = 0;
        foreach (var pipelineStep in _pipelineSteps)
        {
            var bufferIndexLocal = bufferIndex;

            // !!! start as many threads as there are degrees of parallelism.
            for (int i = 0; i < pipelineStep.DegreeOfParallelism; i++)
            {
                Task.Run(() => { StartStep(bufferIndexLocal, pipelineStep); });
            }

            bufferIndex++;
        }
        return this;
    }

    private void StartStep(int bufferIndexLocal, Step pipelineStep)
    {
        foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable())
        {
            var output = pipelineStep.Func.Invoke(input);
            bool isLastStep = bufferIndexLocal == _pipelineSteps.Count - 1;
            if (isLastStep)
            {
                Finished?.Invoke(output);
            }
            else
            {
                var next = _buffers[bufferIndexLocal + 1];
                next.Add(output);
            }
        }
    }
}

The only difference from the initial implementation is to run as many threads as there are degrees of parallelism.

Set maximum capacity to each buffer

Setting a maximum capacity is also easy because BlockingCollection supports it natively. Here are the specific changes to make:

class Step
{
    public Func<object, object> Func { get; set; }
    public int DegreeOfParallelism { get; set; }
    public int MaxCapacity { get; set; } // !!!
}

public void AddStep(Func<object, object> stepFunc, int degreeOfParallelism, int maxCapacity)
{
    _pipelineSteps.Add(new Step() 
    {
        Func = stepFunc, 
        DegreeOfParallelism = degreeOfParallelism, 
        MaxCapacity = maxCapacity // !!!
    });
}

public IPipeline GetPipeline()
{
    _buffers = _pipelineSteps.Select(step => 
        new BlockingCollection<object>(step.MaxCapacity)) // !!!
        .ToArray();
    // ...

Allow to await the result of an item entered into the pipeline.

Optimally, we’d like to be able to write var result = await pipeline.Execute(input). This will wait for the input to finish all steps of the pipeline asynchronously and return the result.

This is done relatively simply with the TaskCompletionSource class. This class provides a Task instance whose result you can set manually. Or an exception if needed. In our case, each item executed by the pipeline will have to keep a TaskCompletionSource instance with it. Here’s the implementation with the simple CastingPipeline implementation.

The interface changed since it now returns a Task<TOutput> and no longer needs an event.

public interface IAwaitablePipeline<TOutput>
{
    Task<TOutput> Execute(object input);
}

The new implementation includes all 3 optional features: Degrees of parallelism, Max capacity for each step, and awaitable result:

The differences are marked with the !!! comments
public class CastingPipelineWithAwait<TOutput> : IAwaitablePipeline<TOutput>
{
    class Step
    {
        public Func<object, object> Func { get; set; }
        public int DegreeOfParallelism { get; set; }
        public int MaxCapacity { get; set; }
    }

    // !!! Keeping a TaskCompletionSource to each item
    class Item
    {
        public object Input { get; set; }
        public TaskCompletionSource<TOutput> TaskCompletionSource { get; set; }
    }

    List<Step> _pipelineSteps = new List<Step>();
    BlockingCollection<Item>[] _buffers;

    public event Action<TOutput> Finished;

    public void AddStep(Func<object, object> stepFunc, int degreeOfParallelism, int maxCapacity)
    {
        _pipelineSteps.Add(new Step() {Func = stepFunc, DegreeOfParallelism = degreeOfParallelism, 
            MaxCapacity = maxCapacity, });
    }

    public Task<TOutput> Execute(object input)
    {
        var first = _buffers[0];
        var item = new Item()
        {
            Input = input,
            TaskCompletionSource = new TaskCompletionSource<TOutput>()
        };
        first.Add(item);
        
        //!!! This Task will return when we manually call item.TaskCompletionSource.SetResult()
        return item.TaskCompletionSource.Task; 
    }

    public IAwaitablePipeline<TOutput> GetPipeline()
    {
        _buffers = _pipelineSteps.Select(step => new BlockingCollection<Item>()).ToArray();

        int bufferIndex = 0;
        foreach (var pipelineStep in _pipelineSteps)
        {
            var bufferIndexLocal = bufferIndex;

            for (int i = 0; i < pipelineStep.DegreeOfParallelism; i++)
            {
                Task.Run(() => { StartStep(bufferIndexLocal, pipelineStep); });
            }

            bufferIndex++;
        }
        return this;
    }

    private void StartStep(int bufferIndexLocal, Step pipelineStep)
    {
        foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable())
        {
            object output;
            try
            {
                output = pipelineStep.Func.Invoke(input.Input);
            }
            catch (Exception e)
            {
                // !!! If an exception happened, we need to set the Task as failed with exception or it will keep waiting
                input.TaskCompletionSource.SetException(e);
                continue;
            }

            bool isLastStep = bufferIndexLocal == _pipelineSteps.Count - 1;
            if (isLastStep)
            {
                // !!! Setting Task's result in the last step
                input.TaskCompletionSource.SetResult((TOutput)(object)output);
            }
            else
            {
                var next = _buffers[bufferIndexLocal + 1];
                next.Add(new Item() { Input = output, TaskCompletionSource = input.TaskCompletionSource});
            }
        }
    }
}

For simplicity’s sake, I showed all variations with the simpler CastingPipeline. However, the same variations can be applied to the GenericBCPipeline. In fact, I implemented the GenericBCPipelineAwait, variation which allows awaiting for the result. You can see it on GitHub as well as all other implementations shown in this article.

In previous implementations I neglected to handle exceptions. This is actually pretty important since we don’t want an exception to destroy the entire pipeline. The solution is simply to wrap the step’s invocation with try/catch and place continue; in the catch clause. This will continue to the next item in the buffer.

Summary and Coming Up

As you can see, the pipeline pattern is pretty interesting to implement. Specifically, supporting any input type and output type for each step was a major challenge. Besides that, the BlockingCollection class did most of the work.

In the next parts, we’ll see how to implement a pipeline in other ways like TPL Dataflow, System.Threading.Channels, and some 3rd party libraries I got my eye on. Subscribe to be updated on the next series parts.