TPL Tutorial Part I

This needed to be either a multi-part series or the longest blog post you’ve ever seen, so stay tuned for follow-up posts! Also, all of the code that you see towards the end of the post can be found here. Hope you enjoy the post!

If I had to pick a favorite development-related task, it would be an opportunity to do a hard rewrite of some application that’s been around for a while. It’s typically cheaper to throw more infrastructure at a problem than it is to rewrite, so when I get the opportunity to do the latter, I relish it.

I recently had the great joy of being able to “file new” a data processing application for one of our clients. The processor was responsible for running a number of different jobs that maintained an internal database using external data from SAP. Some of the jobs were run nightly, while others were callable by the client through a web interface. Our original processor worked for the most part, but then came the scope creep. When we started, we had two nightly jobs that would run per project per night; over time that increased to five or six jobs. We also went from having ten projects to twenty, then fifty, then one hundred. Next thing we knew we had so many projects with so many jobs that the nightly process became more of a “start in the night and finish some time the next day” process.

Long story short, we did the math. We realized we were approaching the point in time when we would spend twenty-four hours a day running jobs and still fall further behind. It was rewrite time! Casting some famous advice aside, we set out to redesign and rewrite the nightly processor and the nightly jobs. Before we started coding, we sat down to think through some of the changes we wanted the new system to have.

First, we needed to shift how each individual job was composed. We had started chaining jobs together in a makeshift fashion, but we needed a better way to do that. Each job needed to be a separate unit of work for clarity, auditing purposes, and because having a “LoadTheProjectAndUpdateTheProjectDefinitionAndLoadActuals” job floating around wasn’t going to cut it. On the other hand, we also needed to be able to join individual jobs together as building blocks to create larger, dynamic jobs that were tailored to each project’s needs.

Second, we needed to take advantage of asynchronous coding. We had a little asynchronous code in the older jobs at the obvious bottlenecks, but we hadn’t fully adopted it across the project yet.

Third, and perhaps most importantly, we needed to build our system in a way the business side would understand. It’s a stereotype that the business and technical teams speak different languages. Layers of abstraction can be very powerful when building a system, but they can also abstract away the real-world use case that the system is being built to handle. In my experience this is the largest source of bugs in a business application. It isn’t a missing semicolon or a brain fart from a developer – it is a failure to communicate from the business to the developer on what needs to happen and a lack of transparency from the developer back to the business on what actually is happening. We wanted the layout of the new system to be something we could discuss with non-technical people using everyday terms.

Enter: The Microsoft Dataflow Library

Microsoft has a fantastic NuGet package named System.Threading.Tasks.Dataflow. There’s a lot going on in the package, but the gist of it is this:

The TPL Dataflow Library provides a foundation for message passing and parallelizing CPU-intensive and I/O-intensive applications that have high throughput and low latency.

https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library

Sounds great, right? The concept behind the dataflow package is to create a pipeline of connected blocks to represent your workflow. You push everything into the front of your pipeline, wait for the last block to signal that it is complete, and then you’re done! This fit what we were looking to do almost perfectly. Let’s go back through the three items that we wanted to address.

Job Composition

One great thing about the dataflow model is you can build a new block that is composed of smaller blocks. We could build each of our jobs as a pipeline of blocks. Then, when multiple jobs needed to be run in succession, we could write a wrapper that would present each job as an individual block that we could connect to other blocks to make a pipeline of jobs. We no longer had to track each individual job and report the overall status of the job pipeline as the combination of the statuses of the individual jobs.
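To make the "block composed of smaller blocks" idea concrete, here is a minimal sketch that uses DataflowBlock.Encapsulate from the dataflow package to present two linked blocks as one. The class name, the two steps, and the "project-" string are hypothetical stand-ins for illustration; this shows the general technique, not the wrapper described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class JobCompositionSketch
{
    // Hypothetical "job": two internal blocks exposed to callers as a single block.
    public static IPropagatorBlock<int, string> CreateJob()
    {
        var normalize = new TransformBlock<int, int>(id => Math.Abs(id));
        var load = new TransformBlock<int, string>(id => $"project-{id}");

        // PropagateCompletion forwards completion (and faults) from normalize to load.
        normalize.LinkTo(load, new DataflowLinkOptions { PropagateCompletion = true });

        // Callers post to normalize and receive from load, but only ever see one block.
        return DataflowBlock.Encapsulate(normalize, load);
    }

    public static async Task<List<string>> RunAsync(IEnumerable<int> ids)
    {
        var job = CreateJob();

        // Collect the job's output; this block processes one item at a time by default.
        var results = new List<string>();
        var collect = new ActionBlock<string>(results.Add);
        job.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            job.Post(id);
        }

        job.Complete();
        await collect.Completion;
        return results;
    }
}
```

Because the encapsulated job is itself a block, it can be linked to another job in exactly the same way the two inner blocks were linked to each other.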

Asynchronous Coding

The TPL library enables you to fine-tune your concurrency right out of the gate. Each dataflow block has a setting, MaxDegreeOfParallelism, that determines how many inputs it can process at one time. Have a section of a job that needs to be super concurrent? Just up the setting! Have a section that you want to run one item at a time? Drop the setting back to one! This fine-grained control was also helpful when we went from debugging on a local machine, where we want to run things sequentially to track problems down, to publishing everything to a beefed-up client machine. We could update a configuration file and turn all of our sequential blocks into concurrent ones without changing a line of code.
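As a rough illustration of that setting, the sketch below runs the same block body with whatever degree of parallelism it is handed and reports the highest number of items it saw in flight at once. The class and method names are made up for this example, and the value is passed in as a parameter here; reading it from a configuration file is left out.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelismSketch
{
    // Returns the highest number of items observed in flight at the same time.
    public static async Task<int> MeasurePeakConcurrencyAsync(int maxDegreeOfParallelism, int itemCount)
    {
        var gate = new object();
        int inFlight = 0;
        int peak = 0;

        // Typed as Func<int, Task> so the block awaits each item's asynchronous work.
        Func<int, Task> body = async _ =>
        {
            lock (gate)
            {
                inFlight++;
                peak = Math.Max(peak, inFlight);
            }

            await Task.Delay(50); // simulated work

            lock (gate)
            {
                inFlight--;
            }
        };

        var block = new ActionBlock<int>(body, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });

        for (var i = 0; i < itemCount; i++)
        {
            block.Post(i);
        }

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

With a value of one the peak never rises above one; with a larger value the peak can climb up to, but not past, that limit.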

Another great point with the dataflow blocks is that each of them can be given an individual action to complete with or without the async/await keywords. This means that we could section off the async/await portions without having to convert all of our other logic, aka legacy libraries, to use the keywords as well.
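Here is a small sketch of mixing the two styles in one pipeline: one block wraps a plain synchronous call and the next wraps an async one. LegacyFormat and SaveAsync are hypothetical placeholders for a legacy library call and an I/O-bound call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MixedDelegatesSketch
{
    // Hypothetical stand-in for a synchronous call into a legacy library.
    private static string LegacyFormat(int id) => $"item-{id}";

    // Hypothetical stand-in for an I/O-bound asynchronous call.
    private static async Task<string> SaveAsync(string value)
    {
        await Task.Delay(10);
        return value + ":saved";
    }

    public static async Task<List<string>> RunAsync(IEnumerable<int> ids)
    {
        // Synchronous delegate: no async/await needed in the legacy code.
        var format = new TransformBlock<int, string>(id => LegacyFormat(id));

        // Asynchronous delegate: the block awaits the returned task for each item.
        var save = new TransformBlock<string, string>(async value => await SaveAsync(value));

        var results = new List<string>();
        var collect = new ActionBlock<string>(results.Add);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        format.LinkTo(save, link);
        save.LinkTo(collect, link);

        foreach (var id in ids)
        {
            format.Post(id);
        }

        format.Complete();
        await collect.Completion;
        return results;
    }
}
```

Only the block that does asynchronous work uses the keywords; the legacy-style step stays exactly as it was.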

System Modeling

Nested for loops and if/then statements work, but try to draw that logic out in a business meeting and you’re going to get weird looks. These things don’t exist in the “real world”, so people focused on the real world are going to be uninterested in them. When we talk to our clients about processes, what we typically get are explanations or, better yet, drawings of workflows that need to happen. Something like this probably looks familiar to all of you.

With dataflow blocks, you actually build your system out like this. Instead of classes and functions, you have blocks that do actions or push an input along a path based on a condition. Fewer differences between the business world and the developer world result in fewer “lost in translation” bugs. Our documentation for the jobs now consists of these types of workflows listed out, and each block has its own name. If the client says “we want to change this block to check for this condition instead of what it’s doing now”, we no longer have to try to translate that into a class/function. We go straight to that block in the code using the name and update it.
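A block that pushes an input along a path based on a condition can be sketched with the LinkTo overload that takes a predicate. The even/odd split and all of the names below are invented for illustration; a real workflow would use a business condition instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RoutingSketch
{
    public static async Task<(List<int> Even, List<int> Odd)> RunAsync(IEnumerable<int> numbers)
    {
        var source = new BufferBlock<int>();

        var evens = new List<int>();
        var odds = new List<int>();
        var evenBlock = new ActionBlock<int>(evens.Add);
        var oddBlock = new ActionBlock<int>(odds.Add);

        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Links are tried in the order they were added: even numbers take the first path...
        source.LinkTo(evenBlock, link, n => n % 2 == 0);
        // ...and anything the first link declined falls through to the second.
        source.LinkTo(oddBlock, link);

        foreach (var n in numbers)
        {
            source.Post(n);
        }

        source.Complete();
        await Task.WhenAll(evenBlock.Completion, oddBlock.Completion);
        return (evens, odds);
    }
}
```

Changing the condition a branch checks means editing one predicate on one named link, which maps directly onto a decision diamond in a workflow drawing.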

The Implementation Details

There’s a lot more that goes into the system we actually ended up building, but before we can get to that we need to start with some basics of the TPL library and, more specifically, the dataflow blocks. I’ve mentioned “blocks” enough that we should start there.

A dataflow block is a compartmentalized section of logic that can receive and/or emit an object. You have two basic types of blocks: source blocks and target blocks. In the package these are represented by the ISourceBlock<TOutput> and ITargetBlock<TInput> interfaces. Source blocks can output objects, target blocks can receive objects, and a block that is both a source and a target (an IPropagatorBlock<TInput, TOutput>) can receive an input and push out an output.
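The short sketch below shows the two halves on a single block. It uses a TransformBlock, one of the prebuilt blocks in the package that is both a target and a source; the class name and the doubling logic are just for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BlockKindsSketch
{
    public static async Task<int> RunAsync()
    {
        // A TransformBlock is both a target and a source, so it is an IPropagatorBlock.
        IPropagatorBlock<int, int> doubler = new TransformBlock<int, int>(n => n * 2);

        ITargetBlock<int> input = doubler;   // the receiving half
        ISourceBlock<int> output = doubler;  // the emitting half

        input.Post(21);
        int result = await output.ReceiveAsync();

        doubler.Complete();
        return result;
    }
}
```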

The simplest prebuilt block that the dataflow package has is probably the ActionBlock<T>. An ActionBlock is basically a block wrapping an Action with an input of T. Once you initialize it, you can feed the block as many T’s as you want, and the block will run the action against all of the input objects. Since it receives objects but does not emit any, we know it implements the ITargetBlock interface and not the ISourceBlock one. Let’s look at an example.

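using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockTutorial
{
    public static class Settings
    {
        // How many items the block may process at the same time.
        public const int MaxParallelCount = 4;
    }

    // Func<T, Task> (rather than Action<T>) lets the block await asynchronous work;
    // an async lambda passed as an Action<T> becomes async void and is not awaited.
    public static async Task RunSampleOne<T>(Func<T, Task> action, IEnumerable<T> items)
    {
        var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Settings.MaxParallelCount
        });

        foreach (var item in items)
        {
            block.Post(item);
        }

        block.Complete();        // signal that no more items will be posted
        await block.Completion;  // wait until every posted item has been processed
    }
}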

In this example we take an action and a list of items that need that action run against them. About halfway down you see us create a new instance of an ActionBlock and give it a maximum parallel count. This setting determines how many input items can be processed at once. After we create the block, we post each of our items to it one by one. We then signal to the block that it is done getting new items by calling its Complete function. Finally, we wait for the block to signal back it is done processing each of the items.

We only have one block in this example, but when you have more blocks the series of events is basically the same with just one extra step – linking your blocks. You’ll see this sequence of events over and over again:

  1. Initialize blocks
  2. Connect blocks to form pipeline
  3. Post items to initial block
  4. Signal to the initial block that it is done getting new items
  5. Wait for the final block to signal back it is complete
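The five steps above can be sketched with a tiny three-block pipeline. The trimming and upper-casing steps and every name in the example are placeholders chosen for illustration; only the order of operations matters here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    public static async Task<List<string>> RunAsync(IEnumerable<string> lines)
    {
        // 1. Initialize blocks
        var trim = new TransformBlock<string, string>(line => line.Trim());
        var upper = new TransformBlock<string, string>(line => line.ToUpperInvariant());
        var results = new List<string>();
        var collect = new ActionBlock<string>(results.Add);

        // 2. Connect blocks to form the pipeline; PropagateCompletion passes the
        //    "done" signal from each block to the next one.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        trim.LinkTo(upper, link);
        upper.LinkTo(collect, link);

        // 3. Post items to the initial block
        foreach (var line in lines)
        {
            trim.Post(line);
        }

        // 4. Signal to the initial block that it is done getting new items
        trim.Complete();

        // 5. Wait for the final block to signal back that it is complete
        await collect.Completion;
        return results;
    }
}
```

Without PropagateCompletion on the links, step 4 would only complete the first block, and the wait in step 5 would never finish.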

I’ve written a unit test to validate that the RunSampleOne function does what you’d expect: given an action and a collection of objects, run the action with each of the input objects. To begin, I need a sample input to feed the block. The ISampleInput interface is straightforward: it has a single Id property and an asynchronous Process method.

public interface ISampleInput
{
    int Id { get; }
    Task Process();
}

I also have a factory function that will generate a mock of the ISampleInput object using the Moq package. It’s important to use a mock here so that the unit test can verify that the Process method was called and completed on each of the inputs.

private static readonly Random rando = new Random();
private static readonly object randoLock = new object();
private static int _count = 1;

private Mock<ISampleInput> MockInputFactoryCreate()
{
    var localCount = _count++;

    var mockInput = new Mock<ISampleInput>();

    // Set ID
    mockInput
        .SetupGet(x => x.Id)
        .Returns(localCount);

    // When Process is called, sleep for a random amount of time so jobs complete in a different order
    mockInput
        .Setup(input => input.Process())
        .Returns(() =>
        {
            int delayMs;
            // Random is not thread-safe, and Process may run on several threads at once
            lock (randoLock)
            {
                delayMs = rando.Next(0, 500);
            }

            Thread.Sleep(delayMs);
            Log($"Processing item {localCount}");
            return Task.CompletedTask;
        });

    return mockInput;
}

With these in place, the unit test is straightforward. We create a list of inputs using our factory method. We call our RunSampleOne function and pass it the inputs and an action that, given an input, will asynchronously call the Process method on that input. Finally, we loop through the inputs and verify that each of them had the Process method called.

static void Log(string msg) => Console.Error.WriteLine(msg);

[Test]
public async Task TestSampleOne()
{
    var inputs = new[] { MockInputFactoryCreate(), MockInputFactoryCreate(), MockInputFactoryCreate() };

    await ActionBlockTutorial.RunSampleOne(
        action: async (input) => await input.Process(),
        items: inputs.Select(input => input.Object)
    );

    // Verify process was called
    foreach (var input in inputs)
    {
        input.Verify(obj => obj.Process(), $"Input {input.Object.Id} did not get processed");
    }
}

Closing Thoughts

If you run this you will see that the order of the log statements is not sequential, so we know we are processing the inputs concurrently. If you drop the max degree of parallelism from four to one and rerun it you will see the log messages showing the jobs process in sequential order.

If you step through the code with one breakpoint in the Process implementation and another in the RunSampleOne function, you will likely see that all of the items are posted to the block before any of the Process calls begin to run. If your posting logic has delays (e.g. a Task.Delay call in the foreach loop), the block will begin processing on another thread while you are still posting new inputs.

While you’ve got breakpoints set, examine your ActionBlock as you post to it. You will see that the block has an InputQueue that grows as each item is posted and shrinks as each item is processed. There are other properties on this block that can be helpful when you are debugging these pipelines as well. Most notable are the IsCompleted and IsDecliningPermanently properties. We will talk about those next time!
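If you would rather watch the queue from code than from the debugger, ActionBlock<T> also exposes a public InputCount property. The sketch below holds the block on its first item so the remaining items pile up in the queue; the class name and the five-item count are arbitrary choices for this example.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class InputCountSketch
{
    public static async Task<(int WhileBlocked, int AfterCompletion)> RunAsync()
    {
        // The block (one item at a time by default) waits on this until we release it.
        var release = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        Func<int, Task> body = async _ => { await release.Task; };
        var block = new ActionBlock<int>(body);

        for (var i = 0; i < 5; i++)
        {
            block.Post(i);
        }

        // Items waiting to be processed. Depending on timing, the first item may
        // already have been pulled off the queue, so this is either 4 or 5.
        int whileBlocked = block.InputCount;

        release.SetResult(true);
        block.Complete();
        await block.Completion;

        // Once the block has completed, nothing is left in the queue.
        return (whileBlocked, block.InputCount);
    }
}
```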

That will wrap up this first post. This post was a little light on the code, but next time we will jump right into what a full-blown pipeline looks like after exploring a couple more of the block types that the TPL library ships with. If you’ve got any questions or want to see something specific in the Dataflow package, let us know in the comments!

Christopher Thompson
