At my latest customer project I choose to use Masstransit for events and Sagas. Its been a bumpy ride with outboxes and such, but now we have a pretty stable foundation to build upon. One problem have been testing. I like black box testing of our domain. Something like this.
[TestClass] public class When_doing_a_complete_booking : BusinessTest { private Booking _result; private DateTime _date; [TestInitialize] public void Context() { Guid bookingKey = Guid.Empty; _date = DateTime.Now.AddDays(5); _result = Given(db => /* Setup here */) .When(() => new SearchQuery{ Date = _date, ...}) .And(result => { bookingKey = result.First().BookingKey; return new ReserveCommand { BookingKey = bookingKey, ... }; }) .And(() => new ConfirmCommand { BookingKey = bookingKey, ... }) .Then(db => db.Set<booking>().FirstOrDefaultAsync(b => b.BookingKey == bookingKey)); } [TestMethod] public void It_should_book_correctly () { Assert.IsNotNull(_result); Assert.IsTrue(...); } }
Masstransit harness really doesn’t support black box type testing. Chris Patterson favors a more unit testing-oriented approach, were you fire events and your Tests assert that events were consumed. You can await consumption with the built in harness, but its timeout oriented which makes it slow and unstable.
I built my own harness on top of the vanilla one using consume and publish filters. You publish a event and call WaitForBus which will await all events spawned from sagas or consumers. When WaitForBus returns you can assert or publish more events. Setup my harness like.
new ServiceCollection() .AddMassTransitTestHarness(cfg => { cfg.UsingInMemory((ctx, mem) => { mem.ConfigureTestHarness(ctx); mem.ConfigureEndpoints(ctx); }); }) .AddTestHarness();
This will hook up required filters plus adding my custom IHarness that contains the WaitForBus method. After that you can publish a event on the bus and await using the method above. Full source code.
public static class HarnessSetup { public static void ConfigureTestHarness(this IInMemoryBusFactoryConfigurator bus, IBusRegistrationContext ctx) { bus.UseConsumeFilter(typeof(ConsumeFilter<>), ctx); bus.UsePublishFilter(typeof(PublishFilter<>), ctx); bus.ConnectPublishObserver(new PublishObserver()); } public static IServiceCollection AddTestHarness(this IServiceCollection services) { services.AddSingleton<IHarness, Harness>(); return services; } private class PublishObserver : IPublishObserver { public static readonly ConcurrentDictionary<object, Action<Guid>> PublishHandlers = new(); public Task PrePublish<T>(PublishContext<T> context) where T : class { if (!PublishHandlers.ContainsKey(context.Message)) { return Task.CompletedTask; } PublishHandlers[context.Message](context.MessageId!.Value); while (!PublishHandlers.Remove(context.Message, out _)) { } return Task.CompletedTask; } public Task PostPublish<T>(PublishContext<T> context) where T : class { return Task.CompletedTask; } public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class { return Task.CompletedTask; } } private class PublishFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class { private readonly IHarness _harness; public PublishFilter(IHarness harness) { _harness = harness; } public Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next) { PublishObserver.PublishHandlers[context.Message] = _harness.MessagePublished; return Task.CompletedTask; } public void Probe(ProbeContext context) { } } private class ConsumeFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class { private readonly IHarness _harness; public ConsumeFilter(IHarness harness) { _harness = harness; } public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next) { try { await next.Send(context); } catch (Exception e) { _harness.Throw(e, context.MessageId!.Value); return; } _harness.ConsumeMessage(context.MessageId!.Value); _harness.SemaphoreSlim.Release(); } public void Probe(ProbeContext context) { } } } public interface IHarness { Task Start(); Task WaitForBus(); void ConsumeMessage(Guid message); void MessagePublished(Guid message); SemaphoreSlim SemaphoreSlim { get; } void Throw(Exception exception, Guid message); } public class Harness : IHarness { private readonly ITestHarness _harness; private readonly ConcurrentDictionary<Guid, int> _publishedMessages = new (); public Harness(ITestHarness testHarness) { _harness = testHarness; } public Task Start() => _harness.Start(); public void MessagePublished(Guid message) => _publishedMessages[message] = 1; public void ConsumeMessage(Guid message) { while(!_publishedMessages.TryRemove(message, out _)) {} } public SemaphoreSlim SemaphoreSlim { get; } = new(0); private Exception? _exception; public void Throw(Exception exception, Guid message) { _exception = exception; ConsumeMessage(message); } public async Task WaitForBus() { do { await SemaphoreSlim.WaitAsync(); } while (_publishedMessages.Count != 0); if (_exception != null) throw _exception; } }
One hack to note here is that the Publish filter doesn’t enqueue the message id that we want to wait for. This is because if you override the message id from the publish callback the publish filter will pickup the original message id not the overridden one. However IPublishObserver will pickup the correct overridden id. Problem with IPublishObserver is that it does not support IoC and will break your dependency chain. To overcome this I queue the original reference to the my message queue and use the message as indexer. When the PrePublish method hits it looks up the correct message queue and queues the message with the correct overridden message id. A bit hacky but it works. Should work for users that haven’t overridden message id also, but not tested.
Multiple subscribers to same event
As it turns out, the above code is not working when dealing with multiple subscribers to the same event type. There is no solid solution to this. Since there will only be one publish but several consumptions. In our case, only Saga events can be duplicated, so we solved it by checking which Sagas listen to which events.
private static IEnumerable<ServiceDescriptor>? _sagaFactories; private static Dictionary<string, int>? _expectedEvents; public static void ConfigureTestHarness(this IInMemoryBusFactoryConfigurator bus, IBusRegistrationContext ctx) { _expectedEvents ??= _sagaFactories!.SelectMany(sd => (IEnumerable<Event>)(sd.ImplementationFactory!(ctx) as dynamic).Events) .Select(e => e.GetType().GetGenericArguments().Single()) .GroupBy(e => e.FullName!) .ToDictionary(grp => grp.Key, grp => grp.Count()); bus.UseConsumeFilter(typeof(ConsumeFilter<>), ctx); bus.UsePublishFilter(typeof(PublishFilter<>), ctx); bus.ConnectPublishObserver(new PublishObserver()); } public static IServiceCollection AddTestHarness(this IServiceCollection services) { var lookup = typeof(SagaStateMachine<>); _sagaFactories ??= services.Where(sd => sd.ServiceType.IsGenericType && sd.ServiceType.GetGenericTypeDefinition() == lookup).ToList(); return services .AddSingleton<IHarness, Harness>() .AddLogging(cfg => cfg.AddProvider(new TestLogProvider())); }
Using reflection, I retrieve events for all Sagas and group them by type. This way, I have a lookup and know in advance how many consumers should be fired. I now need a new method on the Harness interface.
void AddExpectedMessageCount(Guid messageId, int count);
I call this method from the beginning of the ConsumeFilter.Send method.
public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next) { var id = context.MessageId!.Value; _harness.AddExpectedMessageCount(id, _expectedEvents!.TryGetValue(context.Message.GetType().FullName!, out var count) ? count : 1); try { await next.Send(context); } catch (Exception e) { _harness.Throw(e, id); return; } _harness.ConsumeMessage(id); }
This ensures that the Harness knows how many consumptions of a message are expected. Finally, I update the ConsumeMessage method.
private readonly ConcurrentDictionary<Guid, ConcurrentQueue<Guid>> _expectedMessages = new (); public void AddExpectedMessageCount(Guid messageId, int count) { _expectedMessages.GetOrAdd(messageId, id => new ConcurrentQueue<Guid>(Enumerable.Range(0, count).Select(_ => id))); } public void ConsumeMessage(Guid message) { var received = _expectedMessages[message]; lock (received) { if (!received.TryDequeue(out _)) return; if (received.Count > 0) return; if (!_expectedMessages.TryRemove(message, out _)) return; if (!_publishedMessages.TryRemove(message, out _)) return; _semaphore.Release(); } }
We call _semaphore.Release() only if all messages have been consumed for a specific message ID. This releases the WaitForBus method, allowing the test to proceed.
Spent hours on trying to get this to work, only to figure out that I needed to start the test harness… Hope this hint helps someone else!
Glad you got it sorted! 🙂 Also make sure to check out my blog about replacing the built in harness hosting. https://andersmalmgren.com/2022/11/23/getting-rid-of-the-slow-masstransit-test-harness/
Stephan, I have refactored my harness since this blog was written. It now supports that multiple sagas subscribe to the same message type. Please have a look at the updated blog above.
Thanks!