Ways of creating multi-threaded applications in .NET Part 3. TPL and PLINQ

This is the third part of the article dedicated to the methods of creating multi-threaded apps in .NET. If you are interested in this topic, then we invite you to read Part 1 and Part 2 first.

This third part is devoted to Task Parallel Library (TPL) and Parallel Language Integrated Query (PLINQ). Though they appeared relatively recently in .NET, they are fully capable of solving complex problems on multi-core processors.

Task Parallel Library (TPL)

Task Parallel Library (TPL) is designed for execution on multi-core processors. It appeared in .NET Framework 4.0 when it became obvious that standard .NET tools for working with threads were not enough to efficiently execute multithreaded programs on multi-core processors. To use TPL’s basic functionality, you only need to add the System.Threading.Tasks namespace to the project.

using System.Threading.Tasks;

This library allows you to perform computationally complex tasks on several processor cores at the same time. Task Parallel Library simplifies the process of creating and destroying threads, and it uses a thread pool internally. Although .NET contains many other tools for working with threads, Microsoft has recommended TPL for creating multi-threaded applications since .NET 4.0.

Task class

The Task class represents a single operation, typically a long-running one, that is executed asynchronously. By default, the task's work runs on a separate thread taken from the thread pool, although TPL also supports synchronous execution in the current thread.

An Action delegate is passed as a parameter to the Task constructor. This delegate points to a method (function) that has no parameters and does not return a value.

To run a task for execution, the Task.Start() method is used.

When a Task object is executed asynchronously, the method that launched it does not wait for its completion. As a result, the launching method (for example, Main) may finish while the Task object is still executing. To wait in the calling method until the task is completed, invoke the task.Wait() method.
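
The steps described above (create a Task from an Action, start it, wait for it) can be sketched as follows. This is a minimal illustration rather than code from the original article: the class name TaskStartExample, the SumTo100 method, and the Sum field are hypothetical names chosen for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class TaskStartExample
{
    // Hypothetical shared field that receives the result of the work.
    public static int Sum;

    // Hypothetical work item: no parameters, no return value, so it fits Action.
    public static void SumTo100()
    {
        int total = 0;
        for (int i = 1; i <= 100; i++)
            total += i;
        Thread.Sleep(200); // simulate a longer operation
        Sum = total;
    }

    static void Main()
    {
        Task task = new Task(SumTo100); // the method group is converted to an Action
        task.Start();                   // schedule the task for execution
        Console.WriteLine("Main keeps running while the task works");
        task.Wait();                    // block until the task has finished
        Console.WriteLine("Sum = {0}", Sum);
    }
}
```

Without the call to task.Wait(), Main could reach the last line before the task has written its result.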

A task can also be created and started in a single step using the Task.Factory.StartNew() method, which is convenient for filling an array of tasks. Here, we also pass an Action delegate as a parameter. Like the Task constructor, this method accepts a lambda expression in place of a method name.

The static Task.WaitAll() method makes the method that launched an array of tasks wait until all of them are completed.
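
As a rough sketch of how Task.Factory.StartNew() and Task.WaitAll() fit together, the example below starts one task per array slot and waits for all of them. The class StartNewExample and the method ComputeSquares are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading.Tasks;

class StartNewExample
{
    // Starts one task per slot; each task writes the square of its index.
    public static int[] ComputeSquares(int count)
    {
        int[] squares = new int[count];
        Task[] tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            int index = i; // copy the loop variable so each lambda sees its own value
            tasks[i] = Task.Factory.StartNew(() => { squares[index] = index * index; });
        }
        Task.WaitAll(tasks); // block until every task in the array has completed
        return squares;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", ComputeSquares(5))); // prints "0, 1, 4, 9, 16"
    }
}
```

Each task writes to its own array element, so no additional synchronization is needed in this particular case.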

The Task class supports a number of properties to obtain information about the state of a task being executed:

  • AsyncState – returns the state object supplied when the Task was created;
  • CurrentId – a static property that returns the identifier of the currently executing Task;
  • Exception – returns the AggregateException that caused the Task to end prematurely, or null if no exception was thrown;
  • Status – returns the status of the Task.
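
To make the Status and Exception properties more concrete, here is a small sketch of a task that fails on purpose. The names TaskStateExample and RunFailingTask, as well as the "boom" message, are made up for this example.

```csharp
using System;
using System.Threading.Tasks;

class TaskStateExample
{
    // Runs a task that always throws and reports its final status and error message.
    public static TaskStatus RunFailingTask(out string message)
    {
        Task task = new Task(() => { throw new InvalidOperationException("boom"); });
        task.Start();
        try
        {
            task.Wait(); // Wait rethrows the failure wrapped in an AggregateException
        }
        catch (AggregateException)
        {
            // The details are read from task.Exception below.
        }
        message = task.Exception.InnerException.Message;
        return task.Status;
    }

    static void Main()
    {
        string message;
        TaskStatus status = RunFailingTask(out message);
        Console.WriteLine("Status: {0}, error: {1}", status, message); // Status: Faulted, error: boom
    }
}
```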

Tasks can return results. For this purpose, use the generic Task<TResult> class and pass its constructor a Func<TResult> delegate (here named func) instead of an Action.

Task<int> task1 = new Task<int>(func);

To get the result, read the Result property of the task object. If the task has not finished yet, reading Result blocks the calling thread until it does.

int i = task1.Result;
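
A complete version of this fragment might look like the sketch below. It assumes a Factorial helper similar to the one used elsewhere in this article; the class name TaskResultExample is hypothetical.

```csharp
using System;
using System.Threading.Tasks;

class TaskResultExample
{
    public static int Factorial(int x)
    {
        int result = 1;
        for (int i = 2; i <= x; i++)
            result *= i;
        return result;
    }

    static void Main()
    {
        // The lambda returns an int, so it is converted to Func<int>.
        Task<int> task1 = new Task<int>(() => Factorial(5));
        task1.Start();
        int value = task1.Result; // blocks until the task has produced its value
        Console.WriteLine("5! = {0}", value); // prints "5! = 120"
    }
}
```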

The Task class allows you to create continuation tasks. A continuation is launched automatically after the task it was attached to is completed. To create a continuation, invoke the ContinueWith method on the task that you want to continue, passing a delegate that receives the completed task as its argument.

Task task2 = task1.ContinueWith(action2);

Thus, by invoking subsequent tasks as continuations of the previous ones, you can build a certain order of execution of tasks.
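
A short sketch of such a chain is shown below. The first task produces a number, and the continuation reads it through the completed task that it receives as an argument. The names ContinuationExample and RunChain are assumptions made for this illustration.

```csharp
using System;
using System.Threading.Tasks;

class ContinuationExample
{
    public static int RunChain()
    {
        Task<int> first = Task.Factory.StartNew(() => 6 * 7);
        // The continuation starts only after 'first' has completed
        // and receives the finished task as its argument.
        Task<int> second = first.ContinueWith(t => t.Result + 1);
        return second.Result; // blocks until the whole chain has finished
    }

    static void Main()
    {
        Console.WriteLine(RunChain()); // prints "43"
    }
}
```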

Parallel class

The Parallel class is a significant part of TPL. It greatly simplifies code parallelization.

The Parallel class has three main methods:

  • Parallel.For
  • Parallel.ForEach
  • Parallel.Invoke

Parallel.Invoke method

The Parallel.Invoke method allows you to run a block of otherwise consecutively executed statements in parallel.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace TPLexample
{
    class Program
    {
        static void Factorial(int x)
        {
            int result = 1;
            for (int i = 1; i <= x; i++)
            {
                result *= i;
            }
            Console.WriteLine("Running task {0}", Task.CurrentId);
            Thread.Sleep(5000);
            Console.WriteLine("Result {0}", result);
        }

        static void Display()
        {
            Console.WriteLine("Running task {0}", Task.CurrentId);
            Thread.Sleep(5000);
        }

        static void Main(string[] args)
        {
            Parallel.Invoke(Display,
                () => {
                    Console.WriteLine("Running task {0}", Task.CurrentId);
                    Thread.Sleep(5000);
                },
                () => Factorial(10));

            Console.ReadLine();
        }
    }
}

This method takes an array of Action delegates, each given as a method name or a lambda expression and separated by commas (see example).

Parallel.Invoke(Display,
    () => {
        Console.WriteLine("Running task {0}", Task.CurrentId);
        Thread.Sleep(5000);
    },
    () => Factorial(10));

Any number of delegates can be passed. They are scheduled as tasks and may run in parallel, depending on the number of logical processor cores in the system. Parallel.Invoke itself returns only after all of them have completed.

Parallel.For method

The Parallel.For method allows you to execute the iterations of a loop in parallel. The basic overload takes three parameters.

The first parameter is an int: the start value of the loop (inclusive).

The second parameter is an int: the end value of the loop (exclusive).

The third parameter is an Action<int> delegate pointing to a method (function) or a lambda expression that receives the current loop index. The delegate is executed once per iteration.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ForExample
{
    class Program
    {
        static void Factorial(int x)
        {
            int result = 1;
            for (int i = 1; i <= x; i++)
            {
                result *= i;
            }
            Console.WriteLine("Running task {0}", Task.CurrentId);
            Console.WriteLine("Factorial of number {0} = {1}", x, result);
            Thread.Sleep(3000);
        }

        static void Main(string[] args)
        {
            Parallel.For(1, 10, Factorial);

            Console.ReadLine();
        }
    }
}

In the code given above, the factorials of numbers from 1 to 9 are calculated, because the end value 10 is not included. The calculations are performed in parallel rather than sequentially, so the results appear in the console in no particular order, as each parallel iteration completes. The console output example illustrates this:


Figure 1 Calculating the factorials of different numbers in the Parallel.For loop.

Parallel.ForEach method

This method traverses a collection that implements the IEnumerable interface, just like the foreach operator. Unlike the classical foreach, however, it accesses the elements of the collection in parallel. The method is generic and has the following definition:

ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body);

where the first parameter represents the collection to enumerate, and the second parameter is an Action delegate (or lambda expression) executed once for each element of the collection. Parallel.ForEach returns a ParallelLoopResult structure that contains data about the execution of the parallelized loop. The following example illustrates the use of Parallel.ForEach.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ForeachExample
{
    class Program
    {
        static void Factorial(int x)
        {
            int result = 1;

            for (int i = 1; i <= x; i++)
            {
                result *= i;
            }
            Console.WriteLine("Running task {0}", Task.CurrentId);
            Console.WriteLine("Factorial of {0} = {1}", x, result);
            Thread.Sleep(5000);
        }

        static void Main(string[] args)
        {
            ParallelLoopResult result = Parallel.ForEach<int>(new List<int>() { 1, 2, 4, 8, 3, 9, 5, 25 },
                Factorial);

            Console.ReadLine();
        }
    }
}

Iterations of the Parallel.ForEach loop finish in an order different from the order of the numbers in the initial sequence. The order of output in the console depends on the execution time of each iteration, the number of concurrent iterations, and the complexity of calculating each factorial: the more complex the calculation, the longer that iteration takes. The console output shows this:


Figure 2 Calculating the factorials of numbers in the Parallel.ForEach loop.

Early termination of loop

Classical for and foreach loops allow an early exit using the break operator. The Parallel.For and Parallel.ForEach methods provide a similar way to exit a loop early.

using System;
using System.Threading.Tasks;

namespace ParallelBreak
{
    class Program
    {
        static void Factorial(int x, ParallelLoopState pls)
        {
            int result = 1;

            for (int i = 1; i <= x; i++)
            {
                result *= i;
                if (i == 6)
                    pls.Break();
            }
            Console.WriteLine("Running task {0}", Task.CurrentId);
            Console.WriteLine("Factorial of {0} = {1}", x, result);
        }

        static void Main(string[] args)
        {
            ParallelLoopResult result = Parallel.For(1, 8, Factorial);

            if (!result.IsCompleted)
                Console.WriteLine("Loop ended on iteration number {0}", result.LowestBreakIteration);
            Console.ReadLine();
        }
    }
}

To exit a loop ahead of time, the delegate passed to Parallel.For (or Parallel.ForEach) must accept a ParallelLoopState object as its second parameter. The Break() method of this object can then be invoked anywhere in the code of the function wrapped in the delegate. Once Break() is called, the loop stops starting iterations with a higher index at the first opportunity, lets the iterations that are already running finish, and then returns the ParallelLoopResult object.

The ParallelLoopResult object returned by the Parallel.For and Parallel.ForEach loops contains two important loop state properties:

  • bool IsCompleted – indicates whether the loop ran to completion or was interrupted prematurely;
  • long? LowestBreakIteration – returns the lowest index of the iteration from which Break() was called, or null if Break() was not called.

The result of this example is shown in the console output below.


Figure 3 Early termination of the Parallel loop by a command in the loop code.

There is also a way to abort a loop using CancellationToken. This approach works both with the Parallel methods and with tasks represented by Task objects. It is useful when you need to abort an operation that has taken too long, or when the delegate passed to the parallel method (Task, TaskFactory, Parallel.For, Parallel.ForEach, Parallel.Invoke) is a lambda function.

To cancel a parallel operation with CancellationToken, you need to:

  1. Import the System.Threading namespace (in addition to the System and System.Threading.Tasks namespaces already used in the project);
  2. Create an object of the CancellationTokenSource class;
    CancellationTokenSource CTS = new CancellationTokenSource();
  3. Obtain a CancellationToken token from the CancellationTokenSource object;
    CancellationToken token = CTS.Token;
  4. Check for a cancellation request inside the operation using the following structure:
    if (token.IsCancellationRequested)
    {
        Console.WriteLine("Operation interrupted");
        return;
    }
  5. Cancel the operation by invoking the Cancel() method of the CancellationTokenSource class object;
    CTS.Cancel();

The example below illustrates the use of CancellationToken.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelToken
{
    class Program
    {
        static void Main(string[] args)
        {
            CancellationTokenSource CTS = new CancellationTokenSource();
            CancellationToken token = CTS.Token;
            int number = 6;

            Task task1 = new Task(() =>
            {
                int result = 1;
                for (int i = 1; i <= number; i++)
                {
                    if (token.IsCancellationRequested)
                    {
                        Console.WriteLine("Operation interrupted");
                        return;
                    }

                    result *= i;
                    Console.WriteLine("Factorial of {0} = {1}", i, result);
                    Thread.Sleep(5000);
                }
            });
            task1.Start();

            Console.WriteLine("Enter N to cancel the operation or wait for it to finish");
            string s = Console.ReadLine();
            if (s == "N")
            {
                CTS.Cancel();
                Console.WriteLine("Cancelled by user. Press any key to exit");
                Console.ReadKey();
            }

            Console.Read();
        }
    }
}

This example displays the following console output:


Figure 4 Early termination of the Parallel loop with CancellationToken.

CancellationToken can be passed to an external method as an argument:

static void Factorial(int x, CancellationToken token);

In the method itself, you only need to check whether cancellation has been requested and, if so, end the operation.

if (token.IsCancellationRequested)
{
    Console.WriteLine("Operation interrupted");
    return;
}
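
Put together, passing the token to an external method could look like the sketch below. It is only an illustration: the class name TokenArgumentExample, the bool return value, and the timings are assumptions that do not come from the original example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class TokenArgumentExample
{
    // Returns true if all steps ran, false if a cancellation request was observed.
    public static bool Factorial(int x, CancellationToken token)
    {
        int result = 1;
        for (int i = 1; i <= x; i++)
        {
            if (token.IsCancellationRequested)
            {
                Console.WriteLine("Operation interrupted");
                return false;
            }
            result *= i;
            Thread.Sleep(100); // simulate a slow step
        }
        Console.WriteLine("Factorial of {0} = {1}", x, result);
        return true;
    }

    static void Main()
    {
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            Task<bool> task = Task.Factory.StartNew(() => Factorial(10, token));
            Thread.Sleep(250); // let a few steps run
            cts.Cancel();      // request cancellation
            Console.WriteLine("Completed: {0}", task.Result);
        }
    }
}
```

The method is never forced to stop: it ends early only because it checks the token itself on every step.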

You can also use the overloads of the Parallel.For() and Parallel.ForEach() methods that take one more parameter, a ParallelOptions object, in which you can set the CancellationToken:

Parallel.ForEach<int>(new List<int>() { 1, 2, 3, 4, 5 }, new ParallelOptions { CancellationToken = token }, Factorial);

In this case, however, you need to catch the OperationCanceledException that is thrown when the operation is canceled, using the following construction:

try
{
    Parallel.For(1, 5, new ParallelOptions { CancellationToken = token }, Factorial);
}
catch (OperationCanceledException ex)
{
    Console.WriteLine("Operation interrupted");
}
finally
{
    CTS.Dispose();
}

In this case, the parallel loop will be terminated, while the resulting exception will not stop the entire application.
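
A self-contained variant of this construction is sketched below. The class ParallelOptionsExample, the RunLoop method, the iteration count, and the timings are hypothetical and chosen only to make the cancellation observable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class ParallelOptionsExample
{
    // Returns true if the loop ran to completion, false if it was canceled.
    public static bool RunLoop(int count, CancellationToken token)
    {
        ParallelOptions options = new ParallelOptions { CancellationToken = token };
        try
        {
            Parallel.For(0, count, options, i => Thread.Sleep(10)); // simulated work
            return true;
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Operation interrupted");
            return false;
        }
    }

    static void Main()
    {
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            // Request cancellation from another task shortly after the loop starts.
            Task.Factory.StartNew(() => { Thread.Sleep(100); cts.Cancel(); });
            Console.WriteLine("Completed: {0}", RunLoop(100000, cts.Token));
        }
    }
}
```

Here the loop body does not check the token at all. Parallel.For observes the token itself and stops scheduling new iterations once cancellation is requested.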

Parallel LINQ (PLINQ)

LINQ was designed as a data query interface that processes the results of a collection query sequentially. Beginning with .NET 4.0, the ParallelEnumerable class appeared in the System.Linq namespace, allowing you to process a collection in parallel, using the capabilities of all the system’s processors.

However, PLINQ does not run every query in parallel. By default, it analyzes the query and switches to parallel processing only if that is expected to speed up the query; otherwise, it processes the data sequentially.

Parallel query operations usually carry additional costs, such as dividing the data into parts and merging the results. When these costs outweigh the gain, sequential processing is preferable. Therefore, PLINQ is usually applied to very large collections or to complex query operations, where parallelization brings a real benefit.

It should also be taken into account that when multiple threads share access to the same data, that access has to be synchronized (for example, with locks), which can also have a big impact on PLINQ performance.

AsParallel() method

This method allows parallelizing a query to a data source. When this method is invoked, the data source is divided into parts (if possible), and operations are then performed on each part in a separate thread.

In fact, this is a normal LINQ query, but the AsParallel() method is also applied to the data source.

static int Factorial(int x)
{
    int result = 1;

    for (int i = 1; i <= x; i++)
    {
        result *= i;
    }
    Console.WriteLine("Factorial of {0} = {1}", x, result);
    return result;
}

static void Main(string[] args)
{
    int[] nums = new int[] { -6, -2, 0, 1, 2, 4, 3, 5, 6, 7, 8, };
    var factorials = from n in nums.AsParallel()
            select Factorial(n);
}

or

var factorials = nums.AsParallel().Select(x => Factorial(x));
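
Note that, as with regular LINQ, defining the query does not run it: the Factorial method is called only when the results are enumerated. The sketch below forces execution with ToArray(). The class name AsParallelExample and the Compute method are hypothetical.

```csharp
using System;
using System.Linq;

class AsParallelExample
{
    public static int Factorial(int x)
    {
        int result = 1;
        for (int i = 2; i <= x; i++)
            result *= i;
        return result;
    }

    public static int[] Compute(int[] nums)
    {
        // The query is only defined by AsParallel().Select(...);
        // ToArray() forces it to run and collects the results.
        return nums.AsParallel().Select(x => Factorial(x)).ToArray();
    }

    static void Main()
    {
        int[] results = Compute(new int[] { 1, 2, 3, 4, 5 });
        Array.Sort(results); // the parallel query does not guarantee any order
        Console.WriteLine(string.Join(", ", results)); // prints "1, 2, 6, 24, 120"
    }
}
```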

ForAll() method

This method can optimize parallel queries even further. Like Parallel.ForEach, it processes the results in parallel: the delegate is invoked on the threads that produce the results. This avoids the delay that occurs when the results are enumerated in a regular foreach loop, where the data received from different threads first has to be assembled into one set.

The ForAll() method takes an Action delegate or a lambda function as an argument.

int[] nums = new int[] { -6, -2, 0, 1, 2, 4, 3, 5, 6, 7, 8, };
(from n in nums.AsParallel()
    where n > 0
    select Factorial(n)).ForAll(n => Console.WriteLine(n));
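
Because the delegate passed to ForAll() may be invoked from several threads at once, anything it writes to should be thread-safe. The sketch below collects the results into a ConcurrentBag as one possible approach. The names ForAllExample and CollectSquares are made up for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

class ForAllExample
{
    public static ConcurrentBag<int> CollectSquares(int[] nums)
    {
        ConcurrentBag<int> bag = new ConcurrentBag<int>();
        nums.AsParallel()
            .Where(n => n > 0)
            .Select(n => n * n)
            .ForAll(n => bag.Add(n)); // may run on several threads, so the target is thread-safe
        return bag;
    }

    static void Main()
    {
        ConcurrentBag<int> squares = CollectSquares(new int[] { -2, 0, 1, 2, 3 });
        // prints "Collected 3 values, sum = 14"
        Console.WriteLine("Collected {0} values, sum = {1}", squares.Count, squares.Sum());
    }
}
```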

When a parallel query is executed, the results can come back in any order. You can apply the LINQ OrderBy() method or the orderby operator, which sorts the results by the given key in ascending order.

var factorials = from n in nums.AsParallel()
    where n > 0
    orderby n
    select Factorial(n);

However, this order will be different from the order in which the elements were located in the data source. If you want to keep the results in the original sequence, use the AsOrdered() operator. Preserving the order carries additional costs during query execution. If further operations are required on the set ordered by the AsOrdered() method and the ordering itself is no longer needed, use the AsUnordered() method.

var factorials = from n in nums.AsParallel().AsOrdered()
    where n > 0
    select Factorial(n);
var query = from n in factorials.AsUnordered()
    where n > 100
    select n;
query.ForAll(n => Console.WriteLine(n));
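
The effect of AsOrdered() can be checked with a small sketch like the one below, where the results keep the positions their sources had in the input array. The class OrderedExample and the method SquaresInSourceOrder are hypothetical names.

```csharp
using System;
using System.Linq;

class OrderedExample
{
    public static int[] SquaresInSourceOrder(int[] nums)
    {
        return nums.AsParallel()
                   .AsOrdered()          // preserve the order of the source sequence
                   .Where(n => n > 0)
                   .Select(n => n * n)
                   .ToArray();
    }

    static void Main()
    {
        int[] squares = SquaresInSourceOrder(new int[] { -6, -2, 0, 1, 2, 4, 3, 5 });
        Console.WriteLine(string.Join(", ", squares)); // prints "1, 4, 16, 9, 25"
    }
}
```

Note that 16 comes before 9 because 4 precedes 3 in the source array: the source order is preserved, not a sorted order.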

PLINQ error handling

When a parallel query is executed, the data source is divided into parts, and each part is processed in a separate thread. If an error occurs in one of the threads, the system interrupts the execution of all threads and throws an AggregateException. In the following code, the data source (array) contains not only numbers but also a string. Therefore, an error occurs when the query tries to calculate the factorial of the string.

object[] nums2 = new object[] { 1, 2, 3, 4, 5, "oops" };

factorials = from n in nums2.AsParallel()
    let x = (int)n
    select Factorial(x);
try
{
    factorials.ForAll(n => Console.WriteLine(n));
}
catch (AggregateException ex)
{
    foreach (var e in ex.InnerExceptions)
    {
        Console.WriteLine(e.Message);
    }
}

Here, the resulting exception is an AggregateException exception, as in the Parallel class methods. This exception should be caught and its InnerExceptions property should be accessed to determine the type of exceptions that occurred.

Early termination of PLINQ queries

If you need to abort an operation being executed by PLINQ before it finishes (for example, on a timeout), you can use the WithCancellation() method in the query and pass it a CancellationToken, as in the example below.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PlinqCancel
{
    class Program
    {
        static int Factorial(int x)
        {
            int result = 1;

            for (int i = 1; i <= x; i++)
            {
                result *= i;
            }
            Console.WriteLine("Factorial of {0} = {1}", x, result);
            Thread.Sleep(1000);
            return result;
        }

        static void Main(string[] args)
        {
            CancellationTokenSource cts = new CancellationTokenSource();
            new Task(() =>
            {
                Thread.Sleep(500);
                cts.Cancel();
            }).Start();
            try
            {
                int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
                var factorials = from n in numbers.AsParallel().WithCancellation(cts.Token)
                        select Factorial(n);
                foreach (var n in factorials)
                    Console.WriteLine(n);
            }
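            catch (OperationCanceledException)
            {
                // Thrown when cts.Cancel() interrupts the query
                Console.WriteLine("Operation interrupted");
            }
            catch (AggregateException ex)
            {
                // Thrown when another exception occurs in one of the PLINQ threads
                if (ex.InnerExceptions != null)
                {
                    foreach (Exception e in ex.InnerExceptions)
                        Console.WriteLine(e.Message);
                }
            }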
            finally
            {
                cts.Dispose();
            }
            Console.ReadLine();
        }
    }
}

In this example, two threads are started. In the main thread, there is a parallel query with possible early termination.

var factorials = from n in numbers.AsParallel().WithCancellation(cts.Token)
        select Factorial(n);

The parallel query is interrupted (after a certain time has elapsed) by an additional thread created using the Task object.

new Task(() =>
{
    Thread.Sleep(500);
    cts.Cancel();
}).Start();

A console output of the example is shown below.


Figure 5 Early termination of parallel query using CancellationToken.

The cts.Cancel() method, as with the Parallel class, causes an OperationCanceledException to be thrown, which must be handled in a try/catch block; otherwise, it will crash the program. The AggregateException that is thrown if any other exception occurs in one of the PLINQ threads should also be handled.

Conclusion

Although TPL and PLINQ are relatively new in .NET, they are fully capable of solving complex problems on multi-core processors. TPL, for example, automatically distributes tasks between the available processor cores, like ThreadPool.

The difference between TPL and ThreadPool is that TPL (like Thread objects) is designed for long, computationally complex tasks. But whereas Thread objects need to be created and destroyed manually, TPL creates threads automatically, and only as many as are needed to solve the task effectively.

The PLINQ library as a whole is similar to TPL. However, it is optimized for queries to data sources, which cannot always be parallelized effectively.

In the next part of the article, we’ll look at the thread synchronization mechanisms. Stay tuned!
