In the first part of the series, we talked about the Pipeline pattern in programming, also known as the Pipes and Filters design pattern. The first part showed how to implement a multi-threaded pipeline with BlockingCollection. In this part, you’ll see how to implement such a pipeline with TPL Dataflow.

In a pipeline, each step accepts an input and produces an output. The output of one step is the input of the next one. In a Multi-Threaded Pipeline, each step of the pipeline is executed on a separate thread (or threads). There’s a buffer (Queue) between each pipeline element to store step results while the next element is still not able to receive them. The buffers might be limited in size.

TPL Dataflow is part of Microsoft’s Task Parallel Library (TPL). It’s a powerful and flexible library that implements the Dataflow pattern, which is broader than the pipeline pattern. That is, the pipeline pattern is a subset of the Dataflow pattern. TPL Dataflow is thread-safe, supports different levels of parallelism and bounded capacity, and has an async/await API. This supposedly makes TPL Dataflow a perfect candidate to implement the multi-threaded pipeline. Let’s test this theory.

Requirements

In the previous part we defined a pipeline example scenario and a list of requirements our implementation should fulfill. Let’s recap.

In the example scenario, our pipeline accepts a string, finds the most common word, counts its number of characters, and checks if that number is odd. Something like this:

string input = "The pipeline pattern is the best pattern";
 
var pipeline = CreatePipeline();
bool result = await pipeline.Execute(input); 
// Returns 'True' because 'pattern' is the most common, 
// it has 7 characters and it's an odd number

When creating the pipeline, in a perfect world, it should look something like this:

public Pipeline CreatePipeline()
{
    return new Pipeline()
        .AddStep(str => FindMostCommonWord(str))
        .AddStep(mostCommonWord => mostCommonWord.Length)
        .AddStep(length => /*isOdd */ length % 2 == 1);
}
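FindMostCommonWord itself is not shown in this part, so here is a minimal sketch of what it could look like. The body is my assumption, not the original implementation: it splits on spaces, compares words case-sensitively (so "The" and "the" are different words), and expects a non-empty sentence.

```csharp
using System;
using System.Linq;

// Hypothetical helper (assumed implementation, not taken from the series).
// Assumes a non-empty sentence with words separated by spaces.
static string FindMostCommonWord(string sentence)
{
    return sentence
        .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
        .GroupBy(word => word)                      // case-sensitive grouping
        .OrderByDescending(group => group.Count())  // stable sort: ties keep first-seen order
        .First()
        .Key;
}

string mostCommon = FindMostCommonWord("The pipeline pattern is the best pattern");
Console.WriteLine(mostCommon);                 // pattern
Console.WriteLine(mostCommon.Length % 2 == 1); // True
```

With a case-insensitive comparison, "the" and "pattern" would tie in this sentence, so the comparison rule matters for the example result.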

Since we want our pipeline to be robust and customizable, we also have to meet a bunch of requirements. Those are:

  • Allow creating steps with lambda functions
  • Have each pipeline element execute on a dedicated thread. Have a buffer (queue) between the pipeline elements for items that finished one step and are waiting for the next step.
  • Allow custom input and output types in the delegates of each step. That is, without having to cast from object in each delegate.
  • (Optionally) Allow setting a degree of parallelism for each step. That is, the maximum number of dedicated threads.
  • (Optionally) Set a maximum capacity for each buffer
  • (Optionally) Allow awaiting the result of an item entered into the pipeline with an async/await API.

Let’s see if TPL Dataflow is up to the task.

First implementation with TPL Dataflow

To get started with TPL Dataflow, we need to add the System.Threading.Tasks.Dataflow NuGet package.

TPL Dataflow’s TransformBlock<TInput, TOutput> is perfect to represent each step. It accepts an input of any type, runs a delegate on it, and returns an output of any other type. The TransformBlock holds 2 buffers – one for the inputs and one for the outputs.

To do something with the result after all steps are finished, we can use ActionBlock, which simply invokes a custom delegate on any input. This block holds 1 buffer for its inputs.

Here’s the code:

public static TransformBlock<string,string> CreatePipeline(Action<bool> resultCallback)
{
    var step1 = new TransformBlock<string, string>((sentence) => FindMostCommon(sentence));
    var step2 = new TransformBlock<string, int>((word) => word.Length);
    var step3 = new TransformBlock<int, bool>((length) => length % 2 == 1);
    var callbackStep = new ActionBlock<bool>(resultCallback);
    step1.LinkTo(step2, new DataflowLinkOptions());
    step2.LinkTo(step3, new DataflowLinkOptions());
    step3.LinkTo(callbackStep);
    return step1;
}

Usage:

var pipeline = TPLDataflowPipelineSimple.CreatePipeline(
    resultCallback: res => Console.WriteLine(res));
pipeline.Post("The pipeline pattern is the best pattern");

Looking back at the implementations with BlockingCollection, this is much simpler code. Everything is intuitive and straightforward. +1 for TPL Dataflow so far.
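One practical note: Post returns immediately, so a console application can exit before the callback ever runs. Here is a minimal sketch of one way to wait for the pipeline to drain, assuming you are done sending input. It uses PropagateCompletion, Complete and Completion from TPL Dataflow. The FindMostCommon body and the results list are stand-ins I added for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Stand-in for FindMostCommon (assumed: space-separated, case-sensitive).
static string FindMostCommon(string sentence) =>
    sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
            .GroupBy(w => w)
            .OrderByDescending(g => g.Count())
            .First().Key;

var results = new List<bool>();

var step1 = new TransformBlock<string, string>(sentence => FindMostCommon(sentence));
var step2 = new TransformBlock<string, int>(word => word.Length);
var step3 = new TransformBlock<int, bool>(length => length % 2 == 1);
var callbackStep = new ActionBlock<bool>(res => results.Add(res));

// PropagateCompletion forwards Complete() (and faults) to the next block.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
step1.LinkTo(step2, linkOptions);
step2.LinkTo(step3, linkOptions);
step3.LinkTo(callbackStep, linkOptions);

step1.Post("The pipeline pattern is the best pattern"); // "pattern", 7 chars
step1.Post("same same word");                           // "same", 4 chars

step1.Complete();              // signal that no more input is coming
await callbackStep.Completion; // wait until every item reached the callback

Console.WriteLine(string.Join(", ", results)); // True, False
```

Each block keeps its default parallelism of 1 here, so the items arrive at the callback in the order they were posted.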

Let’s try to improve this by adding all the missing features and optional requirements. Those are:

  • Levels of parallelism for each step
  • Bound capacity for each step
  • Await the result with async/await API instead of using a callback.
  • Exception handling. Right now exceptions are not handled, and an exception will destroy the pipeline.

Changing level of parallelism for each step and bound capacity

With TPL Dataflow, changing parallelism level and max capacity is as simple as changing a couple of properties:

public static TransformBlock<string, string> CreatePipeline(Action<bool> resultCallback)
{
    var step1 = new TransformBlock<string, string>((sentence) => FindMostCommon(sentence), 
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 3,
            BoundedCapacity = 5,
        });
    var step2 = new TransformBlock<string, int>((word) => word.Length, 
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 13,
        });
    var step3 = new TransformBlock<int, bool>((length) => length % 2 == 1, 
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 11,
            BoundedCapacity = 6,
        });
    var callBackStep = new ActionBlock<bool>(resultCallback);
    step1.LinkTo(step2, new DataflowLinkOptions());
    step2.LinkTo(step3, new DataflowLinkOptions());
    step3.LinkTo(callBackStep);
    return step1;
}

The only change is in the usage. Because of the bounded capacity, you can optionally wait asynchronously for a new item to be accepted:

await pipeline.SendAsync("The pipeline pattern is the best pattern");

If the first step is full, this will wait asynchronously until there’s an opening and the input is entered into the pipeline. So far, TPL Dataflow continues to be easy enough.
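The difference between Post and SendAsync only shows up once a block is full. Here is a small sketch of that behavior, using a single TransformBlock with BoundedCapacity = 1 that I deliberately left unlinked so its one slot stays occupied. The block and variable names are made up for the example.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Not linked to anything, so a processed item stays in the output buffer
// and keeps counting against BoundedCapacity.
var block = new TransformBlock<int, int>(
    x => x * 2,
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

bool firstAccepted = block.Post(1);  // true: there is room for one item
bool secondAccepted = block.Post(2); // false: the block is full, the item is rejected

Task<bool> pendingSend = block.SendAsync(3);       // waits for room instead of giving up
bool completedWhileFull = pendingSend.IsCompleted; // false: still waiting

int firstOutput = await block.ReceiveAsync(); // 2; taking it out frees the slot
bool thirdAccepted = await pendingSend;       // true: the item was accepted

Console.WriteLine(
    $"{firstAccepted} {secondAccepted} {completedWhileFull} {firstOutput} {thirdAccepted}");
// True False False 2 True
```

So with bounded capacity, Post can silently lose an item unless you check its return value, while SendAsync waits for an opening.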

Awaiting the result

Although we can await an item being inserted into the first step of the pipeline, we can’t actually await it finishing all the following steps and get the result. Optimally, the usage should be:

bool res = await pipeline.Execute("The pipeline pattern is the best pattern"); // True

Like in Part 1, we can make use of the TaskCompletionSource class. This class provides a Task instance whose result, or exception if needed, you can set manually. In our case, each item executed by the pipeline will have to keep a TaskCompletionSource instance with it.
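If you haven’t used TaskCompletionSource before, here is a minimal standalone sketch of the two operations we rely on, SetResult and SetException. The variable names and the exception message are only for illustration.

```csharp
using System;
using System.Threading.Tasks;

var tcs = new TaskCompletionSource<bool>();
Task<bool> task = tcs.Task;
bool completedBefore = task.IsCompleted; // false: nobody has set a result yet

tcs.SetResult(true);     // completes the Task with a value
bool value = await task; // true

var failing = new TaskCompletionSource<bool>();
failing.SetException(new InvalidOperationException("step failed"));

string message;
try
{
    await failing.Task; // rethrows the stored exception
    message = "no exception";
}
catch (InvalidOperationException e)
{
    message = e.Message;
}

Console.WriteLine($"{completedBefore} {value} {message}"); // False True step failed
```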

Here’s a first attempt at an implementation that allows awaiting the result:

public class TC<TInput, TOutput>
{
    public TC(TInput input, TaskCompletionSource<TOutput> tcs)
    {
        Input = input;
        TaskCompletionSource = tcs;
    }
    public TInput Input { get; set; } 
    public TaskCompletionSource<TOutput> TaskCompletionSource{ get; set; }
}

public class TPLDataflowPipelineWithAwaitAttempt1
{
    public static TransformBlock<TC<string, bool>, TC<string, bool>> CreatePipeline()
    {
        var step1 = new TransformBlock<TC<string, bool>, TC<string, bool>>((tc) => 
            new TC<string,bool> (FindMostCommon(tc.Input), tc.TaskCompletionSource));

        var step2 = new TransformBlock<TC<string, bool>, TC<int, bool>>((tc) =>
            new TC<int,bool> (tc.Input.Length, tc.TaskCompletionSource));

        var step3 = new TransformBlock<TC<int, bool>, TC<bool, bool>>((tc) =>
            new TC<bool,bool> (tc.Input % 2 == 1, tc.TaskCompletionSource));

        var setResultStep = new ActionBlock<TC<bool, bool>>((tc) => tc.TaskCompletionSource.SetResult(tc.Input));
        
        step1.LinkTo(step2, new DataflowLinkOptions());
        step2.LinkTo(step3, new DataflowLinkOptions());
        step3.LinkTo(setResultStep, new DataflowLinkOptions());
        return step1;
    }
}

Usage:

var pipeline = TPLDataflowPipelineWithAwaitAttempt1.CreatePipeline();
var tcs = new TaskCompletionSource<bool>();
var tc = new TC<string, bool>(
    "The pipeline pattern is the best pattern", tcs);
var task = tcs.Task;
await pipeline.SendAsync(tc);
var result = await task;
Console.WriteLine(result);

Explanation:

  • The class TC<TInput, TOutput> holds the current step’s input and the TaskCompletionSource for the entire pipeline’s output. That’s because the TaskCompletionSource should be typed to the pipeline’s output, not the step’s. I called it TC just to keep the code more convenient to read. I suppose a Tuple can be used here instead of a class.
  • Each step is a TransformBlock that runs on the input of the TC and produces a new TC with TInput being the next step’s input type.
  • The last step is an ActionBlock which invokes SetResult to have the Task end with a result.

This works but the syntax is pretty terrible here. Creating each pipeline is going to be a nightmare, and the execution usage is not great either.

With some effort though, we can create a pipeline builder which makes both creating and executing the pipeline much more pleasant.

Awaiting the result with a pleasant Usage + Handling Exceptions

I removed levels of parallelism and bound capacity for better readability, but they should be easy to add.

public class TC<TInput, TOutput>
{
    public TC(TInput input, TaskCompletionSource<TOutput> tcs)
    {
        Input = input;
        TaskCompletionSource = tcs;
    }
    public TInput Input { get; set; } 
    public TaskCompletionSource<TOutput> TaskCompletionSource{ get; set; }
}

public class TPLPipelineWithAwaitAttempt2<TIn, TOut>
{
    private List<IDataflowBlock> _transformBlocks = new List<IDataflowBlock>();
    public TPLPipelineWithAwaitAttempt2<TIn, TOut> AddStep<TLocalIn, TLocalOut>(Func<TLocalIn, TLocalOut> stepFunc)
    {
        var step = new TransformBlock<TC<TLocalIn, TOut>, TC<TLocalOut, TOut>>((tc) =>
            {
                try
                {
                    return new TC<TLocalOut, TOut>(stepFunc(tc.Input), tc.TaskCompletionSource);
                }
                catch (Exception e)
                {
                    tc.TaskCompletionSource.SetException(e);
                    return new TC<TLocalOut, TOut>(default(TLocalOut), tc.TaskCompletionSource);
                }
            });
        
        if (_transformBlocks.Count > 0)
        {
            var lastStep = _transformBlocks.Last();
            // The previous step is the source of this link, the new step is its target.
            var sourceBlock = (lastStep as ISourceBlock<TC<TLocalIn, TOut>>);
            sourceBlock.LinkTo(step, new DataflowLinkOptions(), 
                tc => !tc.TaskCompletionSource.Task.IsFaulted);
            sourceBlock.LinkTo(DataflowBlock.NullTarget<TC<TLocalIn, TOut>>(), new DataflowLinkOptions(), 
                tc => tc.TaskCompletionSource.Task.IsFaulted);
        }
        _transformBlocks.Add(step);
        return this;
    }

    public TPLPipelineWithAwaitAttempt2<TIn, TOut> CreatePipeline()
    {
        // TrySetResult rather than SetResult: if the last step threw, the Task is already
        // faulted, and SetResult would throw and fault this block.
        var setResultStep =
            new ActionBlock<TC<TOut, TOut>>((tc) => tc.TaskCompletionSource.TrySetResult(tc.Input));
        var lastStep = _transformBlocks.Last();
        var setResultBlock = (lastStep as ISourceBlock<TC<TOut, TOut>>);
        setResultBlock.LinkTo(setResultStep);
        return this;
    }

    public Task<TOut> Execute(TIn input)
    {
        var firstStep = _transformBlocks[0] as ITargetBlock<TC<TIn, TOut>>;
        var tcs = new TaskCompletionSource<TOut>();
        firstStep.SendAsync(new TC<TIn, TOut>(input, tcs));
        return tcs.Task;
    }
}

Usage:

var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
    .AddStep<string, string>(sentence => FindMostCommon(sentence))
    .AddStep<string, int>(word => word.Length)
    .AddStep<int, bool>(length => length % 2 == 1)
    .CreatePipeline();

// ...

var result = await pipeline.Execute("The pipeline pattern is the best pattern");
Console.WriteLine(result); // True

The builder’s code is nowhere near pleasant, I admit. But it doesn’t matter because we have to write it just once. The usage now becomes pretty much perfect. We can add pipeline steps in a type-safe fashion without casting. We can use lambda functions. And this code even handles exceptions. Beautiful.

Explanation:

  • The new implementation is pretty similar to the first attempt.
  • This class uses TC<TIn, TOut> like before.
  • When a step is added with AddStep, it creates a new TransformBlock. This block transforms the input of the TC and produces a new TC with TInput being the next step’s input type.
  • Each delegate’s execution stepFunc(tc.Input) is surrounded by try/catch. If an exception is thrown, we use TaskCompletionSource.SetException to set the Task as faulted. Later, the step is linked with a condition in LinkTo. If the Task is faulted, the item goes to NullTarget, which means nowhere. If it isn’t faulted, it continues to the next step. We have to link faulted items somewhere, otherwise they will block the TransformBlock’s output buffer forever.
  • When CreatePipeline is called to complete the pipeline construction, an additional block is added. This last block, setResultStep, sets the result on the TaskCompletionSource to mark the Task as finished.
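The conditional linking is easier to see in isolation. Here is a small sketch that uses the same two-link trick on plain integers instead of TC items: even numbers continue to the next block, and everything else is sent to NullTarget. The block names and the even/odd predicate are just for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var kept = new List<int>();

var source = new TransformBlock<int, int>(x => x);
var keepStep = new ActionBlock<int>(x => kept.Add(x));

// Items matching the predicate continue to keepStep.
source.LinkTo(keepStep,
    new DataflowLinkOptions { PropagateCompletion = true },
    x => x % 2 == 0);

// Everything else is discarded. Without this second link, the first odd number
// would sit at the head of the output buffer and block the items behind it.
source.LinkTo(DataflowBlock.NullTarget<int>(), x => x % 2 != 0);

for (int i = 1; i <= 6; i++)
{
    source.Post(i);
}

source.Complete();         // no more input
await keepStep.Completion; // wait for the surviving items to be handled

Console.WriteLine(string.Join(", ", kept)); // 2, 4, 6
```

In the builder, the predicate is the faulted state of the item’s Task instead of even/odd, but the mechanics are the same.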

All the implementations, including the last one, are available on GitHub.

Improving Usage

In theory, we can improve the usage of creating the pipeline even further. From this:

var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
    .AddStep<string, string>(sentence => FindMostCommon(sentence))
    .AddStep<string, int>(word => word.Length)
    .AddStep<int, bool>(length => length % 2 == 1)
    .CreatePipeline();

To this:

var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>((inputFirst, builder) =>
    inputFirst
        .AddStep(builder, input => FindMostCommon(input))
        .AddStep(builder, input => input.Length)
        .AddStep(builder, input => input % 2 == 1));

The difference here is that we don’t even have to specify the generics. This can be achieved by changing AddStep to an extension method that accepts the input of the previous step and produces an output. If you missed the first part of the series and want to see some C# trickery, check out a similar implementation with BlockingCollection. And if you’re up for a challenge, implement it yourself and send me a pull request for an additional implementation on GitHub. I’ll give you credit and include this in the article.

Summary and Coming up

Like with the BlockingCollection implementation, a perfect solution was reached with TPL Dataflow. By that I mean that all requirements were met and the usage is easy enough. I wouldn’t call this implementation easy by any means, but I’d say it was easier in comparison with BlockingCollection. In that regard, TPL Dataflow has the upper hand.

Part 3 of the series is now ready

In the following parts, I’ll attempt pipeline implementations with System.Threading.Channels (which I expect to succeed), Reactive Extensions (which I expect to fail), and possibly other, lesser-known libraries. Subscribe to be updated on the next parts of the series.