Parallel-executed Tasks with isolated scopes

My current customer's infrastructure depends heavily on external data suppliers. Because of the nature of the data, the system often has to make these requests in real time while the end customer waits for the response. Parallel tasks come in handy when you want to aggregate data from several endpoints: asynchronous requests put less strain on the thread pool, and the response time is shorter because you do not wait for each request to complete before starting the next (parallel vs. sequential).
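To make the parallel vs. sequential point concrete, here is a minimal sketch. `SupplierDemo`, `FetchAsync` and the 200 ms delay are made up for illustration and only simulate a call to an external supplier; they are not part of the customer's system.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class SupplierDemo
{
    // Hypothetical stand-in for an external supplier call:
    // waits 200 ms asynchronously, so no thread is blocked while waiting.
    public static async Task<string> FetchAsync(string supplier)
    {
        await Task.Delay(200);
        return supplier + ": ok";
    }

    // Sequential: each request starts only after the previous one has completed.
    public static async Task<List<string>> FetchSequentialAsync(IEnumerable<string> suppliers)
    {
        var results = new List<string>();
        foreach (var supplier in suppliers)
        {
            results.Add(await FetchAsync(supplier));
        }
        return results;
    }

    // Parallel: every request is started up front, then all are awaited together.
    // Task.WhenAll returns the results in the same order as the input tasks.
    public static async Task<string[]> FetchParallelAsync(IEnumerable<string> suppliers)
    {
        return await Task.WhenAll(suppliers.Select(s => FetchAsync(s)));
    }

    public static async Task Main()
    {
        var suppliers = new[] { "A", "B", "C" };

        var watch = Stopwatch.StartNew();
        await FetchSequentialAsync(suppliers);
        Console.WriteLine($"Sequential: {watch.ElapsedMilliseconds} ms");

        watch.Restart();
        await FetchParallelAsync(suppliers);
        Console.WriteLine($"Parallel:   {watch.ElapsedMilliseconds} ms");
    }
}
```

With three simulated suppliers, the sequential version should take roughly the sum of the delays, while the parallel version should take roughly as long as the slowest single request. Exact timings will vary from machine to machine.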

The problem starts with frameworks that do not play nicely when their resources are shared across multiple tasks or threads; the Entity Framework DbContext is one example. One option is to manage the lifetime of the context yourself and create one for each parallel task. But this is not a solid design: if you use an IoC container, you want every object in the current graph to receive the same instance of the DbContext without bothering with lifetime code. I created a little class called TaskRunner for this purpose.

It utilizes an interface called IScopedContext, which I covered in an earlier blog post.
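Neither IScopedContext nor ITaskRunner is shown in this post. Judging only from how TaskRunner uses them, they might look roughly like the sketch below. These shapes are an assumption inferred from usage; the real definitions in the earlier post may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed shape: a disposable scope that exposes the root object
// resolved inside it (TaskRunner reads scope.EntryPoint and disposes the scope).
public interface IScopedContext<TScope> : IDisposable where TScope : class
{
    TScope EntryPoint { get; }
}

// Assumed shape: mirrors the two RunParallel overloads implemented by TaskRunner.
public interface ITaskRunner<TScope> where TScope : class
{
    Task<IEnumerable<TResult>> RunParallel<TSource, TResult>(
        IEnumerable<TSource> source, Func<TScope, TSource, Task<TResult>> func);

    Task RunParallel<TSource>(
        IEnumerable<TSource> source, Func<TScope, TSource, Task> action);
}
```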


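using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class TaskRunner<TScope> : ITaskRunner<TScope> where TScope : class
{
	// Creates a fresh scope (and with it a fresh TScope) on every call.
	private readonly Func<IScopedContext<TScope>> scopeFactory;

	public TaskRunner(Func<IScopedContext<TScope>> scopeFactory)
	{
		this.scopeFactory = scopeFactory;
	}

	// Starts one task per source item, each inside its own scope,
	// and completes when all of them have completed.
	public async Task<IEnumerable<TResult>> RunParallel<TSource, TResult>(IEnumerable<TSource> source, Func<TScope, TSource, Task<TResult>> func)
	{
		return await Task.WhenAll(source.Select(async s =>
		{
			// The scope is disposed as soon as this item's task has finished.
			using(var scope = scopeFactory())
			{
				return await func(scope.EntryPoint, s);
			}
		}));
	}

	// Overload for work that produces no result; it delegates to the
	// generic overload with a dummy return value.
	public Task RunParallel<TSource>(IEnumerable<TSource> source, Func<TScope, TSource, Task> action)
	{
		return RunParallel(source, async (scope, data) =>
		{
			await action(scope, data);
			return true;
		});
	}
}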

RunParallel takes a collection of items to process and a Func that receives the correct scope (for example a DbContext) together with one item, and returns a Task<TResult>. For each item, the runner creates a scope using my IScopedContext implementation covered earlier and passes its entry point to the Func; the scope is disposed when that item's task finishes. Last but not least, Task.WhenAll gives us a single Task that completes when all of the individual tasks have completed. Example usage:

public class ListProductsQueryHandler : IQueryHandler<ListProductsQuery, IEnumerable<Product>>
{
	private readonly ITaskRunner<IService> runner;

	public ListProductsQueryHandler(ITaskRunner<IService> runner)
	{
		this.runner = runner;
	}

	public async Task<IEnumerable<Product>> Handle(ListProductsQuery query)
	{
		return await runner.RunParallel(query.Systems, (service, systemId) => service.GetProduct(systemId));
	}
}
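To check the isolation without an IoC container, the following self-contained sketch wires the runner to a hand-rolled scope. `FakeContext`, `FakeScope` and `ScopeDemo` are hypothetical stand-ins (a real setup would resolve the scope from the container), the IScopedContext shape is assumed from how TaskRunner uses it, and the runner is a condensed copy of the generic overload above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape of the scope interface, inferred from how TaskRunner uses it.
public interface IScopedContext<TScope> : IDisposable where TScope : class
{
    TScope EntryPoint { get; }
}

// Hypothetical stand-in for a non-thread-safe resource such as a DbContext.
public sealed class FakeContext
{
    private static int nextId;
    public int Id { get; } = Interlocked.Increment(ref nextId);
    public bool Disposed { get; set; }
}

// Hypothetical hand-rolled scope: one new FakeContext per scope.
public sealed class FakeScope : IScopedContext<FakeContext>
{
    public FakeContext EntryPoint { get; } = new FakeContext();
    public void Dispose() => EntryPoint.Disposed = true;
}

// Condensed copy of the generic RunParallel overload from the post.
public class TaskRunner<TScope> where TScope : class
{
    private readonly Func<IScopedContext<TScope>> scopeFactory;

    public TaskRunner(Func<IScopedContext<TScope>> scopeFactory)
    {
        this.scopeFactory = scopeFactory;
    }

    public async Task<IEnumerable<TResult>> RunParallel<TSource, TResult>(
        IEnumerable<TSource> source, Func<TScope, TSource, Task<TResult>> func)
    {
        return await Task.WhenAll(source.Select(async s =>
        {
            using (var scope = scopeFactory())
            {
                return await func(scope.EntryPoint, s);
            }
        }));
    }
}

public static class ScopeDemo
{
    // Processes three items and returns the context each one was handed.
    public static async Task<List<FakeContext>> RunAsync()
    {
        var runner = new TaskRunner<FakeContext>(() => new FakeScope());
        var contexts = await runner.RunParallel(new[] { 1, 2, 3 }, async (context, item) =>
        {
            await Task.Delay(50); // simulate I/O against the context
            return context;
        });
        return contexts.ToList();
    }

    public static async Task Main()
    {
        var contexts = await RunAsync();
        // Each item received its own context instance.
        Console.WriteLine("Distinct contexts: " + contexts.Select(c => c.Id).Distinct().Count());
        // Each scope was disposed once its task had finished.
        Console.WriteLine("All disposed: " + contexts.All(c => c.Disposed));
    }
}
```

Returning the context out of the Func is done here only so the demo can inspect it afterwards. In real code the scoped object should not escape the Func, since it is disposed as soon as the task completes.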