This sample show how to build a transport using the file system as a data store.
Sample Structure
The sample has two endpoints, Endpoint1
and Endpoint2
.
Both endpoints are configured to use the FileTransport
.
endpointConfiguration.UseTransport<FileTransport>();
Message Flow
Endpoint1 Starts
Endpoint1
starts the message flow with a send of MessageA
to Endpoint2
.
var messageA = new MessageA();
await endpointInstance.Send("Samples.CustomTransport.Endpoint2", messageA)
.ConfigureAwait(false);
Endpoint2 Handles and Replies
Endpoint2
has a Handler for MessageA
and replies with MessageB
public class MessageAHandler :
IHandleMessages<MessageA>
{
static ILog log = LogManager.GetLogger<MessageAHandler>();
public Task Handle(MessageA message, IMessageHandlerContext context)
{
log.Info("MessageA Handled");
log.Info("Replying with MessageB");
return context.Reply(new MessageB());
}
}
Endpoint1 handles the reply
Endpoint1
then handles that reply.
public class MessageBHandler :
IHandleMessages<MessageB>
{
static ILog log = LogManager.GetLogger<MessageBHandler>();
public Task Handle(MessageB message, IMessageHandlerContext context)
{
log.Info("MessageB Handled");
return Task.CompletedTask;
}
}
Running the sample
Start the sample with both Endpoint1
and Endpoint2
as startup projects. This will force both endpoints to create their underlying file system queues.
Now start Endpoint1
. Notice it will send a message at startup.
Now look at the file system for Endpoint2
%temp%\
. It will contain the message headers in the root and the message body in %temp%\
.
Now start Endpoint2
. The reply message will appear in its file system %temp%\
.
Transport implementation
Transport definition
The TransportDefinition
allows a transport to define how it interacts with the core of NServiceBus.
public class FileTransport :
TransportDefinition
{
public override bool RequiresConnectionString => false;
public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString)
{
return new FileTransportInfrastructure();
}
public override string ExampleConnectionStringForErrorMessage { get; } = "";
}
public class FileTransportInfrastructure :
TransportInfrastructure
{
public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
{
return new TransportReceiveInfrastructure(
messagePumpFactory: () => new FileTransportMessagePump(),
queueCreatorFactory: () => new FileTransportQueueCreator(),
preStartupCheck: () => Task.FromResult(StartupCheckResult.Success));
}
public override TransportSendInfrastructure ConfigureSendInfrastructure()
{
return new TransportSendInfrastructure(
dispatcherFactory: () => new Dispatcher(),
preStartupCheck: () => Task.FromResult(StartupCheckResult.Success));
}
public override TransportSubscriptionInfrastructure ConfigureSubscriptionInfrastructure()
{
throw new NotImplementedException();
}
public override EndpointInstance BindToLocalEndpoint(EndpointInstance instance)
{
return instance;
}
public override string ToTransportAddress(LogicalAddress logicalAddress)
{
var endpointInstance = logicalAddress.EndpointInstance;
var discriminator = endpointInstance.Discriminator ?? "";
var qualifier = logicalAddress.Qualifier ?? "";
return Path.Combine(endpointInstance.Endpoint, discriminator, qualifier);
}
public override IEnumerable<Type> DeliveryConstraints
{
get
{
yield return typeof(DiscardIfNotReceivedBefore);
}
}
public override TransportTransactionMode TransactionMode
{
get
{
return TransportTransactionMode.ReceiveOnly;
}
}
public override OutboundRoutingPolicy OutboundRoutingPolicy
{
get
{
return new OutboundRoutingPolicy(
sends: OutboundRoutingType.Unicast,
publishes: OutboundRoutingType.Unicast,
replies: OutboundRoutingType.Unicast);
}
}
}
Storage location
This transport is hard-coded to persist messages to %TEMP%FileTransport/
.
public static class BaseDirectoryBuilder
{
public static string BuildBasePath(string address)
{
var temp = Environment.ExpandEnvironmentVariables("%temp%");
var fullPath = Path.Combine(temp, "FileTransport", address);
Directory.CreateDirectory(fullPath);
return fullPath;
}
}
Header serializer
To serialize headers this transport uses JSON via DataContractJsonSerializer
static class HeaderSerializer
{
public static string Serialize(Dictionary<string, string> instance)
{
var serializer = BuildSerializer();
using (var stream = new MemoryStream())
{
serializer.WriteObject(stream, instance);
return Encoding.UTF8.GetString(stream.ToArray());
}
}
public static Dictionary<string, string> DeSerialize(string json)
{
var serializer = BuildSerializer();
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
{
return (Dictionary<string, string>) serializer.ReadObject(stream);
}
}
static DataContractJsonSerializer BuildSerializer()
{
var settings = new DataContractJsonSerializerSettings
{
UseSimpleDictionaryFormat = true,
};
return new DataContractJsonSerializer(typeof(Dictionary<string, string>), settings);
}
}
Queue creation
At startup a transport can optionally create queues.
class FileTransportQueueCreator :
ICreateQueues
{
public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity)
{
foreach (var address in queueBindings.SendingAddresses)
{
CreateQueueDirectory(address);
}
foreach (var address in queueBindings.ReceivingAddresses)
{
CreateQueueDirectory(address);
}
return Task.CompletedTask;
}
static void CreateQueueDirectory(string address)
{
var fullPath = BaseDirectoryBuilder.BuildBasePath(address);
var committedPath = Path.Combine(fullPath, ".committed");
Directory.CreateDirectory(committedPath);
var bodiesPath = Path.Combine(fullPath, ".bodies");
Directory.CreateDirectory(bodiesPath);
}
}
Handling transactions
How a transport handles transactions differs greatly between specific implementations. For demonstration purposes this transport uses a highly simplified file system based transaction.
class DirectoryBasedTransaction :
IDisposable
{
string basePath;
bool committed;
string transactionDir;
public DirectoryBasedTransaction(string basePath)
{
this.basePath = basePath;
var transactionId = Guid.NewGuid().ToString();
transactionDir = Path.Combine(basePath, ".pending", transactionId);
}
public string FileToProcess { get; private set; }
public void BeginTransaction(string incomingFilePath)
{
Directory.CreateDirectory(transactionDir);
FileToProcess = Path.Combine(transactionDir, Path.GetFileName(incomingFilePath));
File.Move(incomingFilePath, FileToProcess);
}
public void Commit() => committed = true;
public void Dispose()
{
if (!committed)
{
// rollback by moving the file back to the main dir
File.Move(FileToProcess, Path.Combine(basePath, Path.GetFileName(FileToProcess)));
}
Directory.Delete(transactionDir, true);
}
}
Dispatcher
The dispatcher is responsible for translating a message (its binary body and headers) and placing it onto the underlying transport technology.
class Dispatcher :
IDispatchMessages
{
public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context)
{
foreach (var operation in outgoingMessages.UnicastTransportOperations)
{
var basePath = BaseDirectoryBuilder.BuildBasePath(operation.Destination);
var nativeMessageId = Guid.NewGuid().ToString();
var bodyPath = Path.Combine(basePath, ".bodies", $"{nativeMessageId}.xml");
var dir = Path.GetDirectoryName(bodyPath);
if (!Directory.Exists(dir))
{
Directory.CreateDirectory(dir);
}
File.WriteAllBytes(bodyPath, operation.Message.Body);
var messageContents = new List<string>
{
bodyPath,
HeaderSerializer.Serialize(operation.Message.Headers)
};
var messagePath = Path.Combine(basePath, $"{nativeMessageId}.txt");
// write to temp file first so an atomic move can be done
// this avoids the file being locked when the receiver tries to process it
var tempFile = Path.GetTempFileName();
File.WriteAllLines(tempFile, messageContents);
File.Move(tempFile, messagePath);
}
return Task.CompletedTask;
}
}
Message pump
The message pump is responsible for reading message from the underlying transport and pushing them into the message handling pipeline.
class FileTransportMessagePump :
IPushMessages
{
static ILog log = LogManager.GetLogger<FileTransportMessagePump>();
CancellationToken cancellationToken;
CancellationTokenSource cancellationTokenSource;
SemaphoreSlim concurrencyLimiter;
Task messagePumpTask;
Func<ErrorContext, Task<ErrorHandleResult>> onError;
string path;
Func<MessageContext, Task> pipeline;
bool purgeOnStartup;
ConcurrentDictionary<Task, Task> runningReceiveTasks;
public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<ErrorHandleResult>> onError, CriticalError criticalError, PushSettings settings)
{
this.onError = onError;
pipeline = onMessage;
path = BaseDirectoryBuilder.BuildBasePath(settings.InputQueue);
purgeOnStartup = settings.PurgeOnStartup;
return Task.CompletedTask;
}
public void Start(PushRuntimeSettings limitations)
{
runningReceiveTasks = new ConcurrentDictionary<Task, Task>();
concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency);
cancellationTokenSource = new CancellationTokenSource();
cancellationToken = cancellationTokenSource.Token;
if (purgeOnStartup)
{
Directory.Delete(path, true);
Directory.CreateDirectory(path);
}
messagePumpTask = Task.Factory
.StartNew(
function: ProcessMessages,
cancellationToken: CancellationToken.None,
creationOptions: TaskCreationOptions.LongRunning,
scheduler: TaskScheduler.Default)
.Unwrap();
}
public async Task Stop()
{
cancellationTokenSource.Cancel();
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(30), cancellationTokenSource.Token);
var allTasks = runningReceiveTasks.Values.Concat(new[]
{
messagePumpTask
});
var finishedTask = await Task.WhenAny(Task.WhenAll(allTasks), timeoutTask)
.ConfigureAwait(false);
if (finishedTask.Equals(timeoutTask))
{
log.Error("The message pump failed to stop with in the time allowed(30s)");
}
concurrencyLimiter.Dispose();
runningReceiveTasks.Clear();
}
[DebuggerNonUserCode]
async Task ProcessMessages()
{
try
{
await InnerProcessMessages()
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// For graceful shutdown purposes
}
catch (Exception ex)
{
log.Error("File Message pump failed", ex);
}
if (!cancellationToken.IsCancellationRequested)
{
await ProcessMessages()
.ConfigureAwait(false);
}
}
async Task InnerProcessMessages()
{
while (!cancellationTokenSource.IsCancellationRequested)
{
var filesFound = false;
foreach (var filePath in Directory.EnumerateFiles(path, "*.*"))
{
filesFound = true;
await ProcessFile(filePath)
.ConfigureAwait(false);
}
if (!filesFound)
{
await Task.Delay(10, cancellationToken)
.ConfigureAwait(false);
}
}
}
async Task ProcessFile(string filePath)
{
var nativeMessageId = Path.GetFileNameWithoutExtension(filePath);
await concurrencyLimiter.WaitAsync(cancellationToken)
.ConfigureAwait(false);
var task = Task.Run(async () =>
{
try
{
await ProcessFileWithTransaction(filePath, nativeMessageId)
.ConfigureAwait(false);
}
finally
{
concurrencyLimiter.Release();
}
}, cancellationToken);
task.ContinueWith(t =>
{
runningReceiveTasks.TryRemove(t, out var toBeRemoved);
},
TaskContinuationOptions.ExecuteSynchronously)
.Ignore();
runningReceiveTasks.AddOrUpdate(task, task, (k, v) => task)
.Ignore();
}
async Task ProcessFileWithTransaction(string filePath, string messageId)
{
using (var transaction = new DirectoryBasedTransaction(path))
{
transaction.BeginTransaction(filePath);
var message = File.ReadAllLines(transaction.FileToProcess);
var bodyPath = message.First();
var json = string.Join("", message.Skip(1));
var headers = HeaderSerializer.DeSerialize(json);
if (headers.TryGetValue(Headers.TimeToBeReceived, out var ttbrString))
{
var ttbr = TimeSpan.Parse(ttbrString);
// file.move preserves create time
var sentTime = File.GetCreationTimeUtc(transaction.FileToProcess);
if (sentTime + ttbr < DateTime.UtcNow)
{
return;
}
}
var body = File.ReadAllBytes(bodyPath);
var transportTransaction = new TransportTransaction();
transportTransaction.Set(transaction);
var shouldCommit = await HandleMessageWithRetries(messageId, headers, body, transportTransaction, 1)
.ConfigureAwait(false);
if (shouldCommit)
{
transaction.Commit();
}
}
}
async Task<bool> HandleMessageWithRetries(string messageId, Dictionary<string, string> headers, byte[] body, TransportTransaction transportTransaction, int processingAttempt)
{
try
{
var receiveCancellationTokenSource = new CancellationTokenSource();
var pushContext = new MessageContext(
messageId: messageId,
headers: new Dictionary<string, string>(headers),
body: body,
transportTransaction: transportTransaction,
receiveCancellationTokenSource: receiveCancellationTokenSource,
context: new ContextBag());
await pipeline(pushContext)
.ConfigureAwait(false);
return !receiveCancellationTokenSource.IsCancellationRequested;
}
catch (Exception e)
{
var errorContext = new ErrorContext(e, headers, messageId, body, transportTransaction, processingAttempt);
var errorHandlingResult = await onError(errorContext)
.ConfigureAwait(false);
if (errorHandlingResult == ErrorHandleResult.RetryRequired)
{
return await HandleMessageWithRetries(messageId, headers, body, transportTransaction, ++processingAttempt)
.ConfigureAwait(false);
}
return true;
}
}
}
Transport tests
NServiceBus provides a test suite targeting transport implementations to verify the implementation.
Pulling in the tests
The tests are shipped in the NServiceBus.
NuGet package. This package can be installed into a dedicated test project. In this sample, CustomTransport.
contains the transport tests.
Configuring the tests
The transport tests need to be configured to use the custom transport by implementing IConfigureTransportInfrastructure
:
public class ConfigureFileTransportInfrastructure : IConfigureTransportInfrastructure
{
public TransportConfigurationResult Configure(SettingsHolder settings, TransportTransactionMode transactionMode)
{
return new TransportConfigurationResult
{
PurgeInputQueueOnStartup = true,
TransportInfrastructure = new FileTransportInfrastructure()
};
}
public Task Cleanup()
{
return Task.CompletedTask;
}
}
Running the tests
The transport tests can be run with all test runners that support NUnit.