I wrote a blog post about replacing the timeout-based test harness with a semaphore-driven one here. This makes things much more robust when you want black-box style testing: fire a number of events and wait until all of them, and the child events they spawn, have been digested.
This worked well and was robust, but it still used the MassTransit harness for hosting. This made the in-memory bus more than twice as slow as hosting MassTransit in a service. That comparison includes database I/O, so the difference is probably a lot bigger when looking only at bus performance.
But it’s pretty easy to host MassTransit from a non-service project such as a test project. Instead of configuring with AddMassTransitTestHarness, use the standard AddMassTransit extension method. Events will now not be consumed when you publish them, because the IHostedService instances haven’t been started. That’s an easy fix if we base the code on the IHarness from my previous blog post:
/// <summary>
/// Creates the harness from the hosted services registered in the container.
/// </summary>
/// <param name="services">The hosted services to start and stop around each test.</param>
public Harness(IEnumerable<IHostedService> services)
{
    _services = services;
}

/// <summary>
/// Starts every hosted service, one at a time, in registration order.
/// </summary>
public async Task Start()
{
    // The start is never cancelled, so no CancellationTokenSource needs to be allocated.
    foreach (var service in _services)
    {
        await service.StartAsync(CancellationToken.None);
    }
}

/// <summary>
/// Stops every hosted service, one at a time, in the reverse of the order they were started.
/// </summary>
public async Task Stop()
{
    // Reverse order mirrors how the generic host shuts down: last started, first stopped.
    foreach (var service in _services.Reverse())
    {
        await service.StopAsync(CancellationToken.None);
    }
}
Call Start from your test setup and Stop from your test teardown. This starts the background workers for MassTransit and makes sure it listens for and consumes events. The service will not work unless you add logging to your IoC config:
// Register logging in the container; the hosted services will not work without it.
new ServiceCollection() .AddLogging();
Coupled with the harness code from the previous blog post, you now have a very robust and fast test harness. Full code below.
/// <summary>
/// Extension methods that plug the semaphore-driven test harness into an in-memory bus
/// and into the service collection.
/// </summary>
public static class HarnessSetup
{
    /// <summary>
    /// Adds the harness consume filter, publish filter and publish observer to the bus.
    /// </summary>
    /// <param name="bus">The in-memory bus configurator to extend.</param>
    /// <param name="ctx">The registration context passed on to the filter registrations.</param>
    public static void ConfigureTestHarness(this IInMemoryBusFactoryConfigurator bus, IBusRegistrationContext ctx)
    {
        bus.UseConsumeFilter(typeof(ConsumeFilter<>), ctx);
        bus.UsePublishFilter(typeof(PublishFilter<>), ctx);
        bus.ConnectPublishObserver(new PublishObserver());
    }

    /// <summary>
    /// Registers <see cref="Harness"/> as the singleton <see cref="IHarness"/>.
    /// </summary>
    /// <param name="services">The service collection to add the harness to.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddTestHarness(this IServiceCollection services)
    {
        services.AddSingleton<IHarness, Harness>();
        return services;
    }

    /// <summary>
    /// Invokes the callback that <see cref="PublishFilter{TMessage}"/> stored for a message object,
    /// passing it the message id of the publish context.
    /// </summary>
    private class PublishObserver : IPublishObserver
    {
        /// <summary>Pending callbacks, keyed by the published message object.</summary>
        public static readonly ConcurrentDictionary<object, Action<Guid>> PublishHandlers = new();

        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            // A single atomic TryRemove replaces the ContainsKey + indexer + spin-on-Remove sequence,
            // which could throw or loop forever if another thread removed the entry in between.
            if (PublishHandlers.TryRemove(context.Message, out var handler))
            {
                handler(context.MessageId!.Value);
            }

            return Task.CompletedTask;
        }

