In Part 1, we talked about what Job Queues in C# are, when they should be used, and how to implement them. We saw several great implementations using BlockingCollection and the thread-pool.
While those are great, it’s worth knowing other implementations. Depending on your needs, another type of solution might suit you better. Luckily, we have a lot of great options with C#. In this article, we’ll see a couple of good ones: Reactive Extensions and System.Threading.Channels.
Implementing a Job Queue with Reactive Extensions
Reactive Extensions (Rx) is just awesome, right? It’s one of those technologies that take some head-scratching to figure out but once you implement something it just looks beautiful. I’ll assume you already know Rx, use it and love it.
So let’s plan our Rx implementation of Job Queues, starting with the requirements:
- We want to have an Enqueue method to add jobs.
- Each job will execute as soon as possible.
- The jobs will execute one after the other in FIFO order.
- We want the jobs to execute in a background single thread.
- For simplicity, our jobs will be strings printed to Console.
Sounds like it’s perfect for Rx observables, right? Just OnNext the jobs and subscribe to execute. Let’s try that.
Rx implementation attempt #1
After adding the NuGet package (Install-Package System.Reactive), I first tried this code:
    using System;
    using System.Reactive.Subjects;

    public class RxQueueWithSubject
    {
        private readonly Subject<string> _jobs = new Subject<string>();

        public RxQueueWithSubject()
        {
            // The handler runs for every job pushed with OnNext
            _jobs.Subscribe(job =>
            {
                Console.WriteLine(job);
            });
        }

        public void Enqueue(string job)
        {
            _jobs.OnNext(job);
        }
    }
Seems pretty straightforward, but will it work?
The answer is no. No, it won’t. The reason is that this implementation is synchronous. On each call to OnNext, the handler executes immediately on the calling thread. In other words, the call to Enqueue returns only after the job has executed.
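To see the synchronous behavior for yourself, here is a minimal sketch that compares the thread of the handler with the thread that called OnNext. The class and method names (SubjectThreadDemo, HandlerRunsOnCallerThread) are made up for this illustration and are not part of Rx.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubjectThreadDemo
{
    // Returns true when the subscriber ran on the same thread that called OnNext.
    public static bool HandlerRunsOnCallerThread()
    {
        int handlerThreadId = -1;

        using var jobs = new Subject<string>();
        using var subscription = jobs.Subscribe(job =>
        {
            // Record which thread is executing the job
            handlerThreadId = Environment.CurrentManagedThreadId;
        });

        // With a plain Subject, this call returns only after the handler has finished
        jobs.OnNext("job 1");

        return handlerThreadId == Environment.CurrentManagedThreadId;
    }
}
```

Calling SubjectThreadDemo.HandlerRunsOnCallerThread() should return true, which is exactly the problem: the producer is doing the consumer's work.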
We need to tell the subscriber to handle the jobs on a different thread. We need a Scheduler.
Rx implementation with Scheduler
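    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public class RxQueueWithScheduler
    {
        private readonly Subject<string> _jobs = new Subject<string>();

        public RxQueueWithScheduler()
        {
            // ObserveOn moves the handling of each job to the given scheduler
            _jobs.ObserveOn(Scheduler.Default)
                .Subscribe(job =>
                {
                    Console.WriteLine(job);
                });
        }

        public void Enqueue(string job)
        {
            _jobs.OnNext(job);
        }
    }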
Seems pretty similar, right? And it is, except for the addition of .ObserveOn(Scheduler.Default). That means we create an IObservable whose notifications are delivered through the default scheduler. What is the default scheduler though?
A scheduler decides how to execute tasks. In WPF, the scheduler of the main thread will add tasks to the Dispatcher-Queue. The default scheduler, however, will run the task on the thread-pool, which is just what we want (more on default task scheduler and synchronization context).
What’s nice about this implementation is that it’s not a dedicated thread. So when the job queue is empty, there’s no thread in use.
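Here is a small sketch you can use to check both claims: that jobs are handled off the calling thread and that they still arrive in FIFO order. ObserveOnDemo and its Run method are hypothetical names for this illustration; the TaskCompletionSource is only there so the demo can wait for the last job.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class ObserveOnDemo
{
    // Pushes `count` jobs and reports the order they were handled in,
    // plus whether any handler ran on the thread that enqueued the jobs.
    public static (List<int> Handled, bool UsedCallerThread) Run(int count)
    {
        var handled = new List<int>();
        var usedCallerThread = false;
        var callerThreadId = Environment.CurrentManagedThreadId;
        var done = new TaskCompletionSource<bool>();

        using var jobs = new Subject<int>();
        using var subscription = jobs
            .ObserveOn(Scheduler.Default)
            .Subscribe(job =>
            {
                // ObserveOn delivers notifications one at a time, so a plain List is enough here
                if (Environment.CurrentManagedThreadId == callerThreadId)
                {
                    usedCallerThread = true;
                }
                handled.Add(job);
                if (handled.Count == count)
                {
                    done.TrySetResult(true);
                }
            });

        for (int i = 0; i < count; i++)
        {
            jobs.OnNext(i); // returns right away; the job is queued for the scheduler
        }

        done.Task.Wait(); // block until the last job was handled
        return (handled, usedCallerThread);
    }
}
```

With Run(5), I would expect Handled to contain the jobs in the order they were enqueued and UsedCallerThread to be false.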
Rx implementation of Publisher/Subscriber
Here we start seeing the beauty of Rx.
A common use case of Job Queues is for different types of jobs to be executed by different handlers. For example, we can have 2 types of jobs:
- Job A will print a global number to console.
- Job B will add 1 to the global number.
So we can write the following code:
myQueue.Enqueue(new JobA());//print
myQueue.Enqueue(new JobB());//add
myQueue.Enqueue(new JobA());//print
myQueue.Enqueue(new JobB());//add
myQueue.Enqueue(new JobB());//add
myQueue.Enqueue(new JobA());//print
And the result should be (assuming the counter starts at 0): 0 1 3.
Note that in this specific case the order is important. We can’t handle a job before the previous job is finished, even if it’s a job of a different type. This is not always the requirement but I wanted to show you this is an option.
Here’s the implementation:
    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public interface IJob
    {
    }

    public class RxQueuePubSub
    {
        private readonly Subject<IJob> _jobs = new Subject<IJob>();
        private readonly IConnectableObservable<IJob> _connectableObservable;

        public RxQueuePubSub()
        {
            // Publish() lets all handlers share a single ObserveOn subscription
            _connectableObservable = _jobs.ObserveOn(Scheduler.Default).Publish();
            _connectableObservable.Connect();
        }

        public void Enqueue(IJob job)
        {
            _jobs.OnNext(job);
        }

        public void RegisterHandler<T>(Action<T> handleAction) where T : IJob
        {
            // OfType<T>() filters the shared sequence down to jobs of type T
            _connectableObservable.OfType<T>().Subscribe(handleAction);
        }
    }
Usage:
    class JobA : IJob
    {
    }

    class JobB : IJob
    {
    }

    public static class Global
    {
        public static int Counter = 0;
    }

    ...

    public void Start()
    {
        var q = new RxQueuePubSub();

        q.RegisterHandler<JobA>(j => Console.WriteLine(Global.Counter));
        q.RegisterHandler<JobB>(j => Global.Counter++);

        q.Enqueue(new JobA());//print
        q.Enqueue(new JobB());//add
        q.Enqueue(new JobA());//print
        q.Enqueue(new JobB());//add
        q.Enqueue(new JobB());//add
        q.Enqueue(new JobA());//print
    }
This will give the wanted result.
Did you notice how this implementation differs from the previous one? We used .Publish() in _jobs.ObserveOn(Scheduler.Default).Publish() to create a connectable observable that shares a single subscription to the sequence. Without this addition, each call to .Subscribe would create its own sequence, with the scheduler using a separate pooled thread for each job type. In other words, we would have 2 different threads handling the queue, which would break the FIFO order.
Here, we have a single thread handling jobs, even if it’s for 2 different handlers.
Note that we can easily change the functionality by omitting the call to .Publish(). This will allow the 2 subscribers to work independently on 2 different threads. On each job in the queue, the relevant subscriber will execute the job once available.
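As a rough sketch of that variation, the class below applies ObserveOn per handler instead of publishing one shared sequence. RxQueueIndependentHandlers is a name I made up for this example, and returning the subscription as an IDisposable is my own addition so a handler can be removed later. Keep in mind that ordering is then only preserved within each job type, not across types.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public interface IJob
{
}

public class JobA : IJob
{
}

public class JobB : IJob
{
}

public class RxQueueIndependentHandlers
{
    private readonly Subject<IJob> _jobs = new Subject<IJob>();

    public void Enqueue(IJob job)
    {
        _jobs.OnNext(job);
    }

    // Each handler gets its own ObserveOn subscription, so the handlers
    // do not wait for each other. FIFO holds per job type only.
    public IDisposable RegisterHandler<T>(Action<T> handleAction) where T : IJob
    {
        return _jobs
            .ObserveOn(Scheduler.Default)
            .OfType<T>()
            .Subscribe(handleAction);
    }
}
```

With this version, the counter example from above is no longer guaranteed to print 0 1 3, because a JobA handler may run before an earlier JobB has finished.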
Additional Sources for Rx:
- Reactive Framework as Message queue using BlockingCollection (StackOverflow)
- How to do proper Producer-Consumer pattern with RX (StackOverflow)
The Verdict of Job Queue Implementations with Reactive Extensions
Rx can provide very nice implementations for Job Queues when dealing with simple publisher/subscriber use cases. However, I’ve had a lot of trouble implementing some customizations of Job Queues, specifically prioritizing queues and handling jobs on multiple threads.
So my advice is to use Rx for Job Queues when:
- You need a simple Job Queue with a single thread-pool handler.
- You need a simple Job Queue with different handlers for different job types (publisher/subscriber). If you need customizations beyond the basic use case, you might run into limitations.
And don’t use Rx when:
- You need to implement prioritization of jobs.
- You want handlers on multiple threads.
- You want a more customized implementation of producer/consumer like interacting with an external message broker (like Azure Service Bus or Rabbit MQ).
I’m not saying you can’t do all those things with Rx. I’m just saying Rx is not the natural fit for those and you’ll have an easier time with System.Threading.Channels (coming next), TPL Dataflow (Part 3) or BlockingCollection (see Part 1).
System.Threading.Channels
System.Threading.Channels is a library that provides excellent functionality for producer/consumer problems. It revolves around the Channel class, which provides a Reader and a Writer. Everything is done asynchronously and there’s inherent support for Bounds.
Let’s see a basic Job Queue implementation with a dedicated thread. You’ll need to add the NuGet package System.Threading.Channels:
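    using System;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    public class ChannelsQueue
    {
        private readonly ChannelWriter<string> _writer;

        public ChannelsQueue()
        {
            var channel = Channel.CreateUnbounded<string>();
            var reader = channel.Reader;
            _writer = channel.Writer;

            // Note: because the lambda is async, LongRunning only covers the code up to
            // the first await; after that, continuations typically resume on thread-pool threads.
            Task.Factory.StartNew(async () =>
            {
                // Wait while channel is not empty and still not completed
                while (await reader.WaitToReadAsync())
                {
                    var job = await reader.ReadAsync();
                    Console.WriteLine(job);
                }
            }, TaskCreationOptions.LongRunning);
        }

        public async Task Enqueue(string job)
        {
            await _writer.WriteAsync(job);
        }

        public void Stop()
        {
            // No more jobs can be written; the reader loop ends once the channel is drained
            _writer.Complete();
        }
    }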
As you can see, it’s very straightforward. It reminds me a bit of ConcurrentQueue, but it’s really much more.
For one thing, it has a fully asynchronous API. It has waiting functionality with WaitToReadAsync, which waits asynchronously on an empty channel until a job is added to the channel or until writer.Complete() is called.
It also has Bound capabilities, where the channel has a limit. When the limit is reached, the WriteAsync task waits until the channel can add the given job. That’s why WriteAsync returns an awaitable (a ValueTask) instead of completing immediately.
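To make the Bound capabilities more concrete, here is a minimal sketch of a bounded variation. BoundedChannelsQueue, its capacity parameter and the handler callback are names I chose for this example, and I used Task.Run plus TryRead for the consumer loop to keep it short. Treat it as one possible shape rather than the recommended one.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public class BoundedChannelsQueue
{
    private readonly ChannelWriter<string> _writer;
    private readonly Task _consumer;

    public BoundedChannelsQueue(int capacity, Action<string> handler)
    {
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            // When the channel is full, WriteAsync waits instead of dropping jobs
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true
        });
        var reader = channel.Reader;
        _writer = channel.Writer;

        _consumer = Task.Run(async () =>
        {
            // WaitToReadAsync returns false once the writer completed and the channel is empty
            while (await reader.WaitToReadAsync())
            {
                while (reader.TryRead(out var job))
                {
                    handler(job);
                }
            }
        });
    }

    // Completes immediately while there is room, otherwise when a slot frees up
    public ValueTask EnqueueAsync(string job)
    {
        return _writer.WriteAsync(job);
    }

    // Signals that no more jobs will arrive and waits for the remaining ones to be handled
    public Task StopAsync()
    {
        _writer.Complete();
        return _consumer;
    }
}
```

A producer would call await q.EnqueueAsync("job") and is slowed down naturally whenever the consumer falls more than capacity jobs behind.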
Let’s see how we can change this for some common Job Queue customizations.
Handle on Multiple Threads
    using System;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    public class ChannelsQueueMultiThreads
    {
        private readonly ChannelWriter<string> _writer;

        public ChannelsQueueMultiThreads(int threads)
        {
            var channel = Channel.CreateUnbounded<string>();
            var reader = channel.Reader;
            _writer = channel.Writer;

            for (int i = 0; i < threads; i++)
            {
                Task.Factory.StartNew(async () =>
                {
                    // Wait while channel is not empty and still not completed
                    while (await reader.WaitToReadAsync())
                    {
                        // TryRead instead of ReadAsync: another handler may have
                        // taken the job between the wait and the read
                        while (reader.TryRead(out var job))
                        {
                            Console.WriteLine(job);
                        }
                    }
                }, TaskCreationOptions.LongRunning);
            }
        }

        public void Enqueue(string job)
        {
            _writer.WriteAsync(job).GetAwaiter().GetResult();
        }

        public void Stop()
        {
            _writer.Complete();
        }
    }
In the above implementation, you can define how many dedicated threads will handle the jobs. They are dedicated threads, so when the job queue is empty they are just hanging there.
Implementing publisher/subscriber with System.Threading.Channels
The following code is a bit clunky, but it does the job:
    using System;
    using System.Collections.Generic;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    public interface IJob
    {
    }

    public class ChannelsQueuePubSub
    {
        private readonly ChannelWriter<IJob> _writer;
        private readonly Dictionary<Type, Action<IJob>> _handlers = new Dictionary<Type, Action<IJob>>();

        public ChannelsQueuePubSub()
        {
            var channel = Channel.CreateUnbounded<IJob>();
            var reader = channel.Reader;
            _writer = channel.Writer;

            Task.Factory.StartNew(async () =>
            {
                // Wait while channel is not empty and still not completed
                while (await reader.WaitToReadAsync())
                {
                    var job = await reader.ReadAsync();
                    // Look up the handler registered for the job's concrete type
                    bool handlerExists =
                        _handlers.TryGetValue(job.GetType(), out Action<IJob> value);
                    if (handlerExists)
                    {
                        value.Invoke(job);
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }

        public void RegisterHandler<T>(Action<T> handleAction) where T : IJob
        {
            // Wrap the typed handler so it can be stored as Action<IJob>
            Action<IJob> actionWrapper = (job) => handleAction((T)job);
            _handlers.Add(typeof(T), actionWrapper);
        }

        public async Task Enqueue(IJob job)
        {
            await _writer.WriteAsync(job);
        }

        public void Stop()
        {
            _writer.Complete();
        }
    }
Usage:
    class JobA : IJob
    {
    }

    class JobB : IJob
    {
    }

    public class Global
    {
        public static int Counter = 0;
    }

    ...

    public async Task Start()
    {
        var q = new ChannelsQueuePubSub();

        q.RegisterHandler<JobA>(j => Console.WriteLine(Global.Counter));
        q.RegisterHandler<JobB>(j => Global.Counter++);

        await q.Enqueue(new JobA());//print
        await q.Enqueue(new JobB());//add
        await q.Enqueue(new JobA());//print
        await q.Enqueue(new JobB());//add
        await q.Enqueue(new JobB());//add
        await q.Enqueue(new JobA());//print
    }
The result will be (assuming the counter starts at 0): 0 1 3.
As you can see, I had to create a little wrapper for the Action<IJob> handler. I guess you can’t compete with Rx when it comes to subscribing to events. If you can think of a nicer way to do this, share the code in the comments section.
The Verdict on System.Threading.Channels
I really like this programming model. It’s clean and very straightforward, in contrast to Rx, though maybe not as pretty.
I think the advantages of System.Threading.Channels are its asynchronous features and Bound capabilities. You should use it when:
- You want a simple straightforward job queue.
- You want to have one or more dedicated threads for handling the queue.
- You want to limit the queue for whatever reason. This will provide an effective asynchronous API for that.
And you shouldn’t use it when:
- You don’t want dedicated threads for queue handlers.
- You need to implement prioritization (in that case, the BlockingCollection implementation from Part 1 is best).
Thanks to Mark who commented on part 1 and brought this library to my attention.
Summary
Continuing our Job Queue journey, we saw 2 more models to implement job queues: Reactive Extensions and System.Threading.Channels. Both got the job done and proved to be pretty powerful for different purposes. Rx is more suited for publisher/subscriber queues and Channels for dedicated threads and Bound queues.
This turned out to be a 3-part series after all, since we still need to check out TPL Dataflow. Also, in the next part, we’ll talk about failure handling and the importance of the Poison Queue. You can subscribe to the blog newsletter so as not to miss the next part of the series.
Happy coding.