Handling large Stream properties via the pipeline

Component: NServiceBus
NuGet NServiceBus (6.x)

Introduction

This sample leverages the pipeline to provide a purely stream-based approach for sending large amounts of data. It is similar to the file share DataBus in that it assumes a common network file share accessible by all endpoints, and it uses headers to correlate a message with its associated files on disk.

Stream Storage helper

This provides an extension method on EndpointConfiguration that simplifies passing settings to the stream storage.

/// <summary>
/// Registers the directory used by the stream send and receive behaviors to store stream contents.
/// </summary>
/// <param name="endpointConfiguration">The endpoint configuration to extend.</param>
/// <param name="location">The storage directory; may be absolute or relative.</param>
public static void SetStreamStorageLocation(this EndpointConfiguration endpointConfiguration, string location)
{
    var storageSettings = new StreamStorageSettings();
    storageSettings.Location = location;

    // A single shared settings instance is injected into both behaviors.
    endpointConfiguration.RegisterComponents(components => components.RegisterSingleton(storageSettings));
}

The helper method can then be called at configuration time.

endpointConfiguration.SetStreamStorageLocation(@"..\..\..\storage");

Write Stream properties to disk

This happens as part of the outgoing pipeline; see StreamSendBehavior.cs.

class StreamSendBehavior :
    Behavior<IOutgoingLogicalMessageContext>
{
    TimeSpan maxMessageTimeToLive = TimeSpan.FromDays(14);
    string location;

    public StreamSendBehavior(StreamStorageSettings storageSettings)
    {
        // Resolve the configured (possibly relative) location to an absolute path once,
        // against the current working directory.
        var configuredLocation = storageSettings.Location;
        this.location = Path.GetFullPath(configuredLocation);
    }
    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {

Each stream copied to disk will need a unique key.

/// <summary>
/// Builds a unique relative storage key for a stream written to disk: a directory named after
/// the hour until which the data should be kept, combined with a new GUID as the file name.
/// </summary>
/// <param name="timeToBeReceived">
/// The message's time to be received; values larger than maxMessageTimeToLive are capped.
/// </param>
/// <returns>A relative path to be combined with the storage location.</returns>
string GenerateKey(TimeSpan timeToBeReceived)
{
    if (timeToBeReceived > maxMessageTimeToLive)
    {
        timeToBeReceived = maxMessageTimeToLive;
    }

    // UTC so the folder name is unambiguous across DST changes and across machines
    // (in different time zones) that share the same file share.
    var now = DateTime.UtcNow;

    // Avoid overflowing DateTime if the cap is ever raised (e.g. to TimeSpan.MaxValue):
    // anything that would go past DateTime.MaxValue is kept until DateTime.MaxValue.
    var keepMessageUntil = timeToBeReceived < DateTime.MaxValue - now
        ? now + timeToBeReceived
        : DateTime.MaxValue;

    // Invariant culture so the folder name does not depend on the machine's culture/calendar.
    var expiryFolder = keepMessageUntil.ToString("yyyy-MM-dd_HH", System.Globalization.CultureInfo.InvariantCulture);
    return Path.Combine(expiryFolder, Guid.NewGuid().ToString());
}

Copy each stream property to disk

// Use the message's DiscardIfNotReceivedBefore constraint (if present) to decide
// how long the files should be kept; otherwise GenerateKey applies its cap.
var timeToBeReceived = TimeSpan.MaxValue;
DiscardIfNotReceivedBefore constraint;

if (context.Extensions.TryGetDeliveryConstraint(out constraint))
{
    timeToBeReceived = constraint.MaxTime;
}

var message = context.Message.Instance;

foreach (var property in StreamStorageHelper.GetStreamProperties(message))
{
    var sourceStream = (Stream)property.GetValue(message, null);

    // Ignore null stream properties
    if (sourceStream == null)
    {
        continue;
    }

    // This behavior takes ownership of the stream: dispose it even when writing to disk fails,
    // so a failed send does not leak the underlying file handle or network connection.
    using (sourceStream)
    {
        var fileKey = GenerateKey(timeToBeReceived);

        var filePath = Path.Combine(location, fileKey);
        Directory.CreateDirectory(Path.GetDirectoryName(filePath));

        using (var target = File.OpenWrite(filePath))
        {
            await sourceStream.CopyToAsync(target)
                .ConfigureAwait(false);
        }

        // Reset the property to null so no other serializer attempts to use the property
        property.SetValue(message, null);

        // Store the header so on the receiving endpoint the file name is known
        var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
        context.Headers[$"NServiceBus.PropertyStream.{headerKey}"] = fileKey;
    }
}

await next()
    .ConfigureAwait(false);

On disk (at the root of the solution for this sample) the storage looks like this:

> Storage
  > 2015-03-06_15
     > 75ab3b84-8b37-4da7-bf07-b173f2f5570d
     > 92f749bc-27a8-4ba4-bc7a-b502dffe9cd9
     > 593a4670-1c09-4bbe-80ef-c22fb5356704

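Each directory is named after the hour (formatted yyyy-MM-dd_HH) until which its files should be kept. That hour is based on the message's TimeToBeReceived, capped at 14 days. Each GUID is a file containing the contents of the copied stream.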

Reading back from the stream

This happens as part of the incoming pipeline; see StreamReceiveBehavior.cs.

class StreamReceiveBehavior :
    Behavior<IInvokeHandlerContext>
{
    string location;

    public StreamReceiveBehavior(StreamStorageSettings storageSettings)
    {
        // Resolve the configured (possibly relative) location to an absolute path once,
        // against the current working directory.
        var configuredLocation = storageSettings.Location;
        this.location = Path.GetFullPath(configuredLocation);
    }

    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {

Copy the contents of the files on disk back into the message properties.

var message = context.MessageBeingHandled;
var streamsToCleanUp = new List<FileStream>();

// Storage root with exactly one trailing separator; used to reject header values that would
// resolve outside the storage location (e.g. "..\..\secret.txt" or an absolute path).
var storageRoot = location.TrimEnd(Path.DirectorySeparatorChar, Path.AltDirectorySeparatorChar)
    + Path.DirectorySeparatorChar;

try
{
    foreach (var property in StreamStorageHelper
        .GetStreamProperties(message))
    {
        var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
        string fileKey;
        // only attempt to process properties that have an associated header
        var key = $"NServiceBus.PropertyStream.{headerKey}";
        if (!context.Headers.TryGetValue(key, out fileKey))
        {
            continue;
        }

        // The header value comes from the incoming message, so never trust it as a path.
        var filePath = Path.GetFullPath(Path.Combine(location, fileKey));
        if (!filePath.StartsWith(storageRoot, StringComparison.Ordinal))
        {
            throw new InvalidOperationException($"The header '{key}' refers to a file outside the storage location '{location}'.");
        }

        // If the file doesn't exist then something has gone wrong with the file share.
        // Perhaps the file has been manually deleted.
        // For safety send the message to the error queue
        if (!File.Exists(filePath))
        {
            var errorMessage = $"Expected a file to exist in '{filePath}'. It is possible the file has been prematurely cleaned up.";
            throw new FileNotFoundException(errorMessage, filePath);
        }
        var fileStream = File.OpenRead(filePath);
        // Track the stream before handing it out so it is always cleaned up.
        streamsToCleanUp.Add(fileStream);
        property.SetValue(message, fileStream);
    }
}
catch
{
    // A later property failed: close the files opened so far before the exception propagates.
    foreach (var openedStream in streamsToCleanUp)
    {
        openedStream.Dispose();
    }
    throw;
}

Clean up the opened streams after message processing.

try
{
    await next()
        .ConfigureAwait(false);
}
finally
{
    // Clean up all the temporary streams after handler processing via the "next()" delegate,
    // including when a handler throws, so file handles on the share are not leaked.
    foreach (var fileStream in streamsToCleanUp)
    {
        fileStream.Dispose();
    }
}

Configuring the pipeline behaviors

/// <summary>
/// Feature, enabled by default, that adds the stream receive and stream send steps to the pipeline.
/// </summary>
public class StreamFeature :
    Feature
{
    internal StreamFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        // Incoming: reopen stream properties from disk. Outgoing: copy them to disk.
        context.Pipeline.Register<StreamReceiveRegistration>();
        context.Pipeline.Register<StreamSendRegistration>();
    }
}

/// <summary>
/// Pipeline step registration for <see cref="StreamReceiveBehavior"/>.
/// </summary>
public class StreamReceiveRegistration :
    RegisterStep
{
    const string ReceiveStepId = "StreamReceive";
    const string ReceiveStepDescription = "Copies the shared data back to the logical messages";

    public StreamReceiveRegistration()
        : base(stepId: ReceiveStepId, behavior: typeof(StreamReceiveBehavior), description: ReceiveStepDescription)
    {
    }
}

/// <summary>
/// Pipeline step registration for <see cref="StreamSendBehavior"/>.
/// </summary>
public class StreamSendRegistration :
    RegisterStep
{
    public StreamSendRegistration()
        : base(
            stepId: "StreamSend",
            behavior: typeof(StreamSendBehavior),
            description: "Saves the payload into the shared location")
    {
        InsertAfter("MutateOutgoingMessages");
        // StreamSendBehavior reads the DiscardIfNotReceivedBefore delivery constraint to decide how
        // long to keep the files. The ApplyTimeToBeReceived step is what adds that constraint for
        // messages marked with [TimeToBeReceived], so this step must run after it. Running before it,
        // the constraint would never be found and the maximum retention would always be used.
        InsertAfter("ApplyTimeToBeReceived");
    }
}

The message to send

/// <summary>
/// Command carrying a regular property plus a <see cref="Stream"/> property. The stream
/// pipeline behaviors move the stream to and from the shared storage location.
/// </summary>
// TimeToBeReceived of one minute (TimeSpan "00:01:00").
[TimeToBeReceived("00:01:00")]
public class MessageWithStream :
    ICommand
{
    public string SomeProperty { get; set; }
    // Copied to disk and reset to null on send; replaced by a FileStream over that file on receive.
    public Stream StreamProperty { get; set; }
}

Sending with an HTTP stream

using (var webClient = new WebClient())
{
    // The stream send behavior copies this response stream to disk and then disposes it.
    var responseStream = webClient.OpenRead("http://www.particular.net");
    var message = new MessageWithStream
    {
        SomeProperty = "This message contains a stream",
        StreamProperty = responseStream
    };
    await endpointInstance.Send("Samples.PipelineStream.Receiver", message)
        .ConfigureAwait(false);
}

Sending with a file stream

// The stream send behavior copies this file stream to the storage location and then disposes it.
var sourceFile = File.OpenRead("FileToSend.txt");
var message = new MessageWithStream
{
    SomeProperty = "This message contains a stream",
    StreamProperty = sourceFile
};
await endpointInstance.Send("Samples.PipelineStream.Receiver", message)
    .ConfigureAwait(false);
If using a MemoryStream, ensure that its Position is set back to 0 before sending the message. Also note that writing large amounts of data to a MemoryStream will result in significant memory usage (possibly resulting in an OutOfMemoryException) and put pressure on the garbage collector.

Handler

/// <summary>
/// Handles <see cref="MessageWithStream"/> by logging the size of the stream property
/// and the first characters of its content.
/// </summary>
public class MessageWithStreamHandler1 :
    IHandleMessages<MessageWithStream>
{
    static readonly ILog log = LogManager.GetLogger<MessageWithStreamHandler1>();

    // Number of characters of the stream content included in the log.
    const int PreviewLength = 20;

    public async Task Handle(MessageWithStream message, IMessageHandlerContext context)
    {
        var stream = message.StreamProperty;
        // The sender may leave the stream property null; no file is attached in that case.
        if (stream == null)
        {
            log.Info("Message received, stream property is null");
            return;
        }

        log.Info($"Message received, size of stream property: {stream.Length} Bytes");
        using (var streamReader = new StreamReader(stream))
        {
            var streamContents = await streamReader.ReadToEndAsync()
                .ConfigureAwait(false);
            // Substring(0, 20) would throw for content shorter than the preview length.
            var preview = streamContents.Length > PreviewLength
                ? streamContents.Substring(0, PreviewLength)
                : streamContents;
            log.Info($"Stream content: {preview}...");
        }
    }

}

Differences from the DataBus

The built-in DataBus relies on byte arrays and memory streams to operate. As such, it is limited in the amount of data it can send.

Related Articles


Last modified