        public Task PostPublish<T>(PublishContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Publish filter that stores the harness callback for the message being published so that
    /// <see cref="PublishObserver"/> can invoke it with the message id.
    /// </summary>
    private class PublishFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class
    {
        private readonly IHarness _harness;

        public PublishFilter(IHarness harness)
        {
            _harness = harness;
        }

        public Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next)
        {
            PublishObserver.PublishHandlers[context.Message] = _harness.MessagePublished;

            // Hand the context to the next filter so the rest of the publish pipeline still runs.
            return next.Send(context);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

    /// <summary>
    /// Consume filter that tells the harness when a message has been consumed, or that consuming it failed.
    /// </summary>
    private class ConsumeFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
    {
        private readonly IHarness _harness;

        public ConsumeFilter(IHarness harness)
        {
            _harness = harness;
        }

        public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
        {
            try
            {
                await next.Send(context);
            }
            catch (Exception e)
            {
                // Deliberately not rethrown here: the harness stores the exception and
                // rethrows it from WaitForBus.
                _harness.Throw(e);
                return;
            }

            _harness.ConsumeMessage(context.MessageId!.Value);
        }

        public void Probe(ProbeContext context)
        {
        }
    }
}

/// <summary>
/// Test harness that hosts the bus and lets a test wait until all published messages are consumed.
/// </summary>
public interface IHarness
{
    /// <summary>Starts the hosted services.</summary>
    Task Start();

    /// <summary>Stops the hosted services.</summary>
    Task Stop();

    /// <summary>Waits until no published message is left unconsumed; rethrows a reported consumer exception.</summary>
    Task WaitForBus();

    /// <summary>Reports that the message with the given id has been consumed.</summary>
    void ConsumeMessage(Guid message);

    /// <summary>Reports that the message with the given id is being published.</summary>
    void MessagePublished(Guid message);

    /// <summary>Reports an exception thrown while consuming a message.</summary>
    void Throw(Exception exception);

    /// <summary>Returns the collected scheduled messages and clears the collection.</summary>
    IEnumerable<OutboxMessage> PopScheduledMessages();
}

/// <summary>
/// Default <see cref="IHarness"/>: tracks published message ids and releases a semaphore each time
/// one is consumed or a consumer fails.
/// </summary>
public class Harness : IHarness
{
    private readonly IEnumerable<IHostedService> _services;
    private readonly ConcurrentDictionary<Guid, int> _publishedMessages = new();
    private readonly ConcurrentBag<OutboxMessage> _scheduledMessage = new();
    private readonly SemaphoreSlim _semaphore = new(0);
    private Exception? _exception;

    /// <summary>
    /// Creates the harness from the hosted services registered in the container.
    /// </summary>
    /// <param name="services">The hosted services to start and stop around each test.</param>
    public Harness(IEnumerable<IHostedService> services)
    {
        _services = services;
    }

    /// <summary>
    /// Starts every hosted service, one at a time, in registration order.
    /// </summary>
    public async Task Start()
    {
        // The start is never cancelled, so no CancellationTokenSource needs to be allocated.
        foreach (var service in _services)
        {
            await service.StartAsync(CancellationToken.None);
        }
    }

    /// <summary>
    /// Stops every hosted service, one at a time, in the reverse of the order they were started.
    /// </summary>
    public async Task Stop()
    {
        // Reverse order mirrors how the generic host shuts down: last started, first stopped.
        foreach (var service in _services.Reverse())
        {
            await service.StopAsync(CancellationToken.None);
        }
    }

    /// <summary>Records the message id as published and not yet consumed.</summary>
    public void MessagePublished(Guid message) => _publishedMessages[message] = 1;

    /// <summary>
    /// Removes the message id from the pending set and wakes the waiter in <see cref="WaitForBus"/>.
    /// </summary>
    public void ConsumeMessage(Guid message)
    {
        // One attempt only: spinning until the id shows up would hang forever when the id is not
        // tracked (already removed by another consumer, cleared by Throw, or never registered).
        _publishedMessages.TryRemove(message, out _);
        _semaphore.Release();
    }

    /// <summary>
    /// Stores the exception, drops all pending message ids and wakes the waiter so that
    /// <see cref="WaitForBus"/> can rethrow it.
    /// </summary>
    public void Throw(Exception exception)
    {
        // Store the exception before clearing, so a waiter that sees the empty set also sees the failure.
        _exception = exception;
        _publishedMessages.Clear();
        _semaphore.Release();
    }

    /// <summary>
    /// Returns a snapshot of the scheduled messages and then clears the bag.
    /// </summary>
    public IEnumerable<OutboxMessage> PopScheduledMessages()
    {
        var result = _scheduledMessage.ToList();
        _scheduledMessage.Clear();
        return result;
    }

    /// <summary>
    /// Waits on the semaphore until no published message id is pending, then rethrows the exception
    /// reported through <see cref="Throw"/>, if any.
    /// </summary>
    public async Task WaitForBus()
    {
        do
        {
            await _semaphore.WaitAsync();
        }
        while (!_publishedMessages.IsEmpty);

        // Take and clear the stored exception so it fails this wait only, not every later one.
        var failure = Interlocked.Exchange(ref _exception, null);
        if (failure is not null)
        {
            // Rethrow without losing the stack trace of the consumer that failed.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
        }
    }
}