This sample leverages the message handling pipeline to provide a pure stream-based approach for sending large amounts of data. It is similar to the file share data bus in that it assumes a common network file share accessible by endpoints and uses headers to correlate between a message and its connected files on disk.
The main difference is that with streams, the data doesn't need to be loaded into memory all at once, which results in a more efficient and scalable solution.
Stream storage helper
This provides an extension method to simplify passing in settings to the stream storage.
/// <summary>
/// Registers a single <see cref="StreamStorageSettings"/> instance, carrying the given
/// location, as a singleton component of the endpoint.
/// </summary>
/// <param name="endpointConfiguration">The endpoint configuration to register the settings on.</param>
/// <param name="location">The value assigned to <see cref="StreamStorageSettings.Location"/>.</param>
public static void SetStreamStorageLocation(this EndpointConfiguration endpointConfiguration, string location)
{
    // Build the settings once, outside the callback, so every resolution sees the same instance
    var storageSettings = new StreamStorageSettings { Location = location };

    endpointConfiguration.RegisterComponents(services => services.AddSingleton(storageSettings));
}
The helper method can then be called at configuration time.
// The relative path is later expanded with Path.GetFullPath by the stream behaviors,
// i.e. it is resolved against the process's current working directory.
// NOTE(review): the backslash separators look Windows-specific — confirm before running on other platforms.
endpointConfiguration.SetStreamStorageLocation(@"..\..\..\..\storage");
Write stream properties to disk
This happens as part of the outgoing pipeline; see StreamSendBehavior.
public class StreamSendBehavior :
Behavior<IOutgoingLogicalMessageContext>
{
// Upper bound applied to a message's time-to-be-received when a storage key is generated.
readonly TimeSpan maxMessageTimeToLive = TimeSpan.FromDays(14);

// Absolute path of the stream storage root; fixed for the lifetime of the behavior.
readonly string location;

/// <summary>
/// Creates the behavior and resolves the configured storage location to an absolute path.
/// </summary>
/// <param name="storageSettings">Settings whose <c>Location</c> points at the stream storage folder.</param>
public StreamSendBehavior(StreamStorageSettings storageSettings)
{
    location = Path.GetFullPath(storageSettings.Location);
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("NServiceBus.Code", "NSB0002:Forward the 'CancellationToken' property of the context parameter to methods", Justification = ".NET 4.8 does not support the overload")]
public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
{
Each stream copied to disk will need a unique key.
// Builds the storage-relative key for one stream: "<expiry bucket>/<new GUID>".
// The expiry bucket is the hour at which the message's time-to-be-received runs out,
// after that time span has been capped at maxMessageTimeToLive.
string GenerateKey(TimeSpan timeToBeReceived)
{
    if (timeToBeReceived > maxMessageTimeToLive)
    {
        timeToBeReceived = maxMessageTimeToLive;
    }
    var keepMessageUntil = DateTime.MaxValue;
    if (timeToBeReceived < TimeSpan.MaxValue)
    {
        keepMessageUntil = DateTime.Now + timeToBeReceived;
    }
    // Format with the invariant culture: the current culture may use a non-Gregorian
    // calendar or different digits, which would produce folder names that do not
    // match the "yyyy-MM-dd_HH" layout used on other machines sharing the storage.
    var expiryFolder = keepMessageUntil.ToString("yyyy-MM-dd_HH", System.Globalization.CultureInfo.InvariantCulture);
    return Path.Combine(expiryFolder, Guid.NewGuid().ToString());
}
Copy each stream property to disk
// Use the message's time-to-be-received constraint when one is present; otherwise treat it as unlimited
var extensions = context.Extensions;
var timeToBeReceived = extensions.TryGet(out DiscardIfNotReceivedBefore constraint)
    ? constraint.MaxTime
    : TimeSpan.MaxValue;

var outgoingMessage = context.Message.Instance;
foreach (var streamProperty in StreamStorageHelper.GetStreamProperties(outgoingMessage))
{
    var payload = (Stream)streamProperty.GetValue(outgoingMessage, null);
    // Properties that carry no stream are left untouched
    if (payload == null)
    {
        continue;
    }

    // The generated key doubles as the file's path relative to the storage root
    var storageKey = GenerateKey(timeToBeReceived);
    var targetPath = Path.Combine(location, storageKey);
    Directory.CreateDirectory(Path.GetDirectoryName(targetPath));

    using (var fileOnDisk = File.OpenWrite(targetPath))
    {
        await payload.CopyToAsync(fileOnDisk)
            .ConfigureAwait(false);
    }

    // Clear the property so no other serializer attempts to use it, then release the stream
    streamProperty.SetValue(outgoingMessage, null);
    payload.Dispose();

    // Record the key in a header so the receiving endpoint knows which file to open
    var headerSuffix = StreamStorageHelper.GetHeaderKey(outgoingMessage, streamProperty);
    context.Headers[$"NServiceBus.PropertyStream.{headerSuffix}"] = storageKey;
}

await next()
    .ConfigureAwait(false);
The file streams will appear on disk at the root of the solution in a folder called storage. Here is a sample file structure:
> storage
> 2015-03-06_15
> 75ab3b84-8b37-4da7-bf07-b173f2f5570d
> 92f749bc-27a8-4ba4-bc7a-b502dffe9cd9
> 593a4670-1c09-4bbe-80ef-c22fb5356704
Each GUID is a file containing the contents of the emptied stream.
Reading back from the stream
This happens as part of the incoming pipeline; see StreamReceiveBehavior.
public class StreamReceiveBehavior :
Behavior<IInvokeHandlerContext>
{
// Absolute path of the stream storage root; fixed for the lifetime of the behavior.
readonly string location;

/// <summary>
/// Creates the behavior and resolves the configured storage location to an absolute path.
/// </summary>
/// <param name="storageSettings">Settings whose <c>Location</c> points at the stream storage folder.</param>
public StreamReceiveBehavior(StreamStorageSettings storageSettings)
{
    location = Path.GetFullPath(storageSettings.Location);
}
public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
{
Copy the contents of the files on disk back into the message properties.
var message = context.MessageBeingHandled;
// Every stream opened here is tracked so it can be disposed once handling is over
var streamsToCleanUp = new List<FileStream>();
try
{
    foreach (var property in StreamStorageHelper
        .GetStreamProperties(message))
    {
        var headerKey = StreamStorageHelper.GetHeaderKey(message, property);
        // only attempt to process properties that have an associated header
        var key = $"NServiceBus.PropertyStream.{headerKey}";
        if (!context.Headers.TryGetValue(key, out var dataBusKey))
        {
            continue;
        }
        // NOTE(review): the key comes straight from a message header; presumably only
        // trusted endpoints can send here — confirm, since a rooted or "..\" key would escape the storage folder.
        var filePath = Path.Combine(location, dataBusKey);
        // If the file doesn't exist then something has gone wrong with the file share.
        // Perhaps the file has been manually deleted.
        // For safety send the message to the error queue
        if (!File.Exists(filePath))
        {
            var format = $"Expected a file to exist in '{filePath}'. It is possible the file has been prematurely cleaned up.";
            throw new Exception(format);
        }
        var fileStream = File.OpenRead(filePath);
        // Track the stream before handing it to the message so it is released even if SetValue throws
        streamsToCleanUp.Add(fileStream);
        property.SetValue(message, fileStream);
    }
}
catch
{
    // Opening one of the streams failed: release the ones already opened so their
    // file handles are not leaked, then let the original exception propagate.
    foreach (var openedStream in streamsToCleanUp)
    {
        openedStream.Dispose();
    }
    throw;
}
Clean up the opened streams after message processing.
try
{
    await next()
        .ConfigureAwait(false);
}
finally
{
    // Clean up all the temporary streams after handler processing
    // via the "next()" delegate has occurred. Doing this in a finally block
    // releases the file handles even when the handler throws.
    foreach (var fileStream in streamsToCleanUp)
    {
        fileStream.Dispose();
    }
}
Configuring the pipeline behaviors
// Register the send behavior through its nested Registration type
endpointConfiguration.Pipeline.Register<StreamSendBehavior.Registration>();
// Register the receive behavior directly by type, together with a description
endpointConfiguration.Pipeline.Register(typeof(StreamReceiveBehavior), "Copies the shared data back to the logical messages");
The message to send
/// <summary>
/// Command that carries an ordinary string property alongside a stream property.
/// </summary>
[TimeToBeReceived("00:01:00")]
public class MessageWithStream : ICommand
{
    /// <summary>Plain text payload.</summary>
    public string SomeProperty { get; set; }

    /// <summary>Stream payload of the message.</summary>
    public Stream StreamProperty { get; set; }
}
Sending with an HTTP stream
// Fetch the page as a stream, attach it to the message and send it while the client is still alive
using (var client = new HttpClient())
{
    var pageStream = await client.GetStreamAsync("http://www.particular.net");
    var streamMessage = new MessageWithStream
    {
        SomeProperty = "This message contains a stream",
        StreamProperty = pageStream
    };

    await endpointInstance.Send("Samples.PipelineStream.Receiver", streamMessage)
        .ConfigureAwait(false);
}
Sending with a file stream
// Open the file for reading and attach the resulting stream to the message
var fileContents = File.OpenRead("FileToSend.txt");
var fileMessage = new MessageWithStream
{
    SomeProperty = "This message contains a stream",
    StreamProperty = fileContents
};

await endpointInstance.Send("Samples.PipelineStream.Receiver", fileMessage)
    .ConfigureAwait(false);
Note: When using a MemoryStream, ensure that the Position is set back to 0 before sending the message. Also note that writing large amounts of data to a MemoryStream will result in significant memory usage (perhaps resulting in an OutOfMemoryException) and put pressure on the garbage collector.
Handler
/// <summary>
/// Handles <see cref="MessageWithStream"/> by logging the size of the stream property
/// and a preview of up to the first 20 characters of its content.
/// </summary>
public class MessageWithStreamHandler1 :
    IHandleMessages<MessageWithStream>
{
    static readonly ILog log = LogManager.GetLogger<MessageWithStreamHandler1>();

    /// <summary>
    /// Reads the whole stream as text and logs its length and a short preview.
    /// </summary>
    /// <param name="message">The received message; its stream property may be null when the sender set no stream.</param>
    /// <param name="context">The handler context, used for its cancellation token where supported.</param>
    public async Task Handle(MessageWithStream message, IMessageHandlerContext context)
    {
        var stream = message.StreamProperty;
        // The send side skips null stream properties, so a message can arrive without one
        if (stream == null)
        {
            log.Info("Message received without a stream property");
            return;
        }
        log.Info($"Message received, size of stream property: {stream.Length} Bytes");
        using (var streamReader = new StreamReader(stream))
        {
            string streamContents;
            // The cancellable overload exists from .NET 7 onwards, so also use it on later target frameworks
#if NET7_0_OR_GREATER
            streamContents = await streamReader.ReadToEndAsync(context.CancellationToken).ConfigureAwait(false);
#else
            streamContents = await streamReader.ReadToEndAsync().ConfigureAwait(false);
#endif
            // Substring(0, 20) throws when the content is shorter than 20 characters,
            // so only truncate when there is something to cut off.
            var preview = streamContents.Length > 20
                ? streamContents.Substring(0, 20)
                : streamContents;
            log.Info($"Stream content: {preview}...");
        }
    }
}
Difference to the databus
The built-in databus relies on byte arrays and memory streams to operate. As such, there are limits to the amount of data that it can send.