
I have a Producer/Consumer implementation where the number of consumers is configurable (this is a form of configurable throttling). The producer and consumer are kicked off like this:

//shown here for completeness: the shared queue the producer fills and the consumers drain
var _mainQueue = new BlockingCollection<Task>();
var cts = new CancellationTokenSource();
var processCancelToken = cts.Token;

Task.Run(() => Parallel.Invoke(new ParallelOptions() { CancellationToken = processCancelToken }
                               , producer
                               , consumer
                               )
               , processCancelToken);

The producer action is quite simple and just populates a BlockingCollection with objects that are derived from System.Threading.Tasks.Task (yes, it is possible!). Here is a simplified example:

var pollingInterval = 30000;   //milliseconds between polls

var producer = new Action(() =>
{
    Random rnd = new Random(DateTime.Now.Second);
    while (!processCancelToken.IsCancellationRequested)
    {
        for (int ii = 0; ii < 10; ii++)
        {
            var r = rnd.Next(2, 15);
            _mainQueue.Add(new Task(() =>
            {
                //this is a dummy task for illustrative purposes
                Console.WriteLine("        Queued task starting, set to sleep {0} seconds, ID: {1}", r, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(r * 1000);
            }));
            Console.WriteLine("    Producer has added task to queue");
        }
        Thread.Sleep(pollingInterval);
    }
    Console.WriteLine("Exiting producer");
});

For this example, it creates an anonymous task that sleeps for a random number of seconds between 2 and 15. In the real code this producer polls the database, extracts data entities representing work items, then transforms those into executable tasks that are added to the collection.
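In outline, the real producer follows the shape of the sketch below; GetPendingWorkItems() and ExecuteWorkItem() are hypothetical stand-ins for the actual data access and work-item logic, not names from the real code:

//Sketch only: polling producer, with hypothetical GetPendingWorkItems()
//and ExecuteWorkItem() standing in for the real database access
var dbProducer = new Action(() =>
{
    while (!processCancelToken.IsCancellationRequested)
    {
        foreach (var workItem in GetPendingWorkItems())   //poll the database for work items
        {
            var item = workItem;   //local copy so each closure captures its own entity
            _mainQueue.Add(new Task(() => ExecuteWorkItem(item)), processCancelToken);
        }
        Thread.Sleep(pollingInterval);
    }
    Console.WriteLine("Exiting producer");
});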

I then have a consumer action that uses Parallel.For() to start n instances of an anonymous body; each instance dequeues a task from the collection, starts it, waits on it, and repeats:

var numberConsumerThreads = 3;

var consumer = new Action(() =>
{
    Parallel.For(0, numberConsumerThreads, (x) =>
    {
        //this body should continue to dequeue work items until it is cancelled
        try
        {
            while (!processCancelToken.IsCancellationRequested)
            {
                var dequeuedTask = _mainQueue.Take(processCancelToken);
                Console.WriteLine("   Consumer #{0} has taken task from the queue", Thread.CurrentThread.ManagedThreadId);

                dequeuedTask.Start();
                //poll the task so the wait itself remains cancellable
                while (!processCancelToken.IsCancellationRequested)
                {
                    if (dequeuedTask.Wait(500))
                        break;
                    Console.WriteLine("   Consumer #{0} task wait elapsed", Thread.CurrentThread.ManagedThreadId);
                }
            }
        }
        catch (OperationCanceledException)
        {
            //Take() throws when processCancelToken is cancelled while blocked
        }
        Console.WriteLine("Exiting consumer #{0}", Thread.CurrentThread.ManagedThreadId);
    });
});

The question: is this an efficient way to start and operate an arbitrary number of consumers? Or is there a more efficient way, using PLINQ from within the main consumer action, that continues to execute queued tasks, blocks while there are none, and can still be cancelled using processCancelToken?

(Note: processCancelToken operates separately from the cancellation tokens held inside the queued tasks; the two are independently cancellable. This all runs within a Windows service, and processCancelToken is used to cancel everything if the service is stopped.)
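As a minimal sketch of that independence (the linked token source here is an assumption about how the two levels could be wired, not the actual service code):

//Each queued task carries its own token source, linked to the service-wide
//token: stopping the service cancels the task too, while taskCts.Cancel()
//cancels just this one task and nothing else
var taskCts = CancellationTokenSource.CreateLinkedTokenSource(processCancelToken);
_mainQueue.Add(new Task(() =>
{
    //real work would periodically observe its task-local token
    taskCts.Token.ThrowIfCancellationRequested();
}, taskCts.Token));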

user833115
  • What do you mean by "efficient?" – Robert Harvey Apr 23 '14 at 04:33
  • Quick note: my understanding of Parallel.For is that it only guarantees to run once for each of the indices in the given range, but not necessarily all at the same time. From your code it seems that you are trying to spawn exactly X consumers rather than up to X consumers. Did I perhaps miss something? At any rate, have you considered making the producer a custom IEnumerable that yields tasks as it creates them? This would then give you the ability to use a Parallel.ForEach directly on the producer, allowing for custom PLINQ partitioners, etc. - beware buffering though. – J Trana Apr 23 '14 at 05:00
  • @RobertHarvey I mean "is the current way inefficient?". The three actions are run on the threadpool, they in turn cause another three threadpool threads to be used when they start a dequeued task. I couldn't figure out how to use Parallel.ForEach against the BlockingCollection in a way that kept going as the collection had more items added. – user833115 Apr 23 '14 at 10:13
  • @JTrana Yes, it was my intention to start just the given number of consumers, I know it's abusing the Parallel.For() somewhat. I'm happy for consumers to potentially be idle. I like your idea but I'm not sure how I'd incorporate a custom IEnumerable into the current action? – user833115 Apr 23 '14 at 10:19
  • `BlockingCollection` already has such custom `IEnumerable`: [`GetConsumingEnumerable()`](http://msdn.microsoft.com/en-us/library/dd287186). But [it doesn't work well with `Parallel.ForEach()`](http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx). – svick Apr 23 '14 at 11:16
  • Thanks @svick that's why I ended up with the current code I have. I will experiment with the BlockingCollectionPartitioner at the link you mentioned and see how well it works - if it does it will halve the number of threads the consumers use. – user833115 Apr 23 '14 at 12:45
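
For reference, the BlockingCollectionPartitioner approach that svick's link describes wraps GetConsumingEnumerable() in a custom Partitioner<T>, so that Parallel.ForEach pulls items on demand instead of trying to buffer the never-ending enumerable. A condensed sketch along the lines of the ParallelExtensionsExtras sample:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

static class BlockingCollectionExtensions
{
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        //dynamic partitions let each worker pull the next item as it becomes available
        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitioner.GetEnumerator())
                             .ToList();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}

With that in place, the consumer could collapse to a single call such as Parallel.ForEach(_mainQueue.GetConsumingPartitioner(), new ParallelOptions { MaxDegreeOfParallelism = numberConsumerThreads, CancellationToken = processCancelToken }, t => { t.Start(); t.Wait(); });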
