Handling large Stream properties via the pipeline

Component: NServiceBus | NuGet: NServiceBus (Version: 6.x)

Introduction

This sample leverages the pipeline to provide a purely stream-based approach for sending large amounts of data. It is similar to the file share DataBus in that it assumes a common network file share accessible by all endpoints and uses headers to correlate a message with its associated files on disk.

Stream Storage helper

This provides an extension method on EndpointConfiguration to simplify passing settings to the stream storage.

public static void SetStreamStorageLocation(this EndpointConfiguration endpointConfiguration, string location)
{
    var settings = new StreamStorageSettings
    {
        Location = location,
    };
    endpointConfiguration.RegisterComponents(x => x.RegisterSingleton(settings));
}

The helper method can then be called at configuration time.

endpointConfiguration.SetStreamStorageLocation("..\\..\\..\\storage");

Write Stream properties to disk

This happens as part of the outgoing pipeline; see StreamSendBehavior.cs.

class StreamSendBehavior :
    Behavior<IOutgoingLogicalMessageContext>
{
    TimeSpan maxMessageTimeToLive = TimeSpan.FromDays(14);
    string location;

    public StreamSendBehavior(StreamStorageSettings storageSettings)
    {
        location = Path.GetFullPath(storageSettings.Location);
    }
    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {

Each stream copied to disk will need a unique key.

string GenerateKey(TimeSpan timeToBeReceived)
{
    if (timeToBeReceived > maxMessageTimeToLive)
    {
        timeToBeReceived = maxMessageTimeToLive;
    }

    var keepMessageUntil = DateTime.MaxValue;

    if (timeToBeReceived < TimeSpan.MaxValue)
    {
        keepMessageUntil = DateTime.Now + timeToBeReceived;
    }

    return Path.Combine(keepMessageUntil.ToString("yyyy-MM-dd_HH"), Guid.NewGuid().ToString());
}

Copy each stream property to disk.

var timeToBeReceived = TimeSpan.MaxValue;
DiscardIfNotReceivedBefore constraint;

if (context.Extensions.TryGetDeliveryConstraint(out constraint))
{
    timeToBeReceived = constraint.MaxTime;
}

var message = context.Message.Instance;

foreach (var property in StreamStorageHelper.GetStreamProperties(message))
{
    var sourceStream = (Stream)property.GetValue(message, null);

    // Ignore null stream properties
    if (sourceStream == null)
    {
        continue;
    }
    var fileKey = GenerateKey(timeToBeReceived);

    var filePath = Path.Combine(location, fileKey);
    Directory.CreateDirectory(Path.GetDirectoryName(filePath));

    using (var target = File.OpenWrite(filePath))
    {
        await sourceStream.CopyToAsync(target)
            .ConfigureAwait(false);
    }

    // Reset the property to null so no other serializer attempts to use the property
    property.SetValue(message, null);

    // Dispose of the stream
    sourceStream.Dispose();

    // Store the header so on the receiving endpoint the file name is known
    var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
    context.Headers[$"NServiceBus.PropertyStream.{headerKey}"] = fileKey;
}

await next()
    .ConfigureAwait(false);

On disk (at the root of the solution for this sample) it will look like this:

> Storage
  > 2015-03-06_15
     > 75ab3b84-8b37-4da7-bf07-b173f2f5570d
     > 92f749bc-27a8-4ba4-bc7a-b502dffe9cd9
     > 593a4670-1c09-4bbe-80ef-c22fb5356704

Each GUID is a file containing the contents of a stream that was copied from the message and then removed from it.
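
Because each folder name encodes the hour until which its files must be kept, expired files can be identified from the folder name alone. The sample does not show a cleanup routine; the following is a rough sketch of one, assuming folder names follow the yyyy-MM-dd_HH format produced by GenerateKey and that the supplied time comes from the same clock as the sender (DateTime.Now). ExpiredStreamCleaner and its methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;

// Hypothetical cleanup helper; not part of the sample
public static class ExpiredStreamCleaner
{
    const string FolderFormat = "yyyy-MM-dd_HH";

    // A folder holds files kept until some point within its hour,
    // so it is only safe to remove once that whole hour has passed
    public static bool IsExpired(string folderName, DateTime now)
    {
        DateTime keepUntilHour;
        var parsed = DateTime.TryParseExact(
            folderName,
            FolderFormat,
            CultureInfo.InvariantCulture,
            DateTimeStyles.None,
            out keepUntilHour);
        if (!parsed)
        {
            // Not a folder created by this naming scheme; leave it alone
            return false;
        }
        return now - keepUntilHour >= TimeSpan.FromHours(1);
    }

    // Deletes expired folders under the storage location and returns their names
    public static List<string> DeleteExpired(string location, DateTime now)
    {
        var deleted = new List<string>();
        foreach (var directory in Directory.GetDirectories(location))
        {
            var name = Path.GetFileName(directory);
            if (IsExpired(name, now))
            {
                Directory.Delete(directory, recursive: true);
                deleted.Add(name);
            }
        }
        return deleted;
    }
}
```

Messages sent without a time to be received end up in a folder derived from DateTime.MaxValue (9999-12-31_23), which this sketch never treats as expired.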

Reading back from the stream

This happens as part of the incoming pipeline; see StreamReceiveBehavior.cs.

class StreamReceiveBehavior :
    Behavior<IInvokeHandlerContext>
{
    string location;

    public StreamReceiveBehavior(StreamStorageSettings storageSettings)
    {
        location = Path.GetFullPath(storageSettings.Location);
    }

    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {

Copy the contents of the files on disk back into the message properties.

var message = context.MessageBeingHandled;
var streamsToCleanUp = new List<FileStream>();
foreach (var property in StreamStorageHelper
    .GetStreamProperties(message))
{
    var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
    string dataBusKey;
    // only attempt to process properties that have an associated header
    var key = $"NServiceBus.PropertyStream.{headerKey}";
    if (!context.Headers.TryGetValue(key, out dataBusKey))
    {
        continue;
    }

    var filePath = Path.Combine(location, dataBusKey);

    // If the file doesn't exist then something has gone wrong with the file share. 
    // Perhaps the file has been manually deleted.
    // For safety send the message to the error queue
    if (!File.Exists(filePath))
    {
        var format = $"Expected a file to exist in '{filePath}'. It is possible the file has been prematurely cleaned up.";
        throw new Exception(format);
    }
    var fileStream = File.OpenRead(filePath);
    property.SetValue(message, fileStream);
    streamsToCleanUp.Add(fileStream);
}

Clean up the opened streams after message processing.

try
{
    await next()
        .ConfigureAwait(false);
}
finally
{
    // Clean up all the temporary streams after handler processing
    // via the "next()" delegate has completed, even if the handler throws
    foreach (var fileStream in streamsToCleanUp)
    {
        fileStream.Dispose();
    }
}

Configuring the pipeline behaviors

public class StreamFeature :
    Feature
{
    internal StreamFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Register<StreamReceiveRegistration>();
        pipeline.Register<StreamSendRegistration>();
    }
}

public class StreamReceiveRegistration :
    RegisterStep
{
    public StreamReceiveRegistration()
        : base(
            stepId: "StreamReceive",
            behavior: typeof(StreamReceiveBehavior),
            description: "Copies the shared data back to the logical messages")
    {
    }
}

public class StreamSendRegistration :
    RegisterStep
{
    public StreamSendRegistration()
        : base(
            stepId: "StreamSend",
            behavior: typeof(StreamSendBehavior),
            description: "Saves the payload into the shared location")
    {
        InsertAfter("MutateOutgoingMessages");
        InsertBefore("ApplyTimeToBeReceived");
    }
}

The message to send

[TimeToBeReceived("00:01:00")]
public class MessageWithStream :
    ICommand
{
    public string SomeProperty { get; set; }
    public Stream StreamProperty { get; set; }
}

Sending with an HTTP stream

using (var webClient = new WebClient())
{
    var message = new MessageWithStream
    {
        SomeProperty = "This message contains a stream",
        StreamProperty = webClient.OpenRead("http://www.particular.net")
    };
    await endpointInstance.Send("Samples.PipelineStream.Receiver", message)
        .ConfigureAwait(false);
}

Sending with a file stream

var message = new MessageWithStream
{
    SomeProperty = "This message contains a stream",
    StreamProperty = File.OpenRead("FileToSend.txt")
};
await endpointInstance.Send("Samples.PipelineStream.Receiver", message)
    .ConfigureAwait(false);

If using a MemoryStream, ensure that its Position is set back to 0 before sending the message; otherwise the copy to disk starts at the current position and skips the data already written. Also note that writing large amounts of data to a MemoryStream results in significant memory usage (possibly an OutOfMemoryException) and puts pressure on garbage collection.
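
As a small illustration of the MemoryStream caveat, the sketch below writes some text to a MemoryStream and rewinds it before it would be assigned to a stream property. MemoryStreamExample and CreateRewoundStream are hypothetical names used only for this example.

```csharp
using System.IO;
using System.Text;

// Hypothetical helper; not part of the sample
public static class MemoryStreamExample
{
    public static MemoryStream CreateRewoundStream(string text)
    {
        var stream = new MemoryStream();
        var bytes = Encoding.UTF8.GetBytes(text);
        stream.Write(bytes, 0, bytes.Length);

        // After writing, Position sits at the end of the data.
        // Copying from here would copy nothing, so rewind first.
        stream.Position = 0;
        return stream;
    }
}
```

The returned stream could then be assigned to StreamProperty in the same way as the file stream above.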

Handler

public class MessageWithStreamHandler1 :
    IHandleMessages<MessageWithStream>
{
    static ILog log = LogManager.GetLogger<MessageWithStreamHandler1>();

    public async Task Handle(MessageWithStream message, IMessageHandlerContext context)
    {
        var stream = message.StreamProperty;
        log.Info($"Message received, size of stream property: {stream.Length} Bytes");
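        using (var streamReader = new StreamReader(stream))
        {
            var streamContents = await streamReader.ReadToEndAsync()
                .ConfigureAwait(false);
            // Log at most the first 20 characters; the content may be shorter
            var preview = streamContents.Substring(0, Math.Min(20, streamContents.Length));
            log.Info($"Stream content: {preview}...");
        }
    }
}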

Difference from the DataBus

The built-in DataBus relies on byte arrays and memory streams to operate. Because each payload is held in memory, it is limited in the amount of data it can send.
