Databus with SystemJsonSerializer message serializer

System.Text.Json is one of the supported serializers from NServiceBus 8 onward, but it does not support ISerializable. As a result, an unhandled exception is thrown when an endpoint that uses SystemJsonSerializer as its message serializer sends a large message through a DataBusProperty<T> property on the data bus. This can be overcome with a custom converter that handles serialization and deserialization of DataBusProperty<T>.

This sample shows how to send large attachments with NServiceBus via a Windows file share, using a custom converter for SystemJsonSerializer.

Running the sample

  1. Run the solution. Two console applications start.
  2. Find the Sender application by looking for the one with "Sender" in its path.
  3. Press D in the Sender window to send a large message. The message is larger than the limit allowed by the learning transport, so NServiceBus sends the payload as an attachment, allowing it to reach the Receiver application.

Code walk-through

This sample contains three projects:

  • Messages - A class library containing shared code including the message definition and the custom data bus serializer converter.
  • Sender - A console application responsible for sending the large messages.
  • Receiver - A console application responsible for receiving the large messages from Sender.

Messages project

The message in the Messages project utilizes the data bus mechanism:

//the data bus is allowed to clean up transmitted properties older than the TTBR
[TimeToBeReceived("00:01:00")]
public class MessageWithLargePayload :
    ICommand
{
    public string SomeProperty { get; set; }
    public DataBusProperty<byte[]> LargeBlob { get; set; }
}

DataBusProperty<byte[]> instructs NServiceBus to treat the LargeBlob property as an attachment. It is sent separately from other message properties.

When a message is sent using the file share data bus, its data bus properties are serialized to a file. The other properties are included in the message sent to the receiving endpoint.

The TimeToBeReceived attribute indicates that the message can be deleted if the receiver has not processed it within one minute. The message payload remains in the storage directory after NServiceBus has cleaned up the message.
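Since the payload stays in the storage directory, one option is to remove old files with a separate, periodic cleanup. The following is a minimal sketch, assuming it is acceptable to delete any file whose last write time is older than a chosen retention period. StorageCleanup and DeleteExpired are hypothetical names and are not part of NServiceBus or of this sample.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of NServiceBus or the sample.
public static class StorageCleanup
{
    // Deletes every file under basePath (including subdirectories) whose
    // last write time is more than maxAge before utcNow.
    // Returns the number of files deleted.
    public static int DeleteExpired(string basePath, TimeSpan maxAge, DateTime utcNow)
    {
        if (!Directory.Exists(basePath))
        {
            return 0;
        }

        var deleted = 0;
        // GetFiles materializes the list up front, so deleting while looping is safe.
        foreach (var file in Directory.GetFiles(basePath, "*", SearchOption.AllDirectories))
        {
            if (utcNow - File.GetLastWriteTimeUtc(file) > maxAge)
            {
                File.Delete(file);
                deleted++;
            }
        }

        return deleted;
    }
}
```

A retention period no shorter than the TimeToBeReceived value (one minute here) avoids deleting a payload that a receiver may still need.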

The following is an example of the message, including its data bus property, as sent to the receiving endpoint:

{
    "SomeProperty":"This message contains a large blob that will be sent on the data bus",
    "LargeBlob":
    {
        "Key":"2014-09-29_09\\67de3a8e-0563-40d5-b81b-6f7b27d6431e",
        "HasValue":true
    }
}

Custom SystemJsonSerializer Converter

In this sample, both the Sender and Receiver endpoints use SystemJsonSerializer for message serialization. Since System.Text.Json does not support ISerializable, a custom converter is required to serialize and deserialize DataBusProperty<T> values when large messages are sent through the data bus.

The sample uses a factory pattern: a class that inherits from JsonConverterFactory creates the converter for each DataBusProperty<T> type it encounters.

using System;
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using NServiceBus;

// Creates a DatabusPropertyConverter<T> for each closed DataBusProperty<T> type.
public class DatabusPropertyConverterFactory : JsonConverterFactory
{
    public override bool CanConvert(Type typeToConvert)
    {
        // Only generic types constructed from DataBusProperty<> are handled.
        if (!typeToConvert.IsGenericType)
        {
            return false;
        }
        if (typeToConvert.GetGenericTypeDefinition() != typeof(DataBusProperty<>))
        {
            return false;
        }
        return true;
    }

    public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options)
    {
        // The single type argument T of DataBusProperty<T>.
        Type keyType = typeToConvert.GetGenericArguments()[0];

        JsonConverter converter = (JsonConverter)Activator.CreateInstance(
            typeof(DatabusPropertyConverter<>).MakeGenericType(
                new Type[] { keyType }),
            BindingFlags.Instance | BindingFlags.Public,
            binder: null,
            args: new object[] { options },
            culture: null)!;

        return converter;
    }
}

public class DatabusPropertyConverter<T> : JsonConverter<DataBusProperty<T>> where T : class
{
    // The options argument is supplied by the factory; it is not used here.
    public DatabusPropertyConverter(JsonSerializerOptions options)
    {
    }

    // Reads the JSON string produced by Write and restores Key and HasValue.
    public override DataBusProperty<T> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        var sdp = JsonSerializer.Deserialize<SerializableDatabusProperty>(reader.GetString());
        DataBusProperty<T> databusProp = new DataBusProperty<T>();
        databusProp.HasValue = sdp.HasValue;
        databusProp.Key = sdp.Key;
        return databusProp;
    }

    // Writes only Key and HasValue, serialized to JSON and emitted as a string value.
    public override void Write(Utf8JsonWriter writer, DataBusProperty<T> value, JsonSerializerOptions options)
    {
        SerializableDatabusProperty sdp = new SerializableDatabusProperty();
        sdp.HasValue = value.HasValue;
        sdp.Key = value.Key;
        string jsonStr = JsonSerializer.Serialize(sdp);
        writer.WriteStringValue(jsonStr);
    }

    // Plain data holder that System.Text.Json can serialize without ISerializable.
    private class SerializableDatabusProperty
    {
        public string Key { get; set; }
        public bool HasValue { get; set; }
    }
}
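To experiment with the converter factory outside of an endpoint, the same approach can be exercised with System.Text.Json alone. The sketch below is an illustration under stated assumptions, not the sample's code: Attachment<T> is a hypothetical stand-in for DataBusProperty<T>, and AttachmentDto, Envelope, and ConverterDemo are likewise invented names. It registers a factory, serializes a message, and reads it back.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical stand-in for DataBusProperty<T>, so the sketch runs without NServiceBus.
public class Attachment<T> where T : class
{
    public string Key { get; set; }
    public bool HasValue { get; set; }
}

// Plain data holder carrying only the values that go on the wire.
public class AttachmentDto
{
    public string Key { get; set; }
    public bool HasValue { get; set; }
}

public class AttachmentConverterFactory : JsonConverterFactory
{
    // Handles any closed type built from Attachment<>.
    public override bool CanConvert(Type typeToConvert) =>
        typeToConvert.IsGenericType &&
        typeToConvert.GetGenericTypeDefinition() == typeof(Attachment<>);

    public override JsonConverter CreateConverter(Type typeToConvert, JsonSerializerOptions options)
    {
        Type payloadType = typeToConvert.GetGenericArguments()[0];
        Type converterType = typeof(AttachmentConverter<>).MakeGenericType(payloadType);
        return (JsonConverter)Activator.CreateInstance(converterType);
    }
}

public class AttachmentConverter<T> : JsonConverter<Attachment<T>> where T : class
{
    // Reads the JSON string written by Write and restores Key and HasValue.
    public override Attachment<T> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        var dto = JsonSerializer.Deserialize<AttachmentDto>(reader.GetString());
        return new Attachment<T> { Key = dto.Key, HasValue = dto.HasValue };
    }

    // Serializes Key and HasValue to JSON and emits the result as a string value.
    public override void Write(Utf8JsonWriter writer, Attachment<T> value, JsonSerializerOptions options)
    {
        var dto = new AttachmentDto { Key = value.Key, HasValue = value.HasValue };
        writer.WriteStringValue(JsonSerializer.Serialize(dto));
    }
}

public class Envelope
{
    public string SomeProperty { get; set; }
    public Attachment<byte[]> LargeBlob { get; set; }
}

public static class ConverterDemo
{
    // Serializes and deserializes an Envelope with the factory registered.
    public static Envelope RoundTrip(Envelope original)
    {
        var options = new JsonSerializerOptions();
        options.Converters.Add(new AttachmentConverterFactory());
        string json = JsonSerializer.Serialize(original, options);
        return JsonSerializer.Deserialize<Envelope>(json, options);
    }
}
```

In this sketch the property is written as a JSON string that itself contains the serialized key and flag, so Read and Write must stay in step: a converter that writes a string has to read one back with GetString.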

Configuring the databus location

Both the Sender and Receiver projects must share a common location for storing large binary objects. This is done by configuring FileShareDataBus with a base path. The following code instructs NServiceBus to use the file share data bus for attachments:

var dataBus = endpointConfiguration.UseDataBus<FileShareDataBus, SystemJsonDataBusSerializer>();
dataBus.BasePath(@"..\..\..\..\storage");
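The base path above is relative, so the directory it points to depends on where each process resolves it from. The sketch below shows one way to compute an absolute path that both endpoints can log and compare. It assumes the intended starting point is a known directory such as the application's base directory; StoragePath and Resolve are hypothetical names, not part of NServiceBus.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of NServiceBus or the sample.
public static class StoragePath
{
    // Walks levelsUp directories up from startDirectory, appends folderName,
    // and returns the normalized absolute path.
    public static string Resolve(string startDirectory, int levelsUp, string folderName)
    {
        var path = startDirectory;
        for (var i = 0; i < levelsUp; i++)
        {
            path = Path.Combine(path, "..");
        }

        return Path.GetFullPath(Path.Combine(path, folderName));
    }
}
```

For example, StoragePath.Resolve(AppContext.BaseDirectory, 4, "storage") mirrors the four parent segments used in the sample, and the result could be passed to BasePath in place of the relative string.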

Custom serialization option

Both the Sender and Receiver endpoints register the converter factory through custom serialization options, which changes how serialization and deserialization are performed:

var jsonSerializerOptions = new JsonSerializerOptions();
jsonSerializerOptions.Converters.Add(new DatabusPropertyConverterFactory());
endpointConfiguration.UseSerialization<SystemJsonSerializer>().Options(jsonSerializerOptions);

Sender project

The following Sender project code sends the MessageWithLargePayload message, using the NServiceBus attachment mechanism:

var message = new MessageWithLargePayload
{
    SomeProperty = "This message contains a large blob that will be sent on the data bus",
    LargeBlob = new DataBusProperty<byte[]>(new byte[1024*1024*5]) //5MB
};
await endpointInstance.Send("Samples.DataBus.Receiver", message);

Receiver project

This is the receiving message handler:

public class MessageWithLargePayloadHandler :
    IHandleMessages<MessageWithLargePayload>
{
    static ILog log = LogManager.GetLogger<MessageWithLargePayloadHandler>();

    public Task Handle(MessageWithLargePayload message, IMessageHandlerContext context)
    {
        log.Info($"Message received, size of blob property: {message.LargeBlob.Value.Length} Bytes");
        return Task.CompletedTask;
    }
}
