Handling Stream Properties Via the Pipeline

Component: NServiceBus
NuGet Package: NServiceBus (6.x)

This sample leverages the pipeline to provide a pure stream-based approach for sending large amounts of data. It is similar to the file share DataBus in that it assumes a common network file share accessible by endpoints and uses headers to correlate between a message and its connected files on disk.

The main difference is that with streams the data doesn't need to be loaded into memory all at once, which results in a more efficient and scalable solution.
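To make the memory difference concrete, here is a minimal sketch that is not part of the sample. It contrasts a buffered copy, which holds the whole payload in one byte array, with a streamed copy, which holds only one buffer at a time. The class name, method names, and the 81920-byte buffer size are assumptions chosen for illustration.

```csharp
using System.IO;

// Hypothetical helper, for illustration only.
public static class CopyComparison
{
    // Buffered: the entire file is read into a single byte array before it is written.
    public static void CopyBuffered(string sourcePath, string targetPath)
    {
        var allBytes = File.ReadAllBytes(sourcePath);
        File.WriteAllBytes(targetPath, allBytes);
    }

    // Streamed: at most bufferSize bytes are held in memory at any time.
    public static void CopyStreamed(string sourcePath, string targetPath, int bufferSize = 81920)
    {
        using (var source = File.OpenRead(sourcePath))
        using (var target = File.Create(targetPath))
        {
            source.CopyTo(target, bufferSize);
        }
    }
}
```

Both methods produce the same file; only the peak memory use differs, and that difference grows with the size of the payload.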

Stream storage helper

The sample provides an extension method to simplify passing settings to the stream storage.

public static void SetStreamStorageLocation(this EndpointConfiguration endpointConfiguration, string location)
{
    var settings = new StreamStorageSettings
    {
        Location = location,
    };
    endpointConfiguration.RegisterComponents(x => x.RegisterSingleton(settings));
}

The helper method can then be called at configuration time.

endpointConfiguration.SetStreamStorageLocation(@"..\..\..\storage");

Write stream properties to disk

This happens as part of the outgoing pipeline; see StreamSendBehavior.cs.

public class StreamSendBehavior :
    Behavior<IOutgoingLogicalMessageContext>
{
    TimeSpan maxMessageTimeToLive = TimeSpan.FromDays(14);
    string location;

    public StreamSendBehavior(StreamStorageSettings storageSettings)
    {
        location = Path.GetFullPath(storageSettings.Location);
    }
    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {

Each stream copied to disk will need a unique key.

string GenerateKey(TimeSpan timeToBeReceived)
{
    if (timeToBeReceived > maxMessageTimeToLive)
    {
        timeToBeReceived = maxMessageTimeToLive;
    }

    var keepMessageUntil = DateTime.MaxValue;

    if (timeToBeReceived < TimeSpan.MaxValue)
    {
        keepMessageUntil = DateTime.Now + timeToBeReceived;
    }

    return Path.Combine(keepMessageUntil.ToString("yyyy-MM-dd_HH"), Guid.NewGuid().ToString());
}
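As a worked example of the key format, the sketch below restates the logic with the clock and the GUID passed in as parameters so that the result is reproducible. Those two parameters, the class name, and the use of the invariant culture are assumptions made for illustration; GenerateKey itself uses DateTime.Now and Guid.NewGuid(). Because the time to be received is capped at 14 days before the TimeSpan.MaxValue check, a message with no time to be received ends up in a folder 14 days ahead.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical, deterministic variant of GenerateKey for illustration.
public static class KeyExample
{
    static readonly TimeSpan MaxMessageTimeToLive = TimeSpan.FromDays(14);

    public static string GenerateKey(TimeSpan timeToBeReceived, DateTime now, Guid id)
    {
        // Cap the lifetime at 14 days; this also covers TimeSpan.MaxValue
        if (timeToBeReceived > MaxMessageTimeToLive)
        {
            timeToBeReceived = MaxMessageTimeToLive;
        }

        var keepMessageUntil = now + timeToBeReceived;

        // Folder = expiry time truncated to the hour, file name = the GUID
        var folder = keepMessageUntil.ToString("yyyy-MM-dd_HH", CultureInfo.InvariantCulture);
        return Path.Combine(folder, id.ToString());
    }
}
```

With now set to 2015-03-06 14:59 and a time to be received of one minute, the folder part of the key is 2015-03-06_15, so all streams expiring within the same hour share a folder.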

Copy each stream property to disk

var timeToBeReceived = TimeSpan.MaxValue;

var extensions = context.Extensions;
if (extensions.TryGetDeliveryConstraint(out DiscardIfNotReceivedBefore constraint))
{
    timeToBeReceived = constraint.MaxTime;
}

var message = context.Message.Instance;

foreach (var property in StreamStorageHelper.GetStreamProperties(message))
{
    var sourceStream = (Stream)property.GetValue(message, null);

    // Ignore null stream properties
    if (sourceStream == null)
    {
        continue;
    }
    var fileKey = GenerateKey(timeToBeReceived);

    var filePath = Path.Combine(location, fileKey);
    Directory.CreateDirectory(Path.GetDirectoryName(filePath));

    using (var target = File.OpenWrite(filePath))
    {
        await sourceStream.CopyToAsync(target)
            .ConfigureAwait(false);
    }

    // Reset the property to null so no other serializer attempts to use the property
    property.SetValue(message, null);

    // Dispose of the stream
    sourceStream.Dispose();

    // Store the header so on the receiving endpoint the file name is known
    var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
    context.Headers[$"NServiceBus.PropertyStream.{headerKey}"] = fileKey;
}

await next()
    .ConfigureAwait(false);

The file streams will appear on disk at the root of the solution in a folder called storage. Here is a sample file structure:

> storage
  > 2015-03-06_15
     > 75ab3b84-8b37-4da7-bf07-b173f2f5570d
     > 92f749bc-27a8-4ba4-bc7a-b502dffe9cd9
     > 593a4670-1c09-4bbe-80ef-c22fb5356704

Each GUID is a file containing the contents of the emptied stream.
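Because each folder name encodes the hour in which its files expire, expired data can be removed one folder at a time. The sample shown here does not include a cleanup step, so the following is only a sketch under that assumption. The class and method names are hypothetical, and the current time is passed in by the caller; it should come from the same clock the sender used when generating keys.

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical cleanup helper, for illustration only.
public static class StorageCleanupSketch
{
    // Deletes hourly folders whose whole hour has passed; returns how many were removed.
    public static int DeleteExpiredFolders(string storageRoot, DateTime now)
    {
        var deleted = 0;
        foreach (var directory in Directory.GetDirectories(storageRoot))
        {
            var name = Path.GetFileName(directory);

            // Skip folders that do not follow the yyyy-MM-dd_HH naming used for keys
            if (!DateTime.TryParseExact(name, "yyyy-MM-dd_HH", CultureInfo.InvariantCulture,
                DateTimeStyles.None, out var hourStart))
            {
                continue;
            }

            // Files in a folder may be needed until the end of that hour
            if (hourStart.AddHours(1) <= now)
            {
                Directory.Delete(directory, recursive: true);
                deleted++;
            }
        }
        return deleted;
    }
}
```

Deleting a folder while a receiver still needs one of its files leads to the missing-file exception on the receiving side, so any real cleanup should allow a safety margin.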

Reading back from the stream

This happens as part of the incoming pipeline; see StreamReceiveBehavior.cs.

public class StreamReceiveBehavior :
    Behavior<IInvokeHandlerContext>
{
    string location;

    public StreamReceiveBehavior(StreamStorageSettings storageSettings)
    {
        location = Path.GetFullPath(storageSettings.Location);
    }

    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {

Copy the contents of the files on disk back into the message properties.

var message = context.MessageBeingHandled;
var streamsToCleanUp = new List<FileStream>();
foreach (var property in StreamStorageHelper
    .GetStreamProperties(message))
{
    var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
    // only attempt to process properties that have an associated header
    var key = $"NServiceBus.PropertyStream.{headerKey}";
    if (!context.Headers.TryGetValue(key, out var dataBusKey))
    {
        continue;
    }

    var filePath = Path.Combine(location, dataBusKey);

    // If the file doesn't exist then something has gone wrong with the file share.
    // Perhaps the file has been manually deleted.
    // For safety send the message to the error queue
    if (!File.Exists(filePath))
    {
        var format = $"Expected a file to exist in '{filePath}'. It is possible the file has been prematurely cleaned up.";
        throw new Exception(format);
    }
    var fileStream = File.OpenRead(filePath);
    property.SetValue(message, fileStream);
    streamsToCleanUp.Add(fileStream);
}

Clean up the opened streams after message processing.

try
{
    await next()
        .ConfigureAwait(false);
}
finally
{
    // Clean up all the temporary streams after handler processing
    // via the "next()" delegate has occurred, even if the handler throws
    foreach (var fileStream in streamsToCleanUp)
    {
        fileStream.Dispose();
    }
}

Configuring the pipeline behaviors

endpointConfiguration.Pipeline.Register<StreamSendBehavior.Registration>();
endpointConfiguration.Pipeline.Register(typeof(StreamReceiveBehavior), "Copies the shared data back to the logical messages");

The message to send

[TimeToBeReceived("00:01:00")]
public class MessageWithStream :
    ICommand
{
    public string SomeProperty { get; set; }
    public Stream StreamProperty { get; set; }
}

Sending with an HTTP stream

using (var webClient = new WebClient())
{
    var message = new MessageWithStream
    {
        SomeProperty = "This message contains a stream",
        StreamProperty = webClient.OpenRead("http://www.particular.net")
    };
    await endpointInstance.Send("Samples.PipelineStream.Receiver", message)
        .ConfigureAwait(false);
}

Sending with a file stream

var message = new MessageWithStream
{
    SomeProperty = "This message contains a stream",
    StreamProperty = File.OpenRead("FileToSend.txt")
};
await endpointInstance.Send("Samples.PipelineStream.Receiver", message)
    .ConfigureAwait(false);

When using a MemoryStream ensure that the Position is set back to 0 before sending the message. Also note that writing large amounts of data to a MemoryStream will result in significant memory usage (perhaps resulting in an OutOfMemoryException) and put pressure on the garbage collector.
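The Position requirement can be illustrated with a short sketch. After writing to a MemoryStream, its Position is at the end of the data, so a copy started from there would transfer zero bytes. The class and method names below are hypothetical.

```csharp
using System.IO;
using System.Text;

// Hypothetical helper, for illustration only.
public static class MemoryStreamExample
{
    // Builds a MemoryStream and rewinds it so that a later copy starts at the first byte.
    public static MemoryStream CreateRewoundStream(string content)
    {
        var stream = new MemoryStream();
        var bytes = Encoding.UTF8.GetBytes(content);
        stream.Write(bytes, 0, bytes.Length);

        // Position now equals Length; reset it before handing the stream to the message
        stream.Position = 0;
        return stream;
    }
}
```

The returned stream could then be assigned to StreamProperty in the same way as the file stream above.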

Handler

public class MessageWithStreamHandler1 :
    IHandleMessages<MessageWithStream>
{
    static ILog log = LogManager.GetLogger<MessageWithStreamHandler1>();

    public async Task Handle(MessageWithStream message, IMessageHandlerContext context)
    {
        var stream = message.StreamProperty;
        log.Info($"Message received, size of stream property: {stream.Length} Bytes");
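        using (var streamReader = new StreamReader(stream))
        {
            var streamContents = await streamReader.ReadToEndAsync()
                .ConfigureAwait(false);
            // Log at most the first 20 characters; Substring(0, 20) would throw on shorter content
            var preview = streamContents.Substring(0, Math.Min(20, streamContents.Length));
            log.Info($"Stream content: {preview}...");
        }
    }
}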

Difference from the DataBus

The built-in DataBus relies on byte arrays and memory streams to operate. As such, the whole payload must fit in memory, which limits the amount of data that it can send.
