Building a More Useful Outbox for Reliable Messaging

I blog not at all, or a lot at once. Nothing in between. In the past couple of weeks I’ve written several posts.

As a follow-up to those posts: Jasper comes with a robust transactional inbox/outbox implementation that partially runs as a background service using .NET’s hosted service mechanism. Jasper’s version of this functionality differs quite a bit in implementation from other toolsets in the .NET space. Here’s a rundown so far:

  • When the outbox is used, Jasper persists the outgoing message, its metadata, and the intended destination in the hosting service’s application database (or at least that’s the way it’s intended to work).
  • A background service runs on every node to ensure that these outgoing messages don’t get marooned or fail to be sent out.
  • Pending messages do not get “lost” if the process dies or is shut down unexpectedly.
  • Sending of pending messages resumes if a process is simply restarted.
  • If running in a cluster, the other nodes can detect that a node is no longer running and take over the persisted outbound messages from that node.
  • So far, the outbox supports either PostgreSQL or SQL Server as the backing database.
  • Not surprisingly, there’s much deeper integration with Marten, but there’s also support for using the outbox with EF Core.
  • Jasper’s outbox is actually usable completely outside of Jasper command handlers. You might have to write a little more explicit code to use it within ASP.NET Core controllers or Minimal API methods, for example, but it’s perfectly usable in those scenarios. This is unique to Jasper and currently unsupported in the most popular service bus tools in .NET at the time I’m writing this post.
  • To make it easier to get up and going, Jasper comes with some database schema management baked in for the database objects the outbox needs.
  • There’s also some command line tooling baked into Jasper that will hopefully be helpful at development or deployment time to manage the persisted message storage.
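
To make the "take over from a dead node" bullet a little more concrete, here’s a toy, in-memory sketch of the idea. This is not Jasper’s actual implementation: OutboxStore, PendingEnvelope, and RecoveryAgent are hypothetical names I made up for illustration, the "database" is just a dictionary, and how a real node detects that another node has died is left out entirely.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Node 1 persisted two outgoing messages and then died before sending them.
var store = new OutboxStore();
store.Persist(new PendingEnvelope(Guid.NewGuid(), "reservations", "ReservationConfirmed", OwnerNode: 1));
store.Persist(new PendingEnvelope(Guid.NewGuid(), "reservations", "ReservationConfirmed", OwnerNode: 1));

// Node 2's background agent takes over node 1's persisted messages.
var sent = new List<PendingEnvelope>();
var agent = new RecoveryAgent(store, nodeId: 2, send: sent.Add);
agent.RecoverFrom(deadNodeId: 1);

Console.WriteLine($"Sent {sent.Count}, still pending {store.PendingCount}");
// prints: Sent 2, still pending 0

// Hypothetical row in the outbox table: the message, its destination, and its owner.
public record PendingEnvelope(Guid Id, string Destination, string MessageType, int OwnerNode);

// Stand-in for the outbox table in the application database.
public class OutboxStore
{
    private readonly Dictionary<Guid, PendingEnvelope> _rows = new();

    public int PendingCount => _rows.Count;

    public void Persist(PendingEnvelope envelope) => _rows[envelope.Id] = envelope;

    // Returns a snapshot, so callers can modify the store while iterating.
    public IReadOnlyList<PendingEnvelope> OwnedBy(int nodeId) =>
        _rows.Values.Where(x => x.OwnerNode == nodeId).ToList();

    public void Reassign(Guid id, int newOwner) =>
        _rows[id] = _rows[id] with { OwnerNode = newOwner };

    public void Delete(Guid id) => _rows.Remove(id);
}

// Stand-in for the per-node background service.
public class RecoveryAgent
{
    private readonly OutboxStore _store;
    private readonly int _nodeId;
    private readonly Action<PendingEnvelope> _send;

    public RecoveryAgent(OutboxStore store, int nodeId, Action<PendingEnvelope> send)
    {
        _store = store;
        _nodeId = nodeId;
        _send = send;
    }

    // Claim each message owned by the dead node, send it, then remove the row.
    public int RecoverFrom(int deadNodeId)
    {
        var orphaned = _store.OwnedBy(deadNodeId);
        foreach (var envelope in orphaned)
        {
            _store.Reassign(envelope.Id, _nodeId);
            _send(envelope with { OwnerNode = _nodeId });
            _store.Delete(envelope.Id);
        }

        return orphaned.Count;
    }
}
```

The point of the sketch is only the ordering: a message is persisted first, is always owned by some node, and is deleted from storage only after it has been handed off for sending.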

Now, on to some sample code!

To reuse a code sample from earlier this week, let’s say we’re building a new web service for potential customers to make online reservations for their favorite restaurants. We’re also going to be using Jasper as a command and/or message bus (and Marten as the backing persistence infrastructure), so we might have a command handler for confirming a reservation like this simple example:

public static class ConfirmReservationHandler
{
    [Transactional]
    public static async Task<ReservationConfirmed> Handle(ConfirmReservation command, IDocumentSession session)
    {
        var reservation = await session.LoadAsync<Reservation>(command.ReservationId);

        reservation!.IsConfirmed = true;

        session.Store(reservation);

        // Kicking out a "cascaded" message
        return new ReservationConfirmed(reservation.Id);
    }
}

As I described in an earlier post, that handler — in conjunction with some Jasper middleware brought in by the [Transactional] attribute — is making database changes and publishing a corresponding message about those very changes. Those two actions are an atomic unit of logical work. They need to either both succeed, or not happen at all. We also need the message to only go out after the database changes succeed. Otherwise, there’s a possible race condition between the database changes being committed and the ReservationConfirmed message being processed in another thread with incorrect or inconsistent system data. And take my word for it on that last point, because I’ve seen that happen and directly cause production bugs.

Let’s quickly rule out an old-fashioned two-phase commit, because that brings a lot of headaches I could do without for the rest of my career. As I mentioned in some of my earlier posts, Jasper comes out of the box with a transactional outbox built specifically for the scenario shown in the handler above. Jasper has an add-on NuGet library called Jasper.Persistence.Marten that adds some helpful integration between Jasper’s built-in inbox/outbox “durability agent” and Marten. The setup code for that integration is just this:

// Normal Marten integration
builder.Services.AddMarten(opts =>
{
    opts.Connection("Host=localhost;Port=5433;Database=postgres;Username=postgres;password=postgres");
})
    // NEW! Adding Jasper outbox integration to Marten in the "messages"
    // database schema
    .IntegrateWithJasper("messages");

and Jasper’s own integration setup:

builder.Host.UseJasper(opts =>
{
    // more Jasper configuration for error handling etc.
});

To add some context, here’s the actual code (with some reformatting and annotations from me) that Jasper builds up at runtime around our command handler:

public class ConfirmReservationHandler615381178 : MessageHandler
{
    private readonly OutboxedSessionFactory _outboxedSessionFactory;

    public ConfirmReservationHandler615381178(OutboxedSessionFactory outboxedSessionFactory)
    {
        _outboxedSessionFactory = outboxedSessionFactory;
    }

    public override async Task HandleAsync(IExecutionContext context, CancellationToken cancellation)
    {
        var confirmReservation = (ConfirmReservation)context.Envelope.Message;

        // This is creating a Marten session that's hooked into the Jasper outbox for
        // the incoming ConfirmReservation message being processed here
        await using var documentSession = _outboxedSessionFactory.OpenSession(context);

        var reservationConfirmed = await ConfirmReservationHandler.Handle(confirmReservation, documentSession)
            .ConfigureAwait(false);
        // Outgoing, cascaded message
        await context.EnqueueCascadingAsync(reservationConfirmed).ConfigureAwait(false);

        // Commit the unit of work *and* start sending the ReservationConfirmed message
        await documentSession.SaveChangesAsync(cancellation).ConfigureAwait(false);
    }
}

There’s a lot going on behind the scenes up above, but the key elements of the outbox-enabled message handler workflow are:

  • The outgoing message will not actually be sent to its destination (a RabbitMQ exchange? a local queue? a Pulsar topic?) until the database changes succeed. That’s also true for messages sent directly through Jasper’s IMessagePublisher or IExecutionContext services within a command handler method.
  • The outgoing messages and all necessary metadata (including the destination) are persisted to a database table inside the exact same database transaction as the changes to the persistent model in the handler above. In the case of the Marten integration, the messages are persisted in the same database command as the Reservation document changes as a way to improve performance by reducing network round trips to the database server.
  • After the transaction succeeds, a Marten session listener notifies the previously registered Jasper outbox to start publishing the outgoing “cascaded” messages through Jasper’s background sending queues, which span command executions.
  • If the transaction fails, no messages will be sent out.
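
The workflow in those bullets can be boiled down to a small, self-contained sketch. To be clear, this is a simplified stand-in and not Jasper’s or Marten’s real code: OutboxedUnitOfWork and its members are hypothetical names, the "database" is an in-memory list, and the failOnCommit flag exists only to simulate a failed transaction.

```csharp
using System;
using System.Collections.Generic;

var sent = new List<object>();

// Happy path: the message goes out only after the commit succeeds.
var ok = new OutboxedUnitOfWork(sent.Add);
ok.Store("reservation/1 confirmed");
ok.Enqueue("ReservationConfirmed(1)");
ok.SaveChanges();

// Failure path: the commit fails, so nothing is sent.
var bad = new OutboxedUnitOfWork(sent.Add, failOnCommit: true);
bad.Store("reservation/2 confirmed");
bad.Enqueue("ReservationConfirmed(2)");
try { bad.SaveChanges(); } catch (InvalidOperationException) { }

Console.WriteLine(string.Join(", ", sent));
// prints: ReservationConfirmed(1)

// Hypothetical unit of work that stages document changes and outgoing
// messages together, and releases the messages only after a commit.
public class OutboxedUnitOfWork
{
    private readonly List<string> _pendingChanges = new();
    private readonly List<object> _pendingMessages = new();
    private readonly Action<object> _send;
    private readonly bool _failOnCommit;

    // Stand-in for rows committed to the application database.
    public List<string> CommittedRows { get; } = new();

    public OutboxedUnitOfWork(Action<object> send, bool failOnCommit = false)
    {
        _send = send;
        _failOnCommit = failOnCommit;
    }

    public void Store(string change) => _pendingChanges.Add(change);

    // Staged only; nothing leaves the process here.
    public void Enqueue(object message) => _pendingMessages.Add(message);

    public void SaveChanges()
    {
        if (_failOnCommit)
        {
            // Rolled back: drop the changes and the messages together.
            _pendingChanges.Clear();
            _pendingMessages.Clear();
            throw new InvalidOperationException("Simulated commit failure");
        }

        // One "transaction": document changes and outbox rows commit together.
        CommittedRows.AddRange(_pendingChanges);
        foreach (var message in _pendingMessages)
        {
            CommittedRows.Add($"outbox:{message}");
        }

        // Only after the commit are the messages handed to the sender.
        foreach (var message in _pendingMessages)
        {
            _send(message);
        }

        _pendingChanges.Clear();
        _pendingMessages.Clear();
    }
}
```

In the real thing the hand-off to the sender goes through background sending queues rather than a direct callback, but the ordering guarantee is the same: stage, commit, then send.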
