The Pipeline pattern, also known as the Pipes and Filters design pattern, is a powerful tool in programming. The idea is to chain a group of functions so that the output of each function is the input of the next one. The concept is pretty similar to an assembly line, where each step manipulates and prepares the product for the next step.
We might have a pipeline that accepts a string, finds the most common word, counts its number of characters, and checks if that number is odd. Something like this:
string input = "The pipeline pattern is the best pattern";
var pipeline = CreatePipeline();
Console.Write(pipeline.Execute(input));
// Prints 'True' because 'pattern' is the most common word,
// it has 7 characters, and 7 is an odd number
In this article series, we’re going to see different implementations of multi-threaded pipelines in C#. This first part shows implementations using BlockingCollection. In the next parts, I’ll show you implementations with TPL Dataflow, Channels, Rx, and maybe some 3rd-party libraries.
Types of Pipeline Implementations
There are 4 basic types of pipeline implementations:
- Synchronous execution – In its simplest form, the pipeline object synchronously executes all of its steps and returns the result. Basically, a regular method call.
- Asynchronous execution (producer/consumer) – We might want our pipeline executions to run on a background thread, to which other threads can add jobs. This is a subset of the producer/consumer pattern (Job Queues) where each job is a pipeline. This means that all of the pipeline's steps are executed on a single thread for a specific item.
- Multi-Threaded pipeline – With this approach, each step of the pipeline is executed on a separate thread (or threads). There’s a buffer (queue) between each pair of pipeline elements to store step results while the next element is not yet able to receive them. The buffers might be limited in size.
- Multi-Process pipeline and Multi-Machine pipeline – Much like the Multi-Threaded pipeline, we can have pipeline steps across multiple processes or even machines. This might be useful for performance or security reasons. For example, only one machine (or process) might be allowed to access a specific resource (like a database).
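The synchronous form really is just function composition. As a minimal sketch, the introductory example can be written so that each step keeps its own input and output type. `SyncPipeline`, `Compose` and `Create` are hypothetical names used only for illustration, not part of any library:

```csharp
using System;
using System.Linq;

static class SyncPipeline
{
    // Chains two steps so that the output of 'first' becomes the input of 'second'.
    public static Func<TIn, TOut> Compose<TIn, TMid, TOut>(Func<TIn, TMid> first, Func<TMid, TOut> second)
        => input => second(first(input));

    // Step 1: the most frequent word in the input.
    public static string FindMostCommon(string input) =>
        input.Split(' ')
             .GroupBy(word => word)
             .OrderBy(group => group.Count())
             .Last()
             .Key;

    // The whole example pipeline: string -> string -> int -> bool, all on the calling thread.
    public static Func<string, bool> Create() =>
        Compose(Compose<string, string, int>(FindMostCommon, word => word.Length),
                length => length % 2 == 1);
}
```

Calling `SyncPipeline.Create()("The pipeline pattern is the best pattern")` runs every step on the caller's thread, which is exactly the "regular method call" described above.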
You might be wondering what the advantage of a Multi-Threaded pipeline over asynchronous execution is. Both execute asynchronously, and you can just increase the number of dedicated threads to improve performance. There are actually some pretty good reasons to choose the Multi-Threaded pipeline approach:
- Having a different number of threads for different operations allows better optimization.
- By having one thread execute the same pipeline operation, you pay for whatever warmup leads up to that operation (initialization, opening connections, and so on) only once.
- Since each thread executes just one operation, the CPU cache is more likely to stay warm with that operation's data and code.
- Some resources can be used by just 1 thread at a time. You can dedicate a single-threaded element to work with that resource. This is better than locking that resource and having threads wait for each other.
As mentioned, this article is going to concentrate on the Multi-Threaded pipeline, which I think is the more interesting challenge. For synchronous execution, I suggest reading this article. For asynchronous execution (producer/consumer), I suggest reading my series on Job Queue Implementations.
Building the Pipeline
We’re going to see a bunch of different ways to implement pipelines, but the usage is always going to be the same:
pipeline.Execute(input);
// or if the pipeline implementation allows waiting for result
var result = await pipeline.Execute(input);
The question is how this pipeline object should be created. There are several ways to go about it:
- Derive from an abstract class
public class MyPipeline : Pipeline
{
    // 'Steps' is an illustrative property name for the abstract member
    public override IList<IPipelineStep> Steps
    {
        get
        {
            return new List<IPipelineStep>() {
                new MostCommonWordStep(),
                new CountCharactersStep(),
                new IsOddStep(),
            };
        }
    }
}
// ...
var pipeline = new MyPipeline();
This is a decent approach, but it has several problems. For one thing, a user has to create a class for each pipeline. Besides that, the bigger challenge is having a different TInput and TOutput for each step. This means that for 3 steps, our base class signature would have to look like this:
public abstract class Pipeline<TIn1, TOut1, TIn2, TOut2, TIn3, TOut3>
- Use a Builder object with delegates for each step
var builder = CreatePipelineBuilder();
builder.AddStep(str => FindMostCommonWord(str));
builder.AddStep(mostCommonWord => mostCommonWord.Length);
builder.AddStep(length => /*isOdd */ length % 2 == 1);
var pipeline = builder.GetPipeline();
This is a much nicer syntax. You don’t have to create a class for each pipeline. Instead, you just pass delegates to an AddStep method. Let’s stick with this solution.
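To make the builder idea concrete, here is a minimal synchronous sketch. `SyncPipelineBuilder`, `AddStep` and `GetPipeline` are hypothetical names chosen to mirror the snippet above. Each step is stored as a `Func<object, object>` so that steps of different types fit in one list, which is exactly where the casting problem discussed next comes from:

```csharp
using System;
using System.Collections.Generic;

class SyncPipelineBuilder
{
    // Every step is stored as object -> object, so steps with different types fit in one list.
    private readonly List<Func<object, object>> _steps = new List<Func<object, object>>();

    public void AddStep(Func<object, object> step) => _steps.Add(step);

    // Returns a delegate that runs all steps in order on the calling thread.
    public Func<object, object> GetPipeline()
    {
        var steps = _steps.ToArray(); // snapshot, so later AddStep calls don't change this pipeline
        return input =>
        {
            object current = input;
            foreach (var step in steps)
                current = step(current);
            return current;
        };
    }
}
```

Usage mirrors the earlier snippet, except that every step has to cast its input, for example `builder.AddStep(input => ((string)input).Length)`.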
You might notice that this doesn’t solve the per-step TInput and TOutput problem. We still want each step to have its own input type and output type. There’s actually a very interesting way to solve that issue, which I’ll show you further on.
Now that we know more about what we need, let’s clearly define the requirements.
Requirements
Here’s what our Multi-Threaded pipeline implementation should do:
- Allow adding steps with a Builder object and support anonymous functions.
- Have each pipeline element execute on a dedicated thread.
- Have a buffer (queue) between the pipeline elements for items that finished one step and are waiting for the next step.
- (Optionally) Allow a different TInput and TOutput for each step without casting.
- (Optionally) Allow setting a degree of parallelism for each step, that is, the maximum number of dedicated threads.
- (Optionally) Set a maximum capacity for each buffer.
- (Optionally) Allow awaiting the result of an item entered into the pipeline.
Now that we have everything straight, let’s continue to our first Multi-Threaded pipeline implementation.
Implementation 1: Using BlockingCollection
The BlockingCollection was introduced in .NET Framework 4 to solve the producer-consumer problem. It allows producing and handling jobs from multiple threads. It supports a maximum capacity of items. It also blocks when there are no items to handle or when it has reached its full capacity. It’s completely thread-safe. By default, it acts as a queue (it wraps a ConcurrentQueue<T>). This makes BlockingCollection perfect to act as the buffer between our pipeline steps. Here’s the basic implementation plan:
- Each pipeline step will have a dedicated thread (or several threads).
- Each thread will have a dedicated BlockingCollection input source. The thread will Take items (inputs) from the BlockingCollection, invoke that step’s delegate on them, and place the outputs into the next buffer.
- On the last step, an event will fire with the result. This is not optimal, but we will improve it later by awaiting the result.
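Before wiring up the full pipeline, it may help to see the BlockingCollection behavior the plan relies on, in isolation. This is a minimal sketch (the `BufferDemo` name is hypothetical): one producer adds items, one consumer task drains them with `GetConsumingEnumerable`, and `CompleteAdding` tells the consumer that no more items will arrive so its loop can end:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BufferDemo
{
    public static List<int> Run()
    {
        // Bounded buffer: Add blocks while 2 items are already waiting.
        using var buffer = new BlockingCollection<int>(boundedCapacity: 2);
        var consumed = new List<int>();

        // Consumer: blocks while the buffer is empty and exits after CompleteAdding once drained.
        var consumer = Task.Run(() =>
        {
            foreach (var item in buffer.GetConsumingEnumerable())
                consumed.Add(item * 10);
        });

        // Producer: runs on the calling thread.
        for (int i = 1; i <= 5; i++)
            buffer.Add(i);
        buffer.CompleteAdding();

        consumer.Wait(); // wait until the consumer has drained everything
        return consumed;
    }
}
```

With a single consumer, items come out in FIFO order, so `Run()` returns 10, 20, 30, 40, 50.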
So with this plan in mind, here’s the first implementation:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IPipeline
{
    void Execute(object input);
    event Action<object> Finished;
}

public class CastingPipelineBuilder : IPipeline
{
    List<Func<object, object>> _pipelineSteps = new List<Func<object, object>>();
    BlockingCollection<object>[] _buffers;

    public event Action<object> Finished;

    public void AddStep(Func<object, object> stepFunc)
    {
        _pipelineSteps.Add(stepFunc);
    }

    public void Execute(object input)
    {
        var first = _buffers[0];
        first.Add(input);
    }

    public IPipeline GetPipeline()
    {
        _buffers = _pipelineSteps // Create buffers
            .Select(step => new BlockingCollection<object>())
            .ToArray();

        int bufferIndex = 0;
        foreach (var pipelineStep in _pipelineSteps)
        {
            var bufferIndexLocal = bufferIndex; // so it remains the same in each thread
            Task.Run(() =>
            {
                // 'GetConsumingEnumerable' is blocking when the collection is empty
                foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable())
                {
                    var output = pipelineStep.Invoke(input);

                    bool isLastStep = bufferIndexLocal == _pipelineSteps.Count - 1;
                    if (isLastStep)
                    {
                        // This is dangerous: the event handlers run on the last step's thread
                        // Alternatively, you can utilize 'BeginInvoke' like here: https://stackoverflow.com/a/16336361/1229063
                        // (delegate BeginInvoke is only supported on .NET Framework, not .NET Core)
                        Finished?.Invoke(output);
                    }
                    else
                    {
                        var next = _buffers[bufferIndexLocal + 1];
                        next.Add(output); // output will be stored as object
                    }
                }
            });
            bufferIndex++;
        }
        return this;
    }
}
Usage:
var builder = new CastingPipelineBuilder();

// Casting from object is needed on each step
builder.AddStep(input => FindMostCommon(input as string));
builder.AddStep(input => (input as string).Length);
builder.AddStep(input => ((int)input) % 2 == 1);

var pipeline = builder.GetPipeline();
pipeline.Finished += res => Console.WriteLine(res);
pipeline.Execute("The pipeline pattern is the best pattern");
// 'True' is printed because 'pattern' is the most common word, it has 7 chars, and 7 is an odd number

// ...

private static string FindMostCommon(string input)
{
    return input.Split(' ')
        .GroupBy(word => word)
        .OrderBy(group => group.Count())
        .Last()
        .Key;
}
What happens here is pretty much the initial plan in code:
- Each step’s delegate is stored in the _pipelineSteps list.
- Each step has a BlockingCollection<object> buffer as its input, which is stored in the _buffers array.
- When GetPipeline is called, the following happens:
  - The _buffers array is created.
  - A task is started for each step with Task.Run. Since it loops and blocks, it effectively occupies a thread-pool thread for the lifetime of the pipeline.
  - Each thread takes items from its corresponding buffer (BlockingCollection) with foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable()). This method blocks (waits) while the BlockingCollection is empty, until a new item is added.
  - When an item is taken from the BlockingCollection, the relevant step’s delegate is invoked. If it’s the last step, the event is invoked. If not, the output is added to the buffer of the next step.
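The bufferIndexLocal copy in the code above matters because a lambda captures variables, not values. A small sketch (the `CaptureDemo` name is hypothetical) shows the difference: delegates that capture the shared loop counter all see its final value, while delegates that capture a per-iteration copy each keep their own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class CaptureDemo
{
    public static (int[] Shared, int[] Copied) Run()
    {
        var shared = new List<Func<int>>();
        var copied = new List<Func<int>>();
        int bufferIndex = 0;
        foreach (var step in new[] { "a", "b", "c" })
        {
            var bufferIndexLocal = bufferIndex;  // fresh variable on every iteration
            shared.Add(() => bufferIndex);       // captures the one shared variable
            copied.Add(() => bufferIndexLocal);  // captures this iteration's copy
            bufferIndex++;
        }
        // Invoke only after the loop, the way a thread that starts late would read the value.
        return (shared.Select(f => f()).ToArray(), copied.Select(f => f()).ToArray());
    }
}
```

The shared version yields 3, 3, 3 (one past the last buffer index), while the copied version yields 0, 1, 2. In the pipeline, the shared version would make threads read the wrong buffer.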
This code works well enough, but it has some disadvantages:
- Each delegate in the pipeline gets an object as its input, so casting is required. Instead of writing builder.AddStep(input => input.Length), we have to write builder.AddStep(input => (input as string).Length);. Besides the syntactic inconvenience, it introduces a performance problem: casting is required on each step, and for value types, each step performs a boxing and an unboxing.
- A dedicated thread remains occupied for each step, even when its buffer is empty. If we were to dedicate multiple threads to each step (I’ll show this later), they would all remain blocked even when there’s no work. We won’t be able to solve this with BlockingCollection implementations.
- We can’t actually await a pipeline job. Optimally, I would like the option to write var result = await pipeline.Execute("The pipeline pattern is the best pattern").
Let’s try to get over some of those problems.
Making BlockingCollection Implementation Better
The first issue I want to address is the casting problem. We can solve its syntax side rather easily by playing a bit with generics. Simply replace the public void AddStep(Func<object, object> stepFunc) method with this:
public void AddStep<TStepIn, TStepOut>(Func<TStepIn, TStepOut> stepFunc)
{
    _pipelineSteps.Add(objInput =>
        stepFunc.Invoke((TStepIn)(object)objInput));
}
This will make the usage much nicer:
var builder = new InnerPipelineBuilder();
builder.AddStep<string, string>(input => FindMostCommon(input));
builder.AddStep<string, int>(input => CountChars(input));
builder.AddStep<int, bool>(input => IsOdd(input));
var pipeline = builder.GetPipeline();
pipeline.Finished += res => Console.WriteLine(res);
pipeline.Execute("The pipeline pattern is the best pattern");
This mostly solves the syntax problems. You’ll still have to specify the <TInput, TOutput> with each step, but the delegate itself won’t require casting.
The performance problem remains, though. In fact, the performance with this change is even worse. We still cast on every step, but now each step also goes through an additional wrapper delegate, which adds another delegate invocation for every item.
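To see where the remaining cost comes from, here is the generic AddStep wrapping logic in isolation (the `StepWrapper.Wrap` name is hypothetical). The typed delegate is wrapped in an object-to-object delegate, so every item goes through two delegate invocations, a cast on the way in and, for value-type outputs, a boxing on the way out:

```csharp
using System;

static class StepWrapper
{
    // Same idea as the generic AddStep: adapt a typed step to the object -> object shape.
    public static Func<object, object> Wrap<TStepIn, TStepOut>(Func<TStepIn, TStepOut> stepFunc)
        => objInput => stepFunc((TStepIn)objInput); // cast (or unbox) in; a value-type TStepOut is boxed out
}
```

For example, `StepWrapper.Wrap<string, int>(s => s.Length)` returns the length as a boxed int, which the next wrapped step has to unbox again.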
Luckily, I was able to overcome the casting problems with some C# trickery and some inspiration from this post by Jeremy Davis.
BlockingCollection Implementation without Castings
The following code cost me some sweat and tears to write:
public interface IPipelineStep<TStepIn>
{
    BlockingCollection<TStepIn> Buffer { get; set; }
}

public class GenericBCPipelineStep<TStepIn, TStepOut> : IPipelineStep<TStepIn>
{
    public BlockingCollection<TStepIn> Buffer { get; set; } = new BlockingCollection<TStepIn>();
    public Func<TStepIn, TStepOut> StepAction { get; set; }
}

public static class GenericBCPipelineExtensions
{
    public static TOutput Step<TInput, TOutput, TInputOuter, TOutputOuter>
        (this TInput inputType,
        GenericBCPipeline<TInputOuter, TOutputOuter> pipelineBuilder,
        Func<TInput, TOutput> step)
    {
        var pipelineStep = pipelineBuilder.GenerateStep<TInput, TOutput>();
        pipelineStep.StepAction = step;
        return default(TOutput);
    }
}

public class GenericBCPipeline<TPipeIn, TPipeOut>
{
    List<object> _pipelineSteps = new List<object>();

    public event Action<TPipeOut> Finished;

    public GenericBCPipeline(Func<TPipeIn, GenericBCPipeline<TPipeIn, TPipeOut>, TPipeOut> steps)
    {
        steps.Invoke(default(TPipeIn), this);//Invoke just once to build blocking collections
    }

    public void Execute(TPipeIn input)
    {
        var first = _pipelineSteps[0] as IPipelineStep<TPipeIn>;
        first.Buffer.Add(input);
    }

    public GenericBCPipelineStep<TStepIn, TStepOut> GenerateStep<TStepIn, TStepOut>()
    {
        var pipelineStep = new GenericBCPipelineStep<TStepIn, TStepOut>();
        var stepIndex = _pipelineSteps.Count;

        Task.Run(() =>
        {
            IPipelineStep<TStepOut> nextPipelineStep = null;

            foreach (var input in pipelineStep.Buffer.GetConsumingEnumerable())
            {
                bool isLastStep = stepIndex == _pipelineSteps.Count - 1;
                var output = pipelineStep.StepAction(input);
                if (isLastStep)
                {
                    // This is dangerous as the invocation is added to the last step
                    // Alternatively, you can utilize BeginInvoke like here: https://stackoverflow.com/a/16336361/1229063
                    Finished?.Invoke((TPipeOut)(object)output);
                }
                else
                {
                    nextPipelineStep = nextPipelineStep // no need to evaluate more than once
                        ?? (isLastStep ? null : _pipelineSteps[stepIndex + 1] as IPipelineStep<TStepOut>);
                    nextPipelineStep.Buffer.Add(output);
                }
            }
        });

        _pipelineSteps.Add(pipelineStep);
        return pipelineStep;
    }
}
Usage:
var pipeline = new GenericBCPipeline<string, bool>((inputFirst, builder) =>
    inputFirst.Step(builder, input => FindMostCommon(input))
        .Step(builder, input => input.Length)
        .Step(builder, input => input % 2 == 1));
pipeline.Finished += res => Console.WriteLine(res);
pipeline.Execute("The pipeline pattern is the best pattern");
// 'True' is printed
That’s not the most readable code, I admit, but the result is very effective. As you can see, the usage has no casting at all. It also doesn’t require writing the types <TInput, TOutput> on each step. Moreover, the implementation itself does very little casting: intermediate values move between strongly typed buffers, and the only per-item casts left are in Execute and when the last step raises Finished.
Here’s the explanation of the code:
- GenericBCPipelineStep represents a pipeline step. It contains the BlockingCollection input buffer and the delegate to invoke.
- IPipelineStep<TStepIn> is required for a step to add its output to the input of the next step. That’s because each step knows just its own input and output types. It doesn’t know the next step’s output type, but it does know the next step’s input type, which is its own output type.
- The extension method Step is where the magic happens. It allows adding a step for any input type and output type without needing to specify the types in advance. It’s called just on startup to initialize all the threads and BlockingCollection buffers. Since an extension method has to live in a static class, it has no access to the pipeline instance, hence the need to pass the builder in each step.
- List<object> _pipelineSteps represents all the steps. They have to be objects since we don’t know the generic types in advance. They are cast to IPipelineStep<TStepIn> later, but just once for each step (plus once per Execute call for the first step).
- The constructor is the one to call all the extension methods. It invokes the steps delegate once with default(TPipeIn), which generates the threads and the buffers.
- GenerateStep is called for each step. It creates a new GenericBCPipelineStep with the blocking collection buffer. It then starts a new thread which reads from that buffer, invokes the step’s delegate, and places the output in the next step’s buffer.
This implementation makes BlockingCollection a pretty reasonable choice, I think.
Customizations and Optional Requirements
Going back to the requirements, we talked about some optional features. Specifically:
- Allow setting a degree of parallelism for each step, that is, the maximum number of dedicated threads.
- Set a maximum capacity for each buffer.
- Allow awaiting the result of an item entered into the pipeline.
Let’s solve all of them.
Set a degree of parallelism for each step
With BlockingCollection, we can easily have several handling threads for each step. It’s just a matter of adding more threads with the same code. For simplicity’s sake, I’ll change the first implementation (the one with casting) to have degrees of parallelism. The differences are marked with the !!! comments:
public class CastingPipelineWithParallelism : IPipeline
{
    class Step
    {
        public Func<object, object> Func { get; set; }
        public int DegreeOfParallelism { get; set; }
    }

    List<Step> _pipelineSteps = new List<Step>();
    BlockingCollection<object>[] _buffers;

    public event Action<object> Finished;

    public void AddStep(Func<object, object> stepFunc, int degreeOfParallelism)
    {
        // !!! Save the degree of parallelism
        _pipelineSteps.Add(new Step() { Func = stepFunc, DegreeOfParallelism = degreeOfParallelism });
    }

    public void Execute(object input)
    {
        var first = _buffers[0];
        first.Add(input);
    }

    public IPipeline GetPipeline()
    {
        _buffers = _pipelineSteps.Select(step => new BlockingCollection<object>()).ToArray();

        int bufferIndex = 0;
        foreach (var pipelineStep in _pipelineSteps)
        {
            var bufferIndexLocal = bufferIndex;

            // !!! start as many threads as there are degrees of parallelism.
            for (int i = 0; i < pipelineStep.DegreeOfParallelism; i++)
            {
                Task.Run(() => { StartStep(bufferIndexLocal, pipelineStep); });
            }

            bufferIndex++;
        }
        return this;
    }

    private void StartStep(int bufferIndexLocal, Step pipelineStep)
    {
        foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable())
        {
            var output = pipelineStep.Func.Invoke(input);
            bool isLastStep = bufferIndexLocal == _pipelineSteps.Count - 1;
            if (isLastStep)
            {
                Finished?.Invoke(output);
            }
            else
            {
                var next = _buffers[bufferIndexLocal + 1];
                next.Add(output);
            }
        }
    }
}
The only difference from the initial implementation is to run as many threads as there are degrees of parallelism.
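Several threads consuming the same BlockingCollection is safe because each item is taken by exactly one consumer. A minimal sketch (the `ParallelStepDemo` name is hypothetical) runs several consumers on one buffer. One consequence worth keeping in mind: with a degree of parallelism above 1, items can leave a step in a different order than they entered, so only the set of results is guaranteed, not their order:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class ParallelStepDemo
{
    public static int[] Run(int degreeOfParallelism, int itemCount)
    {
        using var buffer = new BlockingCollection<int>();
        var results = new ConcurrentBag<int>();

        // Start as many consumers as the degree of parallelism; all take from the same buffer.
        var consumers = Enumerable.Range(0, degreeOfParallelism)
            .Select(_ => Task.Run(() =>
            {
                foreach (var item in buffer.GetConsumingEnumerable())
                    results.Add(item * item);
            }))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
            buffer.Add(i);
        buffer.CompleteAdding();
        Task.WaitAll(consumers);

        // Sort because the consumers may finish items in any order.
        return results.OrderBy(x => x).ToArray();
    }
}
```

`ParallelStepDemo.Run(3, 100)` processes every item exactly once (100 results, 0 through 9801 once sorted), but the unsorted completion order may vary between runs.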
Set maximum capacity to each buffer
Setting a maximum capacity is also easy because BlockingCollection supports it natively. Here are the specific changes to make:
class Step
{
    public Func<object, object> Func { get; set; }
    public int DegreeOfParallelism { get; set; }
    public int MaxCapacity { get; set; } // !!!
}

public void AddStep(Func<object, object> stepFunc, int degreeOfParallelism, int maxCapacity)
{
    _pipelineSteps.Add(new Step()
    {
        Func = stepFunc,
        DegreeOfParallelism = degreeOfParallelism,
        MaxCapacity = maxCapacity // !!!
    });
}

public IPipeline GetPipeline()
{
    _buffers = _pipelineSteps.Select(step =>
        new BlockingCollection<object>(step.MaxCapacity)) // !!!
        .ToArray();
    // ...
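When a bounded buffer is full, Add blocks the producer until a consumer takes an item, which is how back-pressure propagates to the earlier steps. TryAdd lets us observe this without blocking. A small sketch (the `CapacityDemo` name is hypothetical); note also that the bounded constructor throws an ArgumentOutOfRangeException for a capacity below 1, so maxCapacity has to be positive:

```csharp
using System.Collections.Concurrent;

static class CapacityDemo
{
    // Reports whether a third item fits into a buffer with capacity 2,
    // first while it is full and then after a consumer has taken one item.
    public static (bool WhenFull, bool AfterTake) Run()
    {
        using var buffer = new BlockingCollection<string>(boundedCapacity: 2);
        buffer.Add("first");
        buffer.Add("second");

        bool whenFull = buffer.TryAdd("third");  // full: returns false immediately instead of blocking
        buffer.Take();                            // a consumer frees one slot
        bool afterTake = buffer.TryAdd("third"); // now there is room
        return (whenFull, afterTake);
    }
}
```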
Allow awaiting the result of an item entered into the pipeline
Optimally, we’d like to be able to write var result = await pipeline.Execute(input). This would asynchronously wait for the input to go through all the steps of the pipeline and then return the result.
This is done relatively simply with the TaskCompletionSource class. This class provides a Task instance whose result (or exception, if needed) you can set manually. In our case, each item executed by the pipeline has to carry a TaskCompletionSource instance with it. Here’s the implementation based on the simple CastingPipeline implementation.
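Here is TaskCompletionSource on its own. In this sketch (the `TcsDemo` name is hypothetical), a worker thread completes one task with a result and another with an exception, and the awaiting code sees either the value or the rethrown exception. The TaskCreationOptions.RunContinuationsAsynchronously flag is an addition of this sketch: without it, the awaiting code's continuation may run synchronously on the thread that calls SetResult, which in a pipeline would be the last step's worker thread (the same concern as the "dangerous" comment about the Finished event):

```csharp
using System;
using System.Threading.Tasks;

static class TcsDemo
{
    public static async Task<(int Result, string Error)> RunAsync()
    {
        var success = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        var failure = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);

        // A 'worker' completes both tasks from another thread, as a pipeline step would.
        _ = Task.Run(() =>
        {
            success.SetResult(7);
            failure.SetException(new InvalidOperationException("step failed"));
        });

        int result = await success.Task; // resumes once SetResult has been called
        string error = null;
        try
        {
            await failure.Task;          // rethrows the exception passed to SetException
        }
        catch (InvalidOperationException e)
        {
            error = e.Message;
        }
        return (result, error);
    }
}
```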
The interface changed, since it now returns a Task<TOutput> and no longer needs an event.
public interface IAwaitablePipeline<TOutput>
{
    Task<TOutput> Execute(object input);
}
The new implementation includes all 3 optional features: degrees of parallelism, max capacity for each step, and an awaitable result. The differences are marked with the !!! comments:
public class CastingPipelineWithAwait<TOutput> : IAwaitablePipeline<TOutput>
{
    class Step
    {
        public Func<object, object> Func { get; set; }
        public int DegreeOfParallelism { get; set; }
        public int MaxCapacity { get; set; }
    }

    // !!! Keeping a TaskCompletionSource to each item
    class Item
    {
        public object Input { get; set; }
        public TaskCompletionSource<TOutput> TaskCompletionSource { get; set; }
    }

    List<Step> _pipelineSteps = new List<Step>();
    BlockingCollection<Item>[] _buffers;

    public void AddStep(Func<object, object> stepFunc, int degreeOfParallelism, int maxCapacity)
    {
        _pipelineSteps.Add(new Step()
        {
            Func = stepFunc,
            DegreeOfParallelism = degreeOfParallelism,
            MaxCapacity = maxCapacity,
        });
    }

    public Task<TOutput> Execute(object input)
    {
        var first = _buffers[0];
        var item = new Item()
        {
            Input = input,
            // Run the awaiting code's continuation off the pipeline's worker thread
            TaskCompletionSource = new TaskCompletionSource<TOutput>(TaskCreationOptions.RunContinuationsAsynchronously)
        };
        first.Add(item);

        //!!! This Task will complete when we manually call item.TaskCompletionSource.SetResult()
        return item.TaskCompletionSource.Task;
    }

    public IAwaitablePipeline<TOutput> GetPipeline()
    {
        // Each buffer is bounded by its step's MaxCapacity
        _buffers = _pipelineSteps.Select(step => new BlockingCollection<Item>(step.MaxCapacity)).ToArray();

        int bufferIndex = 0;
        foreach (var pipelineStep in _pipelineSteps)
        {
            var bufferIndexLocal = bufferIndex;
            for (int i = 0; i < pipelineStep.DegreeOfParallelism; i++)
            {
                Task.Run(() => { StartStep(bufferIndexLocal, pipelineStep); });
            }
            bufferIndex++;
        }
        return this;
    }

    private void StartStep(int bufferIndexLocal, Step pipelineStep)
    {
        foreach (var input in _buffers[bufferIndexLocal].GetConsumingEnumerable())
        {
            object output;
            try
            {
                output = pipelineStep.Func.Invoke(input.Input);
            }
            catch (Exception e)
            {
                // !!! If an exception happened, we need to set the Task as failed with exception or it will keep waiting
                input.TaskCompletionSource.SetException(e);
                continue;
            }

            bool isLastStep = bufferIndexLocal == _pipelineSteps.Count - 1;
            if (isLastStep)
            {
                // !!! Setting Task's result in the last step
                input.TaskCompletionSource.SetResult((TOutput)(object)output);
            }
            else
            {
                var next = _buffers[bufferIndexLocal + 1];
                next.Add(new Item() { Input = output, TaskCompletionSource = input.TaskCompletionSource });
            }
        }
    }
}
For simplicity’s sake, I showed all variations with the simpler CastingPipeline. However, the same variations can be applied to the GenericBCPipeline. In fact, I implemented the GenericBCPipelineAwait variation, which allows awaiting the result. You can see it on GitHub, as well as all the other implementations shown in this article.
In previous implementations, I neglected to handle exceptions. This is actually pretty important, since we don’t want an exception to destroy the entire pipeline. The solution is simply to wrap the step’s invocation with try/catch and place continue; in the catch clause. This will continue to the next item in the buffer. In the awaitable version, the catch clause also calls SetException, so the caller’s await doesn’t hang forever.
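The effect of try/catch with continue can be seen in isolation. In this sketch (the `ResilientStepDemo` name is hypothetical), the step divides 100 by each item; the item 0 throws, the catch clause records the error and continues, and the remaining items are still processed by the same consumer loop. Without the catch, the exception would end the loop and fault the consumer task, so this step would stop taking items altogether:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ResilientStepDemo
{
    public static (List<int> Results, List<string> Errors) Run(params int[] inputs)
    {
        using var buffer = new BlockingCollection<int>();
        var results = new List<int>();
        var errors = new List<string>();

        var consumer = Task.Run(() =>
        {
            foreach (var input in buffer.GetConsumingEnumerable())
            {
                int output;
                try
                {
                    output = 100 / input; // the "step"; throws DivideByZeroException for 0
                }
                catch (Exception e)
                {
                    errors.Add(e.GetType().Name); // report the failure for this item only...
                    continue;                      // ...and move on to the next item in the buffer
                }
                results.Add(output);
            }
        });

        foreach (var input in inputs)
            buffer.Add(input);
        buffer.CompleteAdding();
        consumer.Wait();
        return (results, errors);
    }
}
```

`ResilientStepDemo.Run(5, 0, 20)` returns the results 20 and 5, plus a single DivideByZeroException entry for the failed item.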
Summary and Coming Up
As you can see, the pipeline pattern is pretty interesting to implement. Specifically, supporting any input type and output type for each step was a major challenge. Besides that, the BlockingCollection class did most of the work.
In the next parts, we’ll see how to implement a pipeline in other ways, like TPL Dataflow, System.Threading.Channels, and some 3rd-party libraries I’ve got my eye on. Subscribe to be updated on the next parts of the series.