Consuming messages from MassTransit

This sample shows how existing systems built with MassTransit can be integrated with new NServiceBus systems. A message pipeline behavior in the NServiceBus endpoints translates incoming MassTransit messages into a shape that NServiceBus can process. The same technique can also be used to migrate those systems endpoint by endpoint, if desired.

Prerequisites

This sample requires a local instance of RabbitMQ.

Message structure comparison

When using a message structured like this:

namespace Messages.Events
{
    public class MassTransitEvent
    {
        public string Text { get; set; }
    }
}

MassTransit represents the message in RabbitMQ as follows:

Properties

  • message_id: 5fdc0000-426e-001c-fcf9-08d9a30339e8
  • delivery_mode: 2
  • headers:
    • Content-Type: application/vnd.masstransit+json
    • publishId: 9
  • content_type: application/vnd.masstransit+json

Payload

{
  "messageId": "5fdc0000-426e-001c-fcf9-08d9a30339e8",
  "conversationId": "5fdc0000-426e-001c-fda0-08d9a30339e8",
  "sourceAddress": "rabbitmq://hostos/MACHINENAME_MTEndpoint_bus_m9qyyynnpayb3rk1bdc4gy3wyc?temporary=true",
  "destinationAddress": "rabbitmq://hostos/Messages.Events:MassTransitEvent",
  "messageType": [
    "urn:message:Messages.Events:MassTransitEvent"
  ],
  "message": {
    "text": "The time is 11/8/2021 4:00:50 PM -06:00"
  },
  "sentTime": "2021-11-08T22:00:50.1435641Z",
  "headers": {},
  "host": {
    "machineName": "MACHINENAME",
    "processName": "MTEndpoint",
    "processId": 13892,
    "assembly": "MTEndpoint",
    "assemblyVersion": "1.0.0.0",
    "frameworkVersion": "5.0.11",
    "massTransitVersion": "7.2.4.0",
    "operatingSystemVersion": "Microsoft Windows NT 10.0.19043.0"
  }
}

By contrast, NServiceBus structures a message on RabbitMQ differently, reserving the payload entirely for the message body, and putting the metadata into RabbitMQ properties. If NServiceBus had published the event, it would look like this:

Properties

  • $.diagnostics.originating.hostid: df9d8d0d7e92df4f82efac8bab50d815
  • NServiceBus.ContentType: application/json
  • NServiceBus.ConversationId: 5fdc0000-426e-001c-5de8-08d9a3033b36
  • NServiceBus.CorrelationId: 5fdc0000-426e-001c-5d52-08d9a3033b36
  • NServiceBus.EnclosedMessageTypes: Messages.Events.MassTransitEvent, Messages, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null
  • NServiceBus.MessageId: bcf527ec-1fc6-4228-a753-adda016cd7fb
  • NServiceBus.MessageIntent: Publish
  • NServiceBus.OriginatingEndpoint: NServiceBusSubscriber
  • NServiceBus.OriginatingMachine: MACHINENAME
  • NServiceBus.RelatedTo: b26e5ffd-fa79-4eb1-b3b9-adda016cd7d9
  • NServiceBus.ReplyToAddress: NServiceBusSubscriber
  • NServiceBus.TimeSent: 2021-11-08 22:08:21:312401 Z
  • NServiceBus.Transport.RabbitMQ.ConfirmationId: 86
  • NServiceBus.Version: 7.5.0

Payload

{"Text":"The time is 11/8/2021 4:00:52 PM -06:00"}

For NServiceBus to understand a MassTransit message, a pipeline behavior can convert the MassTransit RabbitMQ message into the shape of an NServiceBus message.
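
One small part of that conversion is the timestamp. MassTransit writes sentTime as an ISO 8601 string, while the NServiceBus.TimeSent value shown above uses a different layout. The following sketch is illustrative only and converts one to the other using just the base class library. The SentTimeConverter class is a hypothetical helper, and the format string is an assumption inferred from the sample values above; inside an endpoint, the helper that ships with NServiceBus is the better choice.

```csharp
using System;
using System.Globalization;

public static class SentTimeConverter
{
    // Assumption: layout inferred from the NServiceBus.TimeSent sample value,
    // e.g. "2021-11-08 22:08:21:312401 Z".
    const string WireFormat = "yyyy-MM-dd HH:mm:ss:ffffff Z";

    public static string ToNServiceBusTimeSent(string massTransitSentTime)
    {
        // Parse culture-independently; a value without an offset is assumed to be UTC.
        var parsed = DateTimeOffset.Parse(
            massTransitSentTime,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal);

        // UtcDateTime normalizes any offset to UTC before formatting.
        return parsed.UtcDateTime.ToString(WireFormat, CultureInfo.InvariantCulture);
    }
}
```

Normalizing through UtcDateTime matters because a value carrying a local offset, such as -06:00, would otherwise be written with the wrong hour.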

MassTransit publisher

In the sample, the MassTransit publisher is largely unchanged from the example in the MassTransit Getting Started guide. Inside the Worker background service, a loop publishes an event every second until the process shuts down:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        await bus.Publish(new MassTransitEvent { Text = $"The time is {DateTimeOffset.Now}" }, stoppingToken);
        await Task.Delay(1000, stoppingToken);
    }
}

Additionally, the MassTransit endpoint contains a message consumer for the event, subscribing to the event it is publishing:

public class MessageConsumer : IConsumer<MassTransitEvent>
{
    readonly ILogger<MessageConsumer> logger;

    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        this.logger = logger;
    }

    public Task Consume(ConsumeContext<MassTransitEvent> context)
    {
        logger.LogInformation("Received Text: {Text}", context.Message.Text);

        return Task.CompletedTask;
    }
}

Because the event is published to RabbitMQ, NServiceBus can also subscribe to it.

NServiceBus subscriber

Basic setup

The configuration for the RabbitMQ transport is standard:

var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=localhost;username=guest;password=guest");
transport.UseConventionalRoutingTopology();

Because MassTransit uses JSON to serialize its messages, NServiceBus needs to use the Json.NET serializer:

endpointConfiguration.UseSerialization<NewtonsoftSerializer>();

By default, NServiceBus uses the marker interfaces ICommand and IEvent to identify messages, but the event class does not implement IEvent:

namespace Messages.Events
{
    public class MassTransitEvent
    {
        public string Text { get; set; }
    }
}

Forcing a message assembly that contains messages published by MassTransit to take a dependency on NServiceBus is not recommended either. Instead, message conventions can be defined to identify messages by other means, such as by namespace:

endpointConfiguration.Conventions()
    .DefiningCommandsAs(type => type.Namespace?.EndsWith(".Commands") ?? false)
    .DefiningEventsAs(type => type.Namespace?.EndsWith(".Events") ?? false)
    .DefiningMessagesAs(type => type.Namespace?.EndsWith(".Messages") ?? false);
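
To see how these conventions classify a type, the predicate can be evaluated on its own. The following sketch is illustrative: the ConventionCheck class is a hypothetical helper that is not part of the sample, and it redeclares the event class only so that the snippet compiles by itself.

```csharp
using System;

namespace Messages.Events
{
    // Same shape as the shared event class; no NServiceBus reference is needed.
    public class MassTransitEvent
    {
        public string Text { get; set; }
    }
}

public static class ConventionCheck
{
    // The same predicate that is passed to DefiningEventsAs.
    // Types without a namespace yield null, which the ?? operator maps to false.
    public static readonly Func<Type, bool> IsEvent =
        type => type.Namespace?.EndsWith(".Events") ?? false;
}
```

With this definition, ConventionCheck.IsEvent(typeof(Messages.Events.MassTransitEvent)) evaluates to true, while a type in the System namespace evaluates to false. Because the predicate looks for the ".Events" suffix, a namespace named exactly "Events" would not match.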

MassTransit ingest behavior

With NServiceBus set up to use RabbitMQ, serialize with JSON, and recognize MassTransit message classes as messages, a pipeline behavior manipulates the RabbitMQ message before it is deserialized, transforming it into the shape of an NServiceBus message.

The following code snippet is lengthy because it translates many optional data elements into NServiceBus message headers by manipulating Json.NET primitives. Many solutions will not require this level of detail, because NServiceBus requires only the NServiceBus.MessageId and NServiceBus.EnclosedMessageTypes headers to process a message successfully.

public class MassTransitIngestBehavior : Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        if (!context.MessageHeaders.TryGetValue("Content-Type", out var contentType) || contentType != "application/vnd.masstransit+json")
        {
            return next();
        }

        var envelope = DeserializeMassTransitPayload(context);

        AddHeaderIfExists(context, envelope, "messageId", NServiceBus.Headers.MessageId);
        AddHeaderIfExists(context, envelope, "conversationId", NServiceBus.Headers.ConversationId);
        AddHeaderIfExists(context, envelope, "correlationId", NServiceBus.Headers.CorrelationId);
        AddHeaderIfExists(context, envelope, "sourceAddress", "MassTransit.SourceAddress");
        AddHeaderIfExists(context, envelope, "destinationAddress", "MassTransit.DestinationAddress");
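        AddHeaderIfExists(context, envelope, "sentTime", NServiceBus.Headers.TimeSent, sentTime =>
        {
            // Parse culture-independently and use UtcDateTime: parsed.DateTime has
            // DateTimeKind.Unspecified, which a later conversion to UTC would treat as local time.
            var parsed = DateTimeOffset.Parse(sentTime, System.Globalization.CultureInfo.InvariantCulture, System.Globalization.DateTimeStyles.AssumeUniversal);
            return NServiceBus.DateTimeExtensions.ToWireFormattedString(parsed.UtcDateTime);
        });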

        if (envelope.ContainsKey("host"))
        {
            var host = envelope["host"] as JObject;
            AddHeaderIfExists(context, host, "machineName", NServiceBus.Headers.OriginatingMachine);
            AddHeaderIfExists(context, host, "processName", "MassTransit.Host.ProcessName");
            AddHeaderIfExists(context, host, "processId", "MassTransit.Host.ProcessId");
            AddHeaderIfExists(context, host, "assembly", "MassTransit.Host.Assembly");
            AddHeaderIfExists(context, host, "assemblyVersion", "MassTransit.Host.AssemblyVersion");
            AddHeaderIfExists(context, host, "frameworkVersion", "MassTransit.Host.FrameworkVersion");
            AddHeaderIfExists(context, host, "massTransitVersion", "MassTransit.Host.MassTransitVersion");
            AddHeaderIfExists(context, host, "operatingSystemVersion", "MassTransit.Host.OperatingSystemVersion");
        }

        if (envelope.ContainsKey("headers"))
        {
            var mtHeaders = envelope["headers"] as JObject;
            foreach (var header in mtHeaders.Children<JProperty>())
            {
                context.Message.Headers[header.Name] = header.Value.Value<string>();
            }
        }

        if (envelope.ContainsKey("messageType"))
        {
            var types = (envelope["messageType"] as JArray)
                .Select(token => GetTypeName(token))
                .ToArray();

            context.Message.Headers[NServiceBus.Headers.EnclosedMessageTypes] = string.Join(";", types);
        }

        if (envelope.ContainsKey("message"))
        {
            var body = envelope["message"] as JObject;
            UpdateMessagePayload(context, body);
        }

        return next();
    }

    static JObject DeserializeMassTransitPayload(IIncomingPhysicalMessageContext context)
    {
        using (var memoryStream = new MemoryStream(context.Message.Body))
        using (var streamReader = new StreamReader(memoryStream))
        using (var jsonReader = new JsonTextReader(streamReader))
        {
            var envelope = serializer.Deserialize(jsonReader, typeof(JObject)) as JObject;
            return envelope;
        }
    }

    static void AddHeaderIfExists(IIncomingPhysicalMessageContext context, JObject massTransitContainer, string incomingHeaderName, string nservicebusHeaderName, Func<string, string> transform = null)
    {
        if (massTransitContainer.ContainsKey(incomingHeaderName))
        {
            var value = massTransitContainer[incomingHeaderName].Value<string>();
            if (transform != null)
            {
                value = transform(value);
            }
            context.Message.Headers[nservicebusHeaderName] = value;
        }
    }

    static void UpdateMessagePayload(IIncomingPhysicalMessageContext context, JObject body)
    {
        using (var memoryStream = new MemoryStream())
        using (var streamWriter = new StreamWriter(memoryStream))
        using (var jsonWriter = new JsonTextWriter(streamWriter))
        {
            serializer.Serialize(jsonWriter, body);
            jsonWriter.Flush();
            context.UpdateMessage(memoryStream.ToArray());
        }
    }

    static string GetTypeName(JToken typeToken)
    {
        var typeString = typeToken.Value<string>();
        if (typeString.StartsWith(urnTypePrefix, StringComparison.Ordinal))
        {
            typeString = typeString.Substring(urnTypePrefix.Length);
        }
        typeString = typeString.Replace(":", ".");
        return typeString;
    }

    static readonly JsonSerializer serializer = new JsonSerializer();
    const string urnTypePrefix = "urn:message:";
}
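
Where only the two required headers are needed, the translation can be much shorter. The following is a minimal sketch of that reduced translation, not the sample's implementation. It uses System.Text.Json instead of Json.NET so that it compiles without extra packages, and the MassTransitEnvelope class and its Unwrap method are hypothetical names. A behavior could call it with context.Message.Body and context.Message.Headers, then pass the result to context.UpdateMessage.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;

public static class MassTransitEnvelope
{
    const string UrnTypePrefix = "urn:message:";

    // Copies the message ID and type names into `headers` and returns the
    // inner "message" object as UTF-8 JSON. If the envelope has no "message"
    // property, the original bytes are returned unchanged.
    public static byte[] Unwrap(byte[] envelopeBytes, IDictionary<string, string> headers)
    {
        using (var document = JsonDocument.Parse(new ReadOnlyMemory<byte>(envelopeBytes)))
        {
            var envelope = document.RootElement;

            if (envelope.TryGetProperty("messageId", out var messageId))
            {
                headers["NServiceBus.MessageId"] = messageId.GetString();
            }

            if (envelope.TryGetProperty("messageType", out var messageTypes))
            {
                var typeNames = messageTypes.EnumerateArray()
                    .Select(token => ToTypeName(token.GetString()));
                headers["NServiceBus.EnclosedMessageTypes"] = string.Join(";", typeNames);
            }

            return envelope.TryGetProperty("message", out var message)
                ? Encoding.UTF8.GetBytes(message.GetRawText())
                : envelopeBytes;
        }
    }

    // "urn:message:Messages.Events:MassTransitEvent" -> "Messages.Events.MassTransitEvent"
    public static string ToTypeName(string urn)
    {
        var name = urn.StartsWith(UrnTypePrefix, StringComparison.Ordinal)
            ? urn.Substring(UrnTypePrefix.Length)
            : urn;
        return name.Replace(":", ".");
    }
}
```

The inner message keeps the camelCase property names that MassTransit wrote, so this approach assumes a serializer that matches property names case-insensitively, as Json.NET does by default.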

Registering the behavior to be run as part of the message processing pipeline requires one additional line of configuration:

endpointConfiguration.Pipeline.Register(typeof(MassTransitIngestBehavior), "Ingests MassTransit messages.");

For completeness, here is the NServiceBus message handler, which is very simple and does not need to know that the message it is processing originally came from MassTransit.

public class MassTransitEventHandler : IHandleMessages<MassTransitEvent>
{
    public Task Handle(MassTransitEvent message, IMessageHandlerContext context)
    {
        logger.Info($"Received Text: {message.Text}");
        return Task.CompletedTask;
    }

    static readonly ILog logger = LogManager.GetLogger<MassTransitEventHandler>();
}

Output

When running the sample, both endpoints receive and process each event published by MassTransit.

MassTransit Endpoint

 info: MassTransit[0]
       Configured endpoint Message, Consumer: MTEndpoint.MessageConsumer
 info: Microsoft.Hosting.Lifetime[0]
       Application started. Press Ctrl+C to shut down.
 info: Microsoft.Hosting.Lifetime[0]
       Hosting environment: Development
 info: Microsoft.Hosting.Lifetime[0]
       Content root path: C:\code\masstransit-messages\MTEndpoint
 info: MassTransit[0]
       Bus started: rabbitmq://hostos/
 info: MTEndpoint.MessageConsumer[0]
       Received Text: The time is 11/8/2021 4:39:33 PM -06:00
 info: MTEndpoint.MessageConsumer[0]
       Received Text: The time is 11/8/2021 4:39:34 PM -06:00
 info: MTEndpoint.MessageConsumer[0]
       Received Text: The time is 11/8/2021 4:39:35 PM -06:00

NServiceBus Subscriber

 info: Microsoft.Hosting.Lifetime[0]
       Application started. Press Ctrl+C to shut down.
 info: Microsoft.Hosting.Lifetime[0]
       Hosting environment: Development
 info: Microsoft.Hosting.Lifetime[0]
       Content root path: C:\code\masstransit-messages\NServiceBusSubscriber
 info: NServiceBusSubscriber.MassTransitEventHandler[0]
       Received Text: The time is 11/8/2021 4:39:33 PM -06:00
 info: NServiceBusSubscriber.MassTransitEventHandler[0]
       Received Text: The time is 11/8/2021 4:39:34 PM -06:00
 info: NServiceBusSubscriber.MassTransitEventHandler[0]
       Received Text: The time is 11/8/2021 4:39:35 PM -06:00
