A better MassTransit test harness

On my latest customer project I chose to use MassTransit for events and sagas. It's been a bumpy ride with outboxes and such, but we now have a pretty stable foundation to build upon. One problem has been testing. I like black-box testing of our domain, something like this:


[TestClass]
public class When_doing_a_complete_booking : BusinessTest
{
    private Booking _result;
    private DateTime _date;
 
    [TestInitialize]
    public void Context()
    {
        Guid bookingKey = Guid.Empty;
        _date = DateTime.Now.AddDays(5);
 
        _result = Given(db => /* Setup here */)
            .When(() => new SearchQuery{ Date = _date, ...})
            .And(result =>
            {
                bookingKey = result.First().BookingKey;
                return new ReserveCommand { BookingKey = bookingKey, ... };
            })
            .And(() => new ConfirmCommand
            {
                BookingKey = bookingKey, 
                ...
            })
            .Then(db => db.Set<Booking>().FirstOrDefaultAsync(b => b.BookingKey == bookingKey));
    }
 
    [TestMethod]
    public void It_should_book_correctly()
    {
        Assert.IsNotNull(_result);
        Assert.IsTrue(...);
    }
}

The MassTransit harness doesn't really support black-box testing. Chris Patterson favors a more unit-testing-oriented approach, where you fire events and your tests assert that the events were consumed. You can await consumption with the built-in harness, but it's timeout-oriented, which makes it slow and unstable.
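For comparison, waiting with the built-in harness looks roughly like the sketch below. The message and consumer types (`BookingConfirmed`, `BookingConfirmedConsumer`) are made up for illustration. `Consumed.Any<T>()` returns as soon as a matching message has been consumed, but when nothing arrives it only gives up once the harness timeout expires, which is where the slowness comes from.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical event and consumer, not part of the project described in this post.
public record BookingConfirmed(Guid BookingKey);

public class BookingConfirmedConsumer : IConsumer<BookingConfirmed>
{
    public Task Consume(ConsumeContext<BookingConfirmed> context) => Task.CompletedTask;
}

public static class VanillaHarnessSketch
{
    public static async Task<bool> RunAsync()
    {
        await using var provider = new ServiceCollection()
            .AddMassTransitTestHarness(cfg => cfg.AddConsumer<BookingConfirmedConsumer>())
            .BuildServiceProvider(true);

        var harness = provider.GetRequiredService<ITestHarness>();
        await harness.Start();

        await harness.Bus.Publish(new BookingConfirmed(Guid.NewGuid()));

        // True once a BookingConfirmed has been consumed, false if the harness timeout elapses first.
        return await harness.Consumed.Any<BookingConfirmed>();
    }
}
```

This only tells you that one message type was consumed. It says nothing about follow-up events that a saga or consumer published in response.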

I built my own harness on top of the vanilla one using consume and publish filters. You publish an event and call WaitForBus, which awaits all events spawned from sagas or consumers. When WaitForBus returns you can assert or publish more events. Set up my harness like this:

new ServiceCollection()
    .AddMassTransitTestHarness(cfg =>
    {
        cfg.UsingInMemory((ctx, mem) =>
        {
            mem.ConfigureTestHarness(ctx);
            mem.ConfigureEndpoints(ctx);
        });
    })
    .AddTestHarness();

This hooks up the required filters and adds my custom IHarness, which contains the WaitForBus method. After that you can publish an event on the bus and await it using the method above. Full source code:

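using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;

public static class HarnessSetup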
{
    public static void ConfigureTestHarness(this IInMemoryBusFactoryConfigurator bus, IBusRegistrationContext ctx)
    {
        bus.UseConsumeFilter(typeof(ConsumeFilter<>), ctx);
        bus.UsePublishFilter(typeof(PublishFilter<>), ctx);
        bus.ConnectPublishObserver(new PublishObserver());
    }

    public static IServiceCollection AddTestHarness(this IServiceCollection services)
    {
        services.AddSingleton<IHarness, Harness>();
        return services;
    }

    private class PublishObserver : IPublishObserver
    {
        public static readonly ConcurrentDictionary<object, Action<Guid>> PublishHandlers = new();

        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            // Hand the final (possibly overridden) message id to the callback registered by
            // PublishFilter, removing the entry atomically so it runs at most once.
            if (PublishHandlers.TryRemove(context.Message, out var handler))
            {
                handler(context.MessageId!.Value);
            }

            return Task.CompletedTask;
        }

        public Task PostPublish<T>(PublishContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }
    }

    private class PublishFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class
    {
        private readonly IHarness _harness;

        public PublishFilter(IHarness harness)
        {
            _harness = harness;
        }

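        public Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next)
        {
            // Remember which harness to notify; PublishObserver.PrePublish supplies the final message id.
            PublishObserver.PublishHandlers[context.Message] = _harness.MessagePublished;

            // Pass the context on so the rest of the publish pipeline still runs.
            return next.Send(context);
        }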

        public void Probe(ProbeContext context)
        {
        }
    }

    private class ConsumeFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
    {
        private readonly IHarness _harness;

        public ConsumeFilter(IHarness harness)
        {
            _harness = harness;
        }

        public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
        {
            try
            {
                await next.Send(context);
            }
            catch (Exception e)
            {
                _harness.Throw(e);
                return;
            }

            _harness.ConsumeMessage(context.MessageId!.Value);
            _harness.SemaphoreSlim.Release();
        }

        public void Probe(ProbeContext context)
        {
        }
    }
}

public interface IHarness
{
    Task Start();
    Task WaitForBus();
    void ConsumeMessage(Guid message);
    void MessagePublished(Guid message);
    SemaphoreSlim SemaphoreSlim { get; }
    void Throw(Exception exception);
}

public class Harness : IHarness
{
    private readonly ITestHarness _harness;
    private readonly ConcurrentDictionary<Guid, int> _publishedMessages = new();

    public Harness(ITestHarness testHarness)
    {
        _harness = testHarness;
    }

    public Task Start() => _harness.Start();

    public void MessagePublished(Guid message) => _publishedMessages[message] = 1;

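    public void ConsumeMessage(Guid message)
    {
        // Spin until the id has been registered by MessagePublished, then remove it.
        // This assumes every consumed message id was registered and is consumed only once.
        while (!_publishedMessages.TryRemove(message, out _)) { }
    }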

    public SemaphoreSlim SemaphoreSlim { get; } = new(0);
    private Exception? _exception;

    public void Throw(Exception exception)
    {
        _publishedMessages.Clear();
        _exception = exception;
        SemaphoreSlim.Release();
    }

    public async Task WaitForBus()
    {
        do
        {
            await SemaphoreSlim.WaitAsync();
        } 
        while (_publishedMessages.Count != 0);

        if (_exception != null) throw _exception;
    }
}
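With the classes above in place, a test could drive the bus roughly as follows. This is a minimal sketch, not taken from the project: `BookingReserved` and `BookingReservedConsumer` are hypothetical, and publishing through an `IPublishEndpoint` resolved from a scope (rather than through `IBus` directly) is an assumption made here so that the container-resolved publish filter gets a chance to run.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical event and consumer, only here to give the bus something to do.
public record BookingReserved(Guid BookingKey);

public class BookingReservedConsumer : IConsumer<BookingReserved>
{
    public Task Consume(ConsumeContext<BookingReserved> context) => Task.CompletedTask;
}

public static class HarnessUsageSketch
{
    public static async Task RunAsync()
    {
        await using var provider = new ServiceCollection()
            .AddMassTransitTestHarness(cfg =>
            {
                cfg.AddConsumer<BookingReservedConsumer>();
                cfg.UsingInMemory((ctx, mem) =>
                {
                    mem.ConfigureTestHarness(ctx);
                    mem.ConfigureEndpoints(ctx);
                });
            })
            .AddTestHarness()
            .BuildServiceProvider(true);

        var harness = provider.GetRequiredService<IHarness>();
        await harness.Start();

        // Assumption: publish via a scoped IPublishEndpoint so the publish filter is resolved and invoked.
        await using var scope = provider.CreateAsyncScope();
        var publisher = scope.ServiceProvider.GetRequiredService<IPublishEndpoint>();
        await publisher.Publish(new BookingReserved(Guid.NewGuid()));

        // Completes once every tracked message has been consumed, or rethrows a consumer exception.
        await harness.WaitForBus();
    }
}
```

After `WaitForBus` returns, the test can query the database and assert, or publish the next event in the scenario.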

One hack to note in the harness source is that the publish filter doesn't register the message ID that we want to wait for. This is because if you override the message ID from the publish callback, the publish filter will pick up the original message ID, not the overridden one. IPublishObserver, however, will pick up the correct overridden ID. The problem with IPublishObserver is that it does not support IoC and will break your dependency chain. To overcome this, the filter stores the harness callback in a static dictionary, using the original message reference as the key. When PrePublish is hit, it looks up the callback for that message and registers the message with the correct overridden message ID. A bit hacky, but it works. It should also work for users who haven't overridden the message ID, but that is not tested.
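Stripped of MassTransit, the handoff can be sketched like this. `HandoffSketch`, `Register` and `Complete` are illustrative names, not part of the harness. `Register` stands in for the publish filter (has access to the harness, sees the original ID) and `Complete` stands in for PrePublish (no IoC, sees the final ID). One deliberate difference from the source above: the dictionary here uses `ReferenceEqualityComparer`, on the assumption that message types with value equality, such as records, could otherwise share an entry.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class HandoffSketch
{
    // Keyed by reference so two equal-looking messages never share an entry.
    private static readonly ConcurrentDictionary<object, Action<Guid>> Handlers =
        new(ReferenceEqualityComparer.Instance);

    // Stage 1: remember who wants to be told about this message instance.
    public static void Register(object message, Action<Guid> onPublished) =>
        Handlers[message] = onPublished;

    // Stage 2: the final id is known; notify the registered callback at most once.
    public static bool Complete(object message, Guid finalId)
    {
        if (!Handlers.TryRemove(message, out var handler))
        {
            return false; // nothing was registered for this instance
        }

        handler(finalId);
        return true;
    }
}
```

Calling `Complete` a second time for the same message returns `false`, because the entry is removed before the callback runs.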
