Pipeline Pattern in C# (part 2) with TPL Dataflow

Pipeline with TPL Dataflow

In the first part of the series, we talked about the Pipeline Pattern in programming, also known as the Pipes and Filters design pattern. That part showed how to implement a multi-threaded pipeline with BlockingCollection. In this part, you'll see how to implement such a pipeline with TPL Dataflow.

In a pipeline, each step accepts an input and produces an output. The output of one step becomes the input of the next one. In a multi-threaded pipeline, each step of the pipeline is executed on a separate thread (or threads). There's a buffer (queue) between the pipeline elements to store step results while the next element is not yet able to receive them. The buffers might be limited in size.

TPL Dataflow is part of Microsoft's Task Parallel Library (TPL). It's a powerful and flexible library that implements the Dataflow pattern, which is broader than the pipeline pattern. That is, the pipeline pattern is a subset of the Dataflow pattern. TPL Dataflow is thread-safe, supports different levels of parallelism and bounded capacity, and has an async/await API. This supposedly makes TPL Dataflow a perfect candidate to implement the multi-threaded pipeline. Let's test this theory.

Requirements

In the previous part we defined a pipeline example scenario and a list of requirements our implementation should fulfill. Let’s recap.

In the example scenario, our pipeline accepts a string, finds the most common word, counts its number of characters, and checks whether that number is odd. Something like this:
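For an input sentence like "The pipeline pattern is the best pattern" (just an example), the flow would be roughly:

[code lang=text]
"The pipeline pattern is the best pattern"
    -> find most common word -> "pattern"
    -> count characters      -> 7
    -> is odd?               -> true
[/code]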

When creating the pipeline, in a perfect world, it should look something like this:
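A sketch of that ideal API (the class name and the FindMostCommon helper are placeholders for what we'll build below):

[code lang=csharp]
var pipeline = new Pipeline<string, bool>()
    .AddStep<string, string>(input => FindMostCommon(input))
    .AddStep<string, int>(input => input.Length)
    .AddStep<int, bool>(input => input % 2 == 1)
    .CreatePipeline();

bool isOdd = await pipeline.Execute("The pipeline pattern is the best pattern");
[/code]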

Since we want our pipeline to be robust and customizable, we also have to meet a bunch of requirements. Those are:

  • Allow creating steps with lambda functions
  • Have each pipeline element execute on a dedicated thread, with a buffer (queue) between the pipeline elements for items that finished one step and are waiting for the next
  • Allow custom input and output types in the delegates of each step, without needing to cast from object in each delegate
  • (Optionally) Allow setting a degree of parallelism for each step, that is, the maximum number of dedicated threads
  • (Optionally) Set a maximum capacity for each buffer
  • (Optionally) Allow awaiting the result of an item entered into the pipeline with the async/await API

Let’s see if TPL Dataflow is up to the task.

First implementation with TPL Dataflow

To get started with TPL Dataflow, we need to add the System.Threading.Tasks.Dataflow NuGet package.

TPL Dataflow's TransformBlock<TInput, TOutput> is perfect to represent each step. It accepts an input of one type, runs a delegate on it, and returns an output of another type. The TransformBlock holds two buffers: one for the inputs and one for the outputs.

To do something with the result after all steps are finished, we can use ActionBlock, which simply invokes a custom delegate on any input. This block holds a single buffer for its inputs.

Here’s the code:
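Or rather, a minimal sketch of it (the full version is on GitHub; the class name is illustrative, and FindMostCommon stands in for the most-common-word helper from Part 1):

[code lang=csharp]
using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class TPLDataflowPipeline
{
    public static ITargetBlock<string> CreatePipeline(Action<bool> resultCallback)
    {
        // Each step is a TransformBlock with its own input and output buffers
        var step1 = new TransformBlock<string, string>(sentence => FindMostCommon(sentence));
        var step2 = new TransformBlock<string, int>(word => word.Length);
        var step3 = new TransformBlock<int, bool>(length => length % 2 == 1);

        // The final block invokes the callback on each result
        var callbackStep = new ActionBlock<bool>(resultCallback);

        // Link each step's output buffer to the next step's input buffer
        step1.LinkTo(step2);
        step2.LinkTo(step3);
        step3.LinkTo(callbackStep);

        // Posting to the first block pushes an item through the whole pipeline
        return step1;
    }

    private static string FindMostCommon(string input) =>
        input.Split(' ')
             .GroupBy(word => word)
             .OrderByDescending(group => group.Count())
             .First().Key;
}
[/code]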

Usage:
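A sketch, continuing the example:

[code lang=csharp]
var pipeline = TPLDataflowPipeline.CreatePipeline(result => Console.WriteLine(result));
pipeline.Post("The pipeline pattern is the best pattern");
// Prints: True ("pattern" is the most common word, and it has 7 characters)
[/code]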

Looking back at the implementations with BlockingCollection, this is much simpler code. Everything is intuitive and straightforward. +1 for TPL Dataflow so far.

Let’s try to improve this by adding all the missing features and optional requirements. Those are:

  • Levels of parallelism for each step
  • Bounded capacity for each step
  • Awaiting the result with the async/await API instead of using a callback
  • Exception handling (currently, an exception will destroy the pipeline)

Changing the level of parallelism and bounded capacity for each step

With TPL Dataflow, changing the parallelism level and max capacity is as simple as setting a couple of properties:
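For example, here's a sketch of a step limited to three dedicated threads and a five-item input buffer:

[code lang=csharp]
var step1 = new TransformBlock<string, string>(sentence => FindMostCommon(sentence),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 3, // up to 3 items are processed concurrently
        BoundedCapacity = 5,        // the block's input buffer holds at most 5 items
    });
[/code]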

The only change is in the usage, where you can optionally wait asynchronously to post a new item, due to the bounded capacity:
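Along these lines (pipeline is the block returned from CreatePipeline above):

[code lang=csharp]
// With a bounded capacity, Post can fail when the buffer is full.
// SendAsync instead returns a Task that completes once there's room.
await pipeline.SendAsync("The pipeline pattern is the best pattern");
[/code]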

If the first step is full, this will wait asynchronously until there’s an opening and the input is entered into the pipeline. So far, TPL Dataflow continues to be easy enough.

Awaiting the result

Although we can await the insertion of an item into the first step of the pipeline, we can't actually await it finishing all the following steps and get the result. Optimally, the usage should be:
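Something like this (a sketch of the API we want; Execute doesn't exist yet):

[code lang=csharp]
var pipeline = CreatePipeline();
bool result = await pipeline.Execute("The pipeline pattern is the best pattern");
[/code]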

Like in Part 1, we can make use of the TaskCompletionSource class. This class provides a Task instance whose result (or exception, if needed) you can set manually. In our case, each item executed by the pipeline will have to carry a TaskCompletionSource instance with it.
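For example, a tiny illustration of the mechanism:

[code lang=csharp]
var tcs = new TaskCompletionSource<bool>();
Task.Run(() => tcs.SetResult(true)); // someone eventually sets the result (or an exception)
bool result = await tcs.Task;        // this await completes once the result is set
[/code]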

Here's a first attempt at an implementation that allows awaiting the result:
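Again a sketch, reconstructed to match the explanation below (the full version is on GitHub):

[code lang=csharp]
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Carries a step's input together with the TaskCompletionSource
// of the entire pipeline's final output
public class TC<TInput, TOutput>
{
    public TInput Input { get; }
    public TaskCompletionSource<TOutput> TaskCompletionSource { get; }

    public TC(TInput input, TaskCompletionSource<TOutput> tcs)
    {
        Input = input;
        TaskCompletionSource = tcs;
    }
}

public static class TPLDataflowAwaitablePipeline
{
    public static ITargetBlock<TC<string, bool>> CreatePipeline()
    {
        // Each step transforms the TC's input and passes the
        // TaskCompletionSource along unchanged
        var step1 = new TransformBlock<TC<string, bool>, TC<string, bool>>(tc =>
            new TC<string, bool>(FindMostCommon(tc.Input), tc.TaskCompletionSource));
        var step2 = new TransformBlock<TC<string, bool>, TC<int, bool>>(tc =>
            new TC<int, bool>(tc.Input.Length, tc.TaskCompletionSource));
        var step3 = new TransformBlock<TC<int, bool>, TC<bool, bool>>(tc =>
            new TC<bool, bool>(tc.Input % 2 == 1, tc.TaskCompletionSource));

        // The last block completes the Task with the final result
        var setResultStep = new ActionBlock<TC<bool, bool>>(tc =>
            tc.TaskCompletionSource.SetResult(tc.Input));

        step1.LinkTo(step2);
        step2.LinkTo(step3);
        step3.LinkTo(setResultStep);
        return step1;
    }

    private static string FindMostCommon(string input) =>
        input.Split(' ')
             .GroupBy(word => word)
             .OrderByDescending(group => group.Count())
             .First().Key;
}
[/code]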

Usage:
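Roughly like this:

[code lang=csharp]
var pipeline = TPLDataflowAwaitablePipeline.CreatePipeline();
var tcs = new TaskCompletionSource<bool>();
pipeline.Post(new TC<string, bool>("The pipeline pattern is the best pattern", tcs));
bool result = await tcs.Task;
[/code]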

Explanation:

  • The class TC<TInput, TOutput> holds the current step's input and the TaskCompletionSource for the entire pipeline's output. That's because the TaskCompletionSource should be of the pipeline's output type, not the step's. I called it TC just to keep the code more convenient to read. I suppose a Tuple could be used here instead of a class.
  • Each step is a TransformBlock that runs on the input of the TC and produces a new TC, with TInput being the next step's input type.
  • The last step is an ActionBlock which invokes SetResult to have the Task end with a result.

This works, but the syntax is pretty terrible. Creating each pipeline is going to be a nightmare, and the execution usage isn't great either.

With some effort though, we can create a pipeline builder which makes pipeline creation usage and execution usage much more pleasant.

Awaiting the result with a pleasant usage + handling exceptions

I removed levels of parallelism and bound capacity for better readability, but they should be easy to add.
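Here's a sketch of the builder, reconstructed to match the explanation and snippets below (the full version is on GitHub):

[code lang=csharp]
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class TPLPipelineWithAwaitAttempt2<TIn, TOut>
{
    private readonly List<IDataflowBlock> _transformBlocks = new List<IDataflowBlock>();

    public TPLPipelineWithAwaitAttempt2<TIn, TOut> AddStep<TLocalIn, TLocalOut>(
        Func<TLocalIn, TLocalOut> stepFunc)
    {
        var step = new TransformBlock<TC<TLocalIn, TOut>, TC<TLocalOut, TOut>>(tc =>
        {
            try
            {
                // Run the step's delegate and pass the result on,
                // along with the same TaskCompletionSource
                return new TC<TLocalOut, TOut>(stepFunc(tc.Input), tc.TaskCompletionSource);
            }
            catch (Exception e)
            {
                // Mark the whole pipeline's Task as faulted
                tc.TaskCompletionSource.SetException(e);
                return new TC<TLocalOut, TOut>(default(TLocalOut), tc.TaskCompletionSource);
            }
        });

        if (_transformBlocks.Count > 0)
        {
            var lastStep = _transformBlocks.Last();
            var targetBlock = (lastStep as ISourceBlock<TC<TLocalIn, TOut>>);

            // Non-faulted items continue to the next step. Faulted items go to
            // NullTarget (nowhere), so they don't block the output buffer forever
            targetBlock.LinkTo(step, new DataflowLinkOptions(),
                tc => !tc.TaskCompletionSource.Task.IsFaulted);
            targetBlock.LinkTo(DataflowBlock.NullTarget<TC<TLocalIn, TOut>>(),
                new DataflowLinkOptions(),
                tc => tc.TaskCompletionSource.Task.IsFaulted);
        }

        _transformBlocks.Add(step);
        return this;
    }

    public TPLPipelineWithAwaitAttempt2<TIn, TOut> CreatePipeline()
    {
        // The final block marks the Task as finished with the pipeline's result
        var setResultStep =
            new ActionBlock<TC<TOut, TOut>>(tc => tc.TaskCompletionSource.SetResult(tc.Input));
        var lastStep = _transformBlocks.Last();
        var setResultBlock = (lastStep as ISourceBlock<TC<TOut, TOut>>);
        setResultBlock.LinkTo(setResultStep);
        return this;
    }

    public Task<TOut> Execute(TIn input)
    {
        var firstStep = _transformBlocks[0] as ITargetBlock<TC<TIn, TOut>>;
        var tcs = new TaskCompletionSource<TOut>();
        firstStep.SendAsync(new TC<TIn, TOut>(input, tcs));
        return tcs.Task;
    }
}
[/code]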

Usage:
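Along these lines:

[code lang=csharp]
var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
    .AddStep<string, string>(input => FindMostCommon(input))
    .AddStep<string, int>(input => input.Length)
    .AddStep<int, bool>(input => input % 2 == 1)
    .CreatePipeline();

bool isOdd = await pipeline.Execute("The pipeline pattern is the best pattern");
[/code]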

So the builder's code is nowhere near pleasant, I admit. But that doesn't matter, because we have to write it just once. The usage now becomes pretty much perfect. We can add pipeline steps in a type-safe fashion without casting. We can use lambda functions. And this code even handles exceptions. Beautiful.

Explanation:

  • The new implementation is pretty similar to the first attempt.
  • This class uses TC<TIn, TOut> like before.
  • When a step is added with AddStep, it creates a new TransformBlock. This block transforms the input of the TC and produces a new TC, with TInput being the next step's input type.
  • Each delegate's execution stepFunc(tc.Input) is surrounded by try/catch. If an exception is thrown, we use taskCompletionSource.SetException to set the Task as faulted. Later, the step is linked with a condition in LinkTo. If the Task is faulted, the item goes to NullTarget, which means nowhere. If it isn't faulted, it continues to the next step. We have to link faulted items somewhere; otherwise, they would block the TransformBlock's output buffer forever.
  • When CreatePipeline is called to complete the pipeline construction, an additional block is added. This last block, setResultStep, invokes SetResult to mark the Task as finished.

All the implementations, including the last one, are available on GitHub.

Improving Usage

In theory, we can improve the pipeline creation usage even further. From this, where the generic types are specified on every step:
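[code lang=csharp]
var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
    .AddStep<string, string>(input => FindMostCommon(input))
    .AddStep<string, int>(input => input.Length)
    .AddStep<int, bool>(input => input % 2 == 1)
    .CreatePipeline();
[/code]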

To something like this, where the generics are inferred (a sketch of the target usage):
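[code lang=csharp]
var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>((inputFirst, builder) =>
    inputFirst
        .AddStep(builder, input => FindMostCommon(input))
        .AddStep(builder, input => input.Length)
        .AddStep(builder, input => input % 2 == 1));
[/code]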

The difference here is that we don’t even have to specify the generics. This can be achieved by changing AddStep to an extension method that accepts the input of the previous step and produces an output. If you missed the first part of the series and want to see some C# trickery, check out a similar implementation with BlockingCollection. And if you’re up for a challenge, implement it yourself and send me a pull request for an additional implementation on GitHub. I’ll give you credit and include this in the article.

Summary and Coming up

Like with the BlockingCollection implementation, a perfect solution was reached with TPL Dataflow. By that I mean that all requirements were met and the usage is easy enough. I wouldn’t call this implementation easy by any means, but I’d say it was easier in comparison with BlockingCollection. In that regard, TPL Dataflow has the upper hand.

Part 3 of the series is now ready.

In the following parts, I'll attempt pipeline implementations with System.Threading.Channels (which I expect to succeed), Reactive Extensions (which I expect to fail), and possibly other, lesser-known libraries. Subscribe to be updated on the next parts of the series.


12 thoughts on “Pipeline Pattern in C# (part 2) with TPL Dataflow”

  1. That was a quick follow-up, still need to check it in detail. But I'm following this closely 😀

    Great post!

  2. Cool article!
    Maybe I am getting ahead of things here but why do you expect Reactive Extensions to fail?

    1. Thanks! I'm not really sure why I thought so at the time. Maybe I could make it work, but I feel it was made for different purposes, so it would be a somewhat unnatural usage.

  3. That's a really good article!
    What if I would like to pass a function as a step that returns a Task?
    Let's say FindMostCommon returns a Task and I'd prefer it to look like this:
    .AddStep<string, string>(async sentence => await FindMostCommon(sentence))
    not like:
    .AddStep<string, string>(sentence => FindMostCommon(sentence).Result)
    What would be the best way to change the implementation of the AddStep method?

    1. Thanks.
      That's a great question and I'll need to do some experimentation to give you a good answer. I'll address this in one of the next parts of the series, so keep following 🙂

  4. francis Irungu Munene

    Hi Michael,

    Working on implementing this:

    [code lang=text]
    var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>((inputFirst, builder) =>
        inputFirst
            .AddStep(builder, input => FindMostCommon(input))
            .AddStep(builder, input => input.Length)
            .AddStep(builder, input => input % 2 == 1));
    [/code]

    However, this:

    [code lang=text]
    public Task Execute(TIn input)
    {
        var firstStep = pipelineSteps[0] as ITargetBlock<DefaultTaskCompletionSource<TIn, TOut>>;
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        firstStep.SendAsync(new DefaultTaskCompletionSource<TIn, TOut>(input, tcs));
        return tcs.Task;
    }
    [/code]

    fails, as the cast (as ITargetBlock<DefaultTaskCompletionSource<TIn, TOut>>) is always returning null. Any pointers?

  5. michaelwolfenden

    When an exception is thrown in one of the steps:

    var pipeline = new TPLPipelineWithAwaitAttempt2<string, bool>()
        .AddStep<string, bool>(_ => throw new Exception("something went wrong"))
        .CreatePipeline();

    I’m seeing an InvalidOperationException – An attempt was made to transition a task to a final state when it had already completed.

    I'm assuming this is because when an exception is thrown, the TaskCompletionSource is marked as faulted (completed) in the catch, and then set as completed again in the setResultStep.

    1. You're right, Michael. I looked at the code now, and I do handle exceptions everywhere except for the last step. If an exception happens on the last step, it will throw an InvalidOperationException.
      The solution is in this code:

      [code lang=text]
      var setResultStep =
          new ActionBlock<TC<TOut, TOut>>((tc) => tc.TaskCompletionSource.SetResult(tc.Input));
      var lastStep = _transformBlocks.Last();
      var setResultBlock = (lastStep as ISourceBlock<TC<TOut, TOut>>);
      setResultBlock.LinkTo(setResultStep);
      return this;
      [/code]

      should change to something like this:

      [code lang=text]
      var setResultStep =
          new ActionBlock<TC<TOut, TOut>>((tc) => tc.TaskCompletionSource.SetResult(tc.Input));
      var lastStep = _transformBlocks.Last();
      var setResultBlock = (lastStep as ISourceBlock<TC<TOut, TOut>>);
      setResultBlock.LinkTo(setResultStep, tc => !tc.TaskCompletionSource.Task.IsFaulted);
      setResultBlock.LinkTo(DataflowBlock.NullTarget<TC<TOut, TOut>>(), new DataflowLinkOptions(),
          tc => tc.TaskCompletionSource.Task.IsFaulted);
      return this;
      [/code]

      I'll update when I have time, but a pull request is also welcome 🙂
