In the first part of the series, we talked about the Pipeline pattern in programming, also known as the Pipes and Filters design pattern. The first part showed how to implement a multi-threaded pipeline with BlockingCollection. In this part, you’ll see how to implement such a pipeline with TPL Dataflow.
In a pipeline, each step accepts an input and produces an output. The output of one step is the input of the next one. In a multi-threaded pipeline, each step of the pipeline runs on a separate thread (or threads). There’s a buffer (queue) between consecutive pipeline elements that stores step results until the next element is able to receive them. The buffers might be limited in size.
TPL Dataflow is part of Microsoft’s Task Parallel Library (TPL). It’s a powerful and flexible library that implements the Dataflow pattern, which is broader than the pipeline pattern. That is, the pipeline pattern is a subset of the Dataflow pattern. TPL Dataflow is thread-safe, supports different levels of parallelism and bounded capacity, and has an async/await API. This supposedly makes TPL Dataflow a perfect candidate to implement the multi-threaded pipeline. Let’s test this theory.
Requirements
In the previous part we defined a pipeline example scenario and a list of requirements our implementation should fulfill. Let’s recap.
In the example scenario, our pipeline accepts a string, finds the most common word, counts its number of characters, and checks whether that number is odd. Something like this:
string input = "The pipeline pattern is the best pattern";
var pipeline = CreatePipeline();
bool result = await pipeline.Execute(input);
// Returns 'True' because 'pattern' is the most common,
// it has 7 characters and it's an odd number
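The helper that finds the most common word (called FindMostCommon in the code below) isn’t shown in this part. Here is a minimal sketch of one possible version, assuming words are separated by spaces and compared case-sensitively. The case sensitivity matters: with a case-insensitive comparison, "the" would tie with "pattern" in the example sentence.

```csharp
using System;
using System.Linq;

// Hypothetical helper: returns the most frequent space-separated word.
// Comparison is case-sensitive, so "The" and "the" are different words.
// Ties go to the word that appears first, because GroupBy keeps
// first-occurrence order and OrderByDescending is a stable sort.
string FindMostCommon(string sentence) =>
    sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
        .GroupBy(word => word)
        .OrderByDescending(group => group.Count())
        .First()
        .Key;

string input = "The pipeline pattern is the best pattern";
string mostCommon = FindMostCommon(input);   // "pattern"
bool isOdd = mostCommon.Length % 2 == 1;     // 7 characters, so true
Console.WriteLine($"{mostCommon} {mostCommon.Length} {isOdd}");
```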
When creating the pipeline, in a perfect world, it should look something like this:
public Pipeline CreatePipeline()
{
    return new Pipeline()
        .AddStep(str => FindMostCommon(str))
        .AddStep(mostCommonWord => mostCommonWord.Length)
        .AddStep(length => /* isOdd */ length % 2 == 1);
}
Since we want our pipeline to be robust and customizable, we also have to meet a number of requirements. Those are:
- Allow creating steps with lambda functions.
- Have each pipeline element execute on a dedicated thread, with a buffer (queue) between the pipeline elements for items that have finished one step and are waiting for the next.
- Allow custom input and output types in the delegates of each step, without having to cast from object in each delegate.
- (Optionally) Allow setting a degree of parallelism for each step, that is, the maximum number of dedicated threads.
- (Optionally) Set a maximum capacity for each buffer.
- (Optionally) Allow awaiting the result of an item entered into the pipeline with the async/await API.
Let’s see if TPL Dataflow is up to the task.
First implementation with TPL Dataflow
To get started with TPL Dataflow, we need to add the System.Threading.Tasks.Dataflow NuGet package.
TPL Dataflow’s TransformBlock<TInput, TOutput> is perfect for representing each step. It accepts an input of any type, runs a delegate on it, and returns an output of any other type. The TransformBlock holds two buffers: one for the inputs and one for the outputs.
To do something with the result after all steps are finished, we can use ActionBlock, which simply invokes a custom delegate on each input. This block holds one buffer for its inputs.
Here’s the code:
public static TransformBlock<string, string> CreatePipeline(Action<bool> resultCallback)
{
    var step1 = new TransformBlock<string, string>((sentence) => FindMostCommon(sentence));
    var step2 = new TransformBlock<string, int>((word) => word.Length);
    var step3 = new TransformBlock<int, bool>((length) => length % 2 == 1);
    var callbackStep = new ActionBlock<bool>(resultCallback);
    step1.LinkTo(step2, new DataflowLinkOptions());
    step2.LinkTo(step3, new DataflowLinkOptions());
    step3.LinkTo(callbackStep);
    return step1;
}
Usage:
var pipeline = TPLDataflowPipelineSimple.CreatePipeline(
    resultCallback: res => Console.WriteLine(res));
pipeline.Post("The pipeline pattern is the best pattern");
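One thing the usage above glosses over: Post only queues the item, so a console app may exit before the callback ever runs. Here is a minimal runnable sketch of the same pipeline that waits for all posted items, assuming the blocks are linked with PropagateCompletion = true so that calling Complete() on the first block eventually completes the last one. FindMostCommon is a hypothetical stand-in, and the results queue exists only so the output is easy to check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (case-sensitive, space-separated words).
string FindMostCommon(string sentence) =>
    sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
        .GroupBy(word => word)
        .OrderByDescending(group => group.Count())
        .First()
        .Key;

var results = new ConcurrentQueue<bool>();

var step1 = new TransformBlock<string, string>(sentence => FindMostCommon(sentence));
var step2 = new TransformBlock<string, int>(word => word.Length);
var step3 = new TransformBlock<int, bool>(length => length % 2 == 1);
var callbackStep = new ActionBlock<bool>(res =>
{
    results.Enqueue(res);
    Console.WriteLine(res);
});

// PropagateCompletion forwards Complete() (and faults) down the chain.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
step1.LinkTo(step2, linkOptions);
step2.LinkTo(step3, linkOptions);
step3.LinkTo(callbackStep, linkOptions);

step1.Post("The pipeline pattern is the best pattern"); // "pattern": 7 chars
step1.Post("one two two");                              // "two": 3 chars
step1.Post("four four five");                           // "four": 4 chars

step1.Complete();              // signal that no more input is coming
await callbackStep.Completion; // wait until every item reached the callback
```

Because PropagateCompletion also forwards faults, a step that throws makes the awaited Completion fail instead of leaving the program waiting forever.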
Looking back at the implementations with BlockingCollection, this code is much simpler. Everything is intuitive and straightforward. +1 for TPL Dataflow so far.
Let’s try to improve this by adding all the missing features and optional requirements. Those are:
- Levels of parallelism for each step
- Bounded capacity for each step
- Awaiting the result with the async/await API instead of using a callback
- Exception handling. Right now, an unhandled exception in any step faults that step’s block, which then stops accepting items. In other words, an exception destroys the pipeline.
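A small sketch shows what "destroys the pipeline" means, using a hypothetical step that rejects empty strings: once a delegate throws, the TransformBlock faults, its Completion task ends in the Faulted state, and every item posted afterwards is declined.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical step that throws for an empty word.
var step = new TransformBlock<string, int>(word =>
    word.Length == 0 ? throw new ArgumentException("Empty word") : word.Length);

step.Post("");   // this item makes the delegate throw

try
{
    await step.Completion;   // ends when the block faults
}
catch (Exception e)
{
    Console.WriteLine($"Block faulted: {e.Message}");
}

// A faulted block permanently declines new input.
bool accepted = step.Post("pattern");
Console.WriteLine($"Accepted after fault: {accepted}"); // False
```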
Changing level of parallelism for each step and bound capacity
With TPL Dataflow, changing parallelism level and max capacity is as simple as changing a couple of properties:
public static TransformBlock<string, string> CreatePipeline(Action<bool> resultCallback)
{
    var step1 = new TransformBlock<string, string>((sentence) => FindMostCommon(sentence),
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 3,
            BoundedCapacity = 5,
        });
    var step2 = new TransformBlock<string, int>((word) => word.Length,
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 13,
        });
    var step3 = new TransformBlock<int, bool>((length) => length % 2 == 1,
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 11,
            BoundedCapacity = 6,
        });
    var callBackStep = new ActionBlock<bool>(resultCallback);
    step1.LinkTo(step2, new DataflowLinkOptions());
    step2.LinkTo(step3, new DataflowLinkOptions());
    step3.LinkTo(callBackStep);
    return step1;
}
The only change is in the usage: because of the bounded capacity, you can optionally wait asynchronously to post a new item:
await pipeline.SendAsync("The pipeline pattern is the best pattern");
If the first step is full, this will wait asynchronously until there’s an opening and the input is entered into the pipeline. So far, TPL Dataflow continues to be easy enough.
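The difference between Post and SendAsync on a full block is easy to see in isolation. In this sketch, a hypothetical gate (a TaskCompletionSource) keeps the first item in progress so that a block with BoundedCapacity = 1 stays full. Post gives up and returns false, while SendAsync returns a Task that completes only when there is room. Note that a TransformBlock also counts items sitting in its output buffer against BoundedCapacity, so room only appears after the first result is received.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical gate that keeps the first item "in progress" until released.
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var step = new TransformBlock<int, int>(async x =>
    {
        await gate.Task;
        return x;
    },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

bool firstAccepted = step.Post(1);           // true: the block had room
bool secondAccepted = step.Post(2);          // false: the block is full, Post gives up
Task<bool> pending = step.SendAsync(2);      // postponed until there is room
bool waitedWhileFull = !pending.IsCompleted;

gate.SetResult(true);                        // let item 1 finish
int first = await step.ReceiveAsync();       // removing it from the output frees capacity
bool sent = await pending;                   // SendAsync now completes with true
int second = await step.ReceiveAsync();

Console.WriteLine($"{firstAccepted} {secondAccepted} {waitedWhileFull} {first} {sent} {second}");
```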
Awaiting the result
Although we can await the insertion of an item into the first step of the pipeline, we can’t actually await its passage through all the following steps to get the result. Optimally, the usage should be:
bool res = await pipeline.Execute("The pipeline pattern is the best pattern"); // True
Like in Part 1, we can make use of the TaskCompletionSource class. This class provides a Task instance whose result (or exception, if needed) you can set manually. In our case, each item executed by the pipeline has to carry a TaskCompletionSource instance with it.
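If TaskCompletionSource is new to you, here is a minimal sketch of it on its own: one piece of code awaits tcs.Task, and another completes it with SetResult or SetException. Passing TaskCreationOptions.RunContinuationsAsynchronously is an optional choice that the article’s code doesn’t make; it keeps the code after the await from running inline on the thread that calls SetResult, which in a pipeline would be a dataflow block’s worker thread.

```csharp
using System;
using System.Threading.Tasks;

// RunContinuationsAsynchronously (optional) stops the code after "await tcs.Task"
// from running inline on the thread that calls SetResult.
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

// Another thread (in the pipeline: the last block) completes the Task later.
_ = Task.Run(async () =>
{
    await Task.Delay(50);
    tcs.SetResult(true);
});

bool result = await tcs.Task;   // resumes once SetResult has been called
Console.WriteLine(result);

// The same mechanism can carry an exception instead of a result.
var failing = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
failing.SetException(new InvalidOperationException("Step failed"));
bool caught = false;
try
{
    await failing.Task;   // rethrows the InvalidOperationException
}
catch (InvalidOperationException)
{
    caught = true;
}
Console.WriteLine(caught);
```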
Here’s a first attempt at an implementation that lets us await the result:
public class TC<TInput, TOutput>
{
    public TC(TInput input, TaskCompletionSource<TOutput> tcs)
    {
        Input = input;
        TaskCompletionSource = tcs;
    }
    public TInput Input { get; set; }
    public TaskCompletionSource<TOutput> TaskCompletionSource { get; set; }
}
public class TPLDataflowPipelineWithAwaitAttempt1
{
    public static TransformBlock<TC<string, bool>, TC<string, bool>> CreatePipeline()
    {
        var step1 = new TransformBlock<TC<string, bool>, TC<string, bool>>((tc) =>
            new TC<string, bool>(FindMostCommon(tc.Input), tc.TaskCompletionSource));
        var step2 = new TransformBlock<TC<string, bool>, TC<int, bool>>((tc) =>
            new TC<int, bool>(tc.Input.Length, tc.TaskCompletionSource));
        var step3 = new TransformBlock<TC<int, bool>, TC<bool, bool>>((tc) =>
            new TC<bool, bool>(tc.Input % 2 == 1, tc.TaskCompletionSource));
        var setResultStep = new ActionBlock<TC<bool, bool>>((tc) => tc.TaskCompletionSource.SetResult(tc.Input));
        step1.LinkTo(step2, new DataflowLinkOptions());
        step2.LinkTo(step3, new DataflowLinkOptions());
        step3.LinkTo(setResultStep, new DataflowLinkOptions());
        return step1;
    }
}
Usage:
var pipeline = TPLDataflowPipelineWithAwaitAttempt1.CreatePipeline();
var tcs = new TaskCompletionSource<bool>();
var tc = new TC<string, bool>(
    "The pipeline pattern is the best pattern", tcs);
var task = tcs.Task;
await pipeline.SendAsync(tc);
var result = await task;
Console.WriteLine(result); // True
Explanation:
- The class TC<TInput, TOutput> holds the current step’s input and the entire pipeline’s output. That’s because the TaskCompletionSource should be typed with the pipeline’s output, not the step’s. I called it TC just to keep the code more convenient to read. I suppose a Tuple could be used here instead of a class.
- Each step is a TransformBlock that runs on the Input of the TC and produces a new TC, with TInput being the next step’s input type.
- The last step is an ActionBlock which invokes SetResult to have the Task end with a result.
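The Tuple idea from the first bullet works with C# 7 value tuples. Here is a sketch of the same first attempt without the TC class; each item is an (Input, Tcs) pair in which only Input changes type from step to step. FindMostCommon is a hypothetical stand-in for the article’s helper.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (case-sensitive, space-separated words).
string FindMostCommon(string sentence) =>
    sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
        .GroupBy(word => word)
        .OrderByDescending(group => group.Count())
        .First()
        .Key;

// Input changes type per step; Tcs is always typed with the pipeline's output (bool).
var step1 = new TransformBlock<(string Input, TaskCompletionSource<bool> Tcs), (string Input, TaskCompletionSource<bool> Tcs)>(
    item => (FindMostCommon(item.Input), item.Tcs));
var step2 = new TransformBlock<(string Input, TaskCompletionSource<bool> Tcs), (int Input, TaskCompletionSource<bool> Tcs)>(
    item => (item.Input.Length, item.Tcs));
var step3 = new TransformBlock<(int Input, TaskCompletionSource<bool> Tcs), (bool Input, TaskCompletionSource<bool> Tcs)>(
    item => (item.Input % 2 == 1, item.Tcs));
var setResultStep = new ActionBlock<(bool Input, TaskCompletionSource<bool> Tcs)>(
    item => item.Tcs.SetResult(item.Input));

step1.LinkTo(step2, new DataflowLinkOptions());
step2.LinkTo(step3, new DataflowLinkOptions());
step3.LinkTo(setResultStep, new DataflowLinkOptions());

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
await step1.SendAsync((Input: "The pipeline pattern is the best pattern", Tcs: tcs));
bool result = await tcs.Task;
Console.WriteLine(result);
```

The trade-off is verbosity: the tuple types must be spelled out in every block declaration, which is one reason a small named class like TC can be easier to read.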
This works, but the syntax is pretty terrible. Creating each pipeline is going to be a nightmare, and executing it isn’t great either.
With some effort, though, we can create a pipeline builder that makes both creating and executing the pipeline much more pleasant.
Awaiting the result with a pleasant Usage + Handling Exceptions
I removed levels of parallelism and bounded capacity for better readability, but they should be easy to add.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class TC<TInput, TOutput>
{
    public TC(TInput input, TaskCompletionSource<TOutput> tcs)
    {
        Input = input;
        TaskCompletionSource = tcs;
    }
    public TInput Input { get; set; }
    public TaskCompletionSource<TOutput> TaskCompletionSource { get; set; }
}
public class TPLPipelineWithAwaitAttempt2<TIn, TOut>
{
    private List<IDataflowBlock> _transformBlocks = new List<IDataflowBlock>();
    public TPLPipelineWithAwaitAttempt2<TIn, TOut> AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)
    {
        var step = new TransformBlock<TC<TLocalIn, TOut>, TC<TLocalOut, TOut>>((tc) =>
        {
            try
            {
                return new TC<TLocalOut, TOut>(stepFunc(tc.Input), tc.TaskCompletionSource);
            }
            catch (Exception e)
            {
                // Fault only this item's Task; the link predicates send it to NullTarget
                tc.TaskCompletionSource.SetException(e);
                return new TC<TLocalOut, TOut>(default(TLocalOut), tc.TaskCompletionSource);
            }
        });
        if (_transformBlocks.Count > 0)
        {
            var lastStep = _transformBlocks.Last();
            var targetBlock = (lastStep as ISourceBlock<TC<TLocalIn, TOut>>);
            // Healthy items continue to the new step...
            targetBlock.LinkTo(step, new DataflowLinkOptions(),
                tc => !tc.TaskCompletionSource.Task.IsFaulted);
            // ...faulted ones are discarded so they don't block the output buffer
            targetBlock.LinkTo(DataflowBlock.NullTarget<TC<TLocalIn, TOut>>(), new DataflowLinkOptions(),
                tc => tc.TaskCompletionSource.Task.IsFaulted);
        }
        _transformBlocks.Add(step);
        return this;
    }
    public TPLPipelineWithAwaitAttempt2<TIn, TOut> CreatePipeline()
    {
        var setResultStep =
            new ActionBlock<TC<TOut, TOut>>((tc) => tc.TaskCompletionSource.SetResult(tc.Input));
        var lastStep = _transformBlocks.Last();
        var setResultBlock = (lastStep as ISourceBlock<TC<TOut, TOut>>);
        // Faulted items from the last step must skip setResultStep too:
        // SetResult on an already-faulted Task throws and would fault the ActionBlock
        setResultBlock.LinkTo(setResultStep, new DataflowLinkOptions(),
            tc => !tc.TaskCompletionSource.Task.IsFaulted);
        setResultBlock.LinkTo(DataflowBlock.NullTarget<TC<TOut, TOut>>(), new DataflowLinkOptions(),
            tc => tc.TaskCompletionSource.Task.IsFaulted);
        return this;
    }
    public async Task<TOut> Execute(TIn input)
    {
        var firstStep = _transformBlocks[0] as ITargetBlock<TC<TIn, TOut>>;
        var tcs = new TaskCompletionSource<TOut>(TaskCreationOptions.RunContinuationsAsynchronously);
        await firstStep.SendAsync(new TC<TIn, TOut>(input, tcs));
        return await tcs.Task;
    }
}
Usage:
var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
    .AddStep<string, string>(sentence => FindMostCommon(sentence))
    .AddStep<string, int>(word => word.Length)
    .AddStep<int, bool>(length => length % 2 == 1)
    .CreatePipeline();
// ...
var result = await pipeline.Execute("The pipeline pattern is the best pattern");
Console.WriteLine(result); // True
So the builder’s code itself is nowhere near pleasant, I admit. But that doesn’t matter much, because we have to write it just once. The usage now becomes pretty much perfect: we can add pipeline steps in a type-safe fashion without casting, we can use lambda functions, and this code even handles exceptions. Beautiful.
Explanation:
- The new implementation is pretty similar to the first attempt.
- This class uses TC<TIn, TOut> like before.
- When a step is added with AddStep, it creates a new TransformBlock. This block transforms the Input of the TC and produces a new TC, with TInput being the next step’s input type.
- Each delegate’s execution, stepFunc(tc.Input), is surrounded by try/catch. If an exception is thrown, we use tc.TaskCompletionSource.SetException to set the Task as faulted. When the next step is added, this step’s output is linked to it with a condition in LinkTo. If the item’s Task is faulted, the item goes to NullTarget, which means nowhere. If it isn’t faulted, it continues to the next step. We have to link faulted items somewhere, otherwise they would block the TransformBlock’s output buffer forever.
- When CreatePipeline is called to complete the pipeline construction, an additional block is added. This last block, setResultStep, invokes SetResult to mark the Task as finished.
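The try/catch plus conditional LinkTo technique can be seen in isolation in the sketch below. It uses a hypothetical single-step pipeline built on int.Parse (not part of the article’s example) and value tuples instead of TC. A bad input faults only its own Task and is discarded through NullTarget, while the block keeps processing later items.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical step: parse a number; the final block doubles it.
var parseStep = new TransformBlock<(string Input, TaskCompletionSource<int> Tcs), (int Input, TaskCompletionSource<int> Tcs)>(item =>
{
    try
    {
        return (int.Parse(item.Input), item.Tcs);
    }
    catch (Exception e)
    {
        item.Tcs.SetException(e);   // fault only this item's Task
        return (0, item.Tcs);       // placeholder value, never used
    }
});
var setResultStep = new ActionBlock<(int Input, TaskCompletionSource<int> Tcs)>(
    item => item.Tcs.SetResult(item.Input * 2));

// Healthy items continue; faulted ones go to NullTarget instead of
// sitting at the head of parseStep's output buffer forever.
parseStep.LinkTo(setResultStep, new DataflowLinkOptions(), item => !item.Tcs.Task.IsFaulted);
parseStep.LinkTo(DataflowBlock.NullTarget<(int Input, TaskCompletionSource<int> Tcs)>(),
    new DataflowLinkOptions(), item => item.Tcs.Task.IsFaulted);

Task<int> Execute(string input)
{
    var tcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
    parseStep.Post((Input: input, Tcs: tcs));
    return tcs.Task;
}

int ok = await Execute("21");
bool failed = false;
try
{
    await Execute("not a number");
}
catch (FormatException)
{
    failed = true;   // the exception surfaces only for this item
}
int stillWorks = await Execute("5");   // the block survived the bad input
Console.WriteLine($"{ok} {failed} {stillWorks}");
```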
All the implementations, including the last one, are available on GitHub.
Improving Usage
In theory, we can make creating the pipeline even more pleasant. From this:
var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
    .AddStep<string, string>(sentence => FindMostCommon(sentence))
    .AddStep<string, int>(word => word.Length)
    .AddStep<int, bool>(length => length % 2 == 1)
    .CreatePipeline();
To this:
var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>((inputFirst, builder) =>
    inputFirst
        .AddStep(builder, input => FindMostCommon(input))
        .AddStep(builder, input => input.Length)
        .AddStep(builder, input => input % 2 == 1));
The difference here is that we don’t even have to specify the generic type arguments. This can be achieved by changing AddStep to an extension method that accepts the input of the previous step and produces an output. If you missed the first part of the series and want to see some C# trickery, check out a similar implementation with BlockingCollection. And if you’re up for a challenge, implement it yourself and send me a pull request with an additional implementation on GitHub. I’ll give you credit and include it in the article.
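The type inference that makes this possible can be sketched without the full builder. The hypothetical Then helper below is a local function, so it isn’t an extension method and can’t be dot-chained; still, the compiler infers TIn from the previous block’s output type and TOut from the lambda’s return type, so no generic arguments are written. Declaring a helper like this as an extension method in a static class is what would enable the chained style shown above. FindMostCommon is a hypothetical stand-in.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (case-sensitive, space-separated words).
string FindMostCommon(string sentence) =>
    sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
        .GroupBy(word => word)
        .OrderByDescending(group => group.Count())
        .First()
        .Key;

// Hypothetical helper: creates the next step and links the previous one to it.
// TIn is inferred from the previous block, TOut from the lambda's return type.
IPropagatorBlock<TIn, TOut> Then<TIn, TOut>(ISourceBlock<TIn> previous, Func<TIn, TOut> func)
{
    var next = new TransformBlock<TIn, TOut>(func);
    previous.LinkTo(next, new DataflowLinkOptions { PropagateCompletion = true });
    return next;
}

var firstStep = new TransformBlock<string, string>(sentence => FindMostCommon(sentence));
var lengthStep = Then(firstStep, word => word.Length);            // IPropagatorBlock<string, int>
var isOddStep = Then(lengthStep, length => length % 2 == 1);      // IPropagatorBlock<int, bool>

firstStep.Post("The pipeline pattern is the best pattern");
bool result = await isOddStep.ReceiveAsync();
Console.WriteLine(result);
```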
Summary and Coming up
Like with the BlockingCollection implementation, we reached a perfect solution with TPL Dataflow. By that I mean that all requirements were met and the usage is easy enough. I wouldn’t call this implementation easy by any means, but I’d say it was easier than the one with BlockingCollection. In that regard, TPL Dataflow has the upper hand.
Part 3 of the series is now ready.
In the following parts, I’ll try to implement the pipeline with System.Threading.Channels (which I expect to succeed), Reactive Extensions (which I expect to fail), and possibly other, less-known libraries. Subscribe to be notified about the next parts of the series.