Building a transport

Component: NServiceBus
NuGet Package NServiceBus (6.x)

This sample show how to build a transport using the file system as a data store.

This is for learning purposes only and NOT for use in production.

This sample requires Visual Studio 2017.

Sample Structure

The sample has two endpoints, Endpoint1 and Endpoint2.

Both endpoints are configured to use the FileTransport.

endpointConfiguration.UseTransport<FileTransport>();

Message Flow

Endpoint1 Starts

Endpoint1 starts the message flow with a send of MessageA to Endpoint2.

var messageA = new MessageA();
await endpointInstance.Send("Samples.CustomTransport.Endpoint2", messageA)
    .ConfigureAwait(false);

Endpoint2 Handles and Replies

Endpoint2 has a Handler for MessageA and replies with MessageB

public class MessageAHandler :
    IHandleMessages<MessageA>
{
    static ILog log = LogManager.GetLogger<MessageAHandler>();

    public Task Handle(MessageA message, IMessageHandlerContext context)
    {
        log.Info("MessageA Handled");
        log.Info("Replying with MessageB");
        return context.Reply(new MessageB());
    }
}

Endpoint1 handles the reply

Endpoint1 then handles that reply.

public class MessageBHandler :
    IHandleMessages<MessageB>
{
    static ILog log = LogManager.GetLogger<MessageBHandler>();

    public Task Handle(MessageB message, IMessageHandlerContext context)
    {
        log.Info("MessageB Handled");
        return Task.CompletedTask;
    }
}

Running the sample

Start the sample with both Endpoint1 and Endpoint2 as startup projects. This will force both endpoints to create their underlying file system queues.

Now start Endpoint1. Notice it will send a message at startup.

Now look at the file system for Endpoint2 %temp%\FileTransport\Samples.CustomTransport.Endpoint2. It will contain the message headers in the root and the message body in %temp%\FileTransport\Samples.CustomTransport.Endpoint2\.bodies.

Now start Endpoint2. The reply message will appear in its file system %temp%\FileTransport\Samples.CustomTransport.Endpoint2\.

Transport implementation

Transport definition

The TransportDefinition allows a transport to define how it interacts with the core of NServiceBus.

public class FileTransport :
    TransportDefinition
{
    public override bool RequiresConnectionString => false;

    public override TransportInfrastructure Initialize(SettingsHolder settings, string connectionString)
    {
        return new FileTransportInfrastructure();
    }

    public override string ExampleConnectionStringForErrorMessage { get; } = "";
}

public class FileTransportInfrastructure :
    TransportInfrastructure
{
    public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
    {
        return new TransportReceiveInfrastructure(
            messagePumpFactory: () => new FileTransportMessagePump(),
            queueCreatorFactory: () => new FileTransportQueueCreator(),
            preStartupCheck: () => Task.FromResult(StartupCheckResult.Success));
    }

    public override TransportSendInfrastructure ConfigureSendInfrastructure()
    {
        return new TransportSendInfrastructure(
            dispatcherFactory: () => new Dispatcher(),
            preStartupCheck: () => Task.FromResult(StartupCheckResult.Success));
    }

    public override TransportSubscriptionInfrastructure ConfigureSubscriptionInfrastructure()
    {
        throw new NotImplementedException();
    }

    public override EndpointInstance BindToLocalEndpoint(EndpointInstance instance)
    {
        return instance;
    }

    public override string ToTransportAddress(LogicalAddress logicalAddress)
    {
        var endpointInstance = logicalAddress.EndpointInstance;
        var discriminator = endpointInstance.Discriminator ?? "";
        var qualifier = logicalAddress.Qualifier ?? "";
        return Path.Combine(endpointInstance.Endpoint, discriminator, qualifier);
    }

    public override IEnumerable<Type> DeliveryConstraints
    {
        get
        {
            yield return typeof(DiscardIfNotReceivedBefore);
        }
    }

    public override TransportTransactionMode TransactionMode
    {
        get
        {
            return TransportTransactionMode.ReceiveOnly;
        }
    }

    public override OutboundRoutingPolicy OutboundRoutingPolicy
    {
        get
        {
            return new OutboundRoutingPolicy(
                sends: OutboundRoutingType.Unicast,
                publishes: OutboundRoutingType.Unicast,
                replies: OutboundRoutingType.Unicast);
        }
    }
}

Storage location

This transport is hard coded to persist messages to %TEMP%FileTransport/ADDRESS/.

public static class BaseDirectoryBuilder
{
    public static string BuildBasePath(string address)
    {
        var temp = Environment.ExpandEnvironmentVariables("%temp%");
        var fullPath = Path.Combine(temp, "FileTransport", address);
        Directory.CreateDirectory(fullPath);
        return fullPath;
    }
}

Header serializer

To serialize headers this transport uses JSON via DataContractJsonSerializer

static class HeaderSerializer
{
    public static string Serialize(Dictionary<string, string> instance)
    {
        var serializer = BuildSerializer();
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, instance);
            return Encoding.UTF8.GetString(stream.ToArray());
        }
    }

    public static Dictionary<string, string> DeSerialize(string json)
    {
        var serializer = BuildSerializer();
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        {
            return (Dictionary<string, string>) serializer.ReadObject(stream);
        }
    }

    static DataContractJsonSerializer BuildSerializer()
    {
        var settings = new DataContractJsonSerializerSettings
        {
            UseSimpleDictionaryFormat = true,
        };
        return new DataContractJsonSerializer(typeof(Dictionary<string, string>), settings);
    }
}

Queue creation

At startup a transport can optionally create queues.

class FileTransportQueueCreator :
    ICreateQueues
{
    public Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity)
    {
        foreach (var address in queueBindings.SendingAddresses)
        {
            CreateQueueDirectory(address);
        }

        foreach (var address in queueBindings.ReceivingAddresses)
        {
            CreateQueueDirectory(address);
        }

        return Task.CompletedTask;
    }

    static void CreateQueueDirectory(string address)
    {
        var fullPath = BaseDirectoryBuilder.BuildBasePath(address);
        var committedPath = Path.Combine(fullPath, ".committed");
        Directory.CreateDirectory(committedPath);
        var bodiesPath = Path.Combine(fullPath, ".bodies");
        Directory.CreateDirectory(bodiesPath);
    }
}

Handling transactions

How a transport handles transactions differs greatly between specific implementations. For demonstration purposes this transport uses a highly simplified file system based transaction.

class DirectoryBasedTransaction :
    IDisposable
{
    string basePath;
    bool committed;
    string transactionDir;

    public DirectoryBasedTransaction(string basePath)
    {
        this.basePath = basePath;
        var transactionId = Guid.NewGuid().ToString();

        transactionDir = Path.Combine(basePath, ".pending", transactionId);
    }

    public string FileToProcess { get; private set; }

    public void BeginTransaction(string incomingFilePath)
    {
        Directory.CreateDirectory(transactionDir);
        FileToProcess = Path.Combine(transactionDir, Path.GetFileName(incomingFilePath));
        File.Move(incomingFilePath, FileToProcess);
    }

    public void Commit() => committed = true;

    public void Dispose()
    {
        if (!committed)
        {
            // rollback by moving the file back to the main dir
            File.Move(FileToProcess, Path.Combine(basePath, Path.GetFileName(FileToProcess)));
        }

        Directory.Delete(transactionDir, true);
    }
}

Dispatcher

The dispatcher is responsible for translating a message (its binary body and headers) and placing it onto the underlying transport technology.

class Dispatcher :
    IDispatchMessages
{
    public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context)
    {
        foreach (var operation in outgoingMessages.UnicastTransportOperations)
        {
            var basePath = BaseDirectoryBuilder.BuildBasePath(operation.Destination);
            var nativeMessageId = Guid.NewGuid().ToString();
            var bodyPath = Path.Combine(basePath, ".bodies", $"{nativeMessageId}.xml");

            var dir = Path.GetDirectoryName(bodyPath);
            if (!Directory.Exists(dir))
            {
                Directory.CreateDirectory(dir);
            }
            File.WriteAllBytes(bodyPath, operation.Message.Body);

            var messageContents = new List<string>
            {
                bodyPath,
                HeaderSerializer.Serialize(operation.Message.Headers)
            };

            var messagePath = Path.Combine(basePath, $"{nativeMessageId}.txt");

            // write to temp file first so an atomic move can be done
            // this avoids the file being locked when the receiver tries to process it
            var tempFile = Path.GetTempFileName();
            File.WriteAllLines(tempFile, messageContents);
            File.Move(tempFile, messagePath);
        }

        return Task.CompletedTask;
    }
}

Message pump

The message pump is responsible for reading message from the underlying transport and pushing them into the message handling pipeline.

class FileTransportMessagePump :
    IPushMessages
{
    static ILog log = LogManager.GetLogger<FileTransportMessagePump>();

    CancellationToken cancellationToken;
    CancellationTokenSource cancellationTokenSource;
    SemaphoreSlim concurrencyLimiter;
    Task messagePumpTask;
    Func<ErrorContext, Task<ErrorHandleResult>> onError;
    string path;
    Func<MessageContext, Task> pipeline;
    bool purgeOnStartup;
    ConcurrentDictionary<Task, Task> runningReceiveTasks;

    public Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext, Task<ErrorHandleResult>> onError, CriticalError criticalError, PushSettings settings)
    {
        this.onError = onError;
        pipeline = onMessage;
        path = BaseDirectoryBuilder.BuildBasePath(settings.InputQueue);
        purgeOnStartup = settings.PurgeOnStartup;
        return Task.CompletedTask;
    }

    public void Start(PushRuntimeSettings limitations)
    {
        runningReceiveTasks = new ConcurrentDictionary<Task, Task>();
        concurrencyLimiter = new SemaphoreSlim(limitations.MaxConcurrency);
        cancellationTokenSource = new CancellationTokenSource();

        cancellationToken = cancellationTokenSource.Token;

        if (purgeOnStartup)
        {
            Directory.Delete(path, true);
            Directory.CreateDirectory(path);
        }

        messagePumpTask = Task.Factory
            .StartNew(
                function: ProcessMessages,
                cancellationToken: CancellationToken.None,
                creationOptions: TaskCreationOptions.LongRunning,
                scheduler: TaskScheduler.Default)
            .Unwrap();
    }

    public async Task Stop()
    {
        cancellationTokenSource.Cancel();

        var timeoutTask = Task.Delay(TimeSpan.FromSeconds(30), cancellationTokenSource.Token);
        var allTasks = runningReceiveTasks.Values.Concat(new[]
        {
            messagePumpTask
        });
        var finishedTask = await Task.WhenAny(Task.WhenAll(allTasks), timeoutTask)
            .ConfigureAwait(false);

        if (finishedTask.Equals(timeoutTask))
        {
            log.Error("The message pump failed to stop with in the time allowed(30s)");
        }

        concurrencyLimiter.Dispose();
        runningReceiveTasks.Clear();
    }

    [DebuggerNonUserCode]
    async Task ProcessMessages()
    {
        try
        {
            await InnerProcessMessages()
                .ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // For graceful shutdown purposes
        }
        catch (Exception ex)
        {
            log.Error("File Message pump failed", ex);
        }

        if (!cancellationToken.IsCancellationRequested)
        {
            await ProcessMessages()
                .ConfigureAwait(false);
        }
    }

    async Task InnerProcessMessages()
    {
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            var filesFound = false;

            foreach (var filePath in Directory.EnumerateFiles(path, "*.*"))
            {
                filesFound = true;
                await ProcessFile(filePath)
                    .ConfigureAwait(false);
            }

            if (!filesFound)
            {
                await Task.Delay(10, cancellationToken)
                    .ConfigureAwait(false);
            }
        }
    }

    async Task ProcessFile(string filePath)
    {
        var nativeMessageId = Path.GetFileNameWithoutExtension(filePath);

        await concurrencyLimiter.WaitAsync(cancellationToken)
            .ConfigureAwait(false);

        var task = Task.Run(async () =>
        {
            try
            {
                await ProcessFileWithTransaction(filePath, nativeMessageId)
                    .ConfigureAwait(false);
            }
            finally
            {
                concurrencyLimiter.Release();
            }
        }, cancellationToken);

        task.ContinueWith(t =>
        {
            runningReceiveTasks.TryRemove(t, out var toBeRemoved);
        },
            TaskContinuationOptions.ExecuteSynchronously)
            .Ignore();

        runningReceiveTasks.AddOrUpdate(task, task, (k, v) => task)
            .Ignore();
    }

    async Task ProcessFileWithTransaction(string filePath, string messageId)
    {
        using (var transaction = new DirectoryBasedTransaction(path))
        {
            transaction.BeginTransaction(filePath);

            var message = File.ReadAllLines(transaction.FileToProcess);
            var bodyPath = message.First();
            var json = string.Join("", message.Skip(1));
            var headers = HeaderSerializer.DeSerialize(json);

            if (headers.TryGetValue(Headers.TimeToBeReceived, out var ttbrString))
            {
                var ttbr = TimeSpan.Parse(ttbrString);
                // file.move preserves create time
                var sentTime = File.GetCreationTimeUtc(transaction.FileToProcess);

                if (sentTime + ttbr < DateTime.UtcNow)
                {
                    return;
                }
            }

            var body = File.ReadAllBytes(bodyPath);
            var transportTransaction = new TransportTransaction();
            transportTransaction.Set(transaction);

            var shouldCommit = await HandleMessageWithRetries(messageId, headers, body, transportTransaction, 1)
                .ConfigureAwait(false);

            if (shouldCommit)
            {
                transaction.Commit();
            }
        }
    }

    async Task<bool> HandleMessageWithRetries(string messageId, Dictionary<string, string> headers, byte[] body, TransportTransaction transportTransaction, int processingAttempt)
    {
        try
        {
            var receiveCancellationTokenSource = new CancellationTokenSource();
            var pushContext = new MessageContext(
                messageId: messageId,
                headers: new Dictionary<string, string>(headers),
                body: body,
                transportTransaction: transportTransaction,
                receiveCancellationTokenSource: receiveCancellationTokenSource,
                context: new ContextBag());

            await pipeline(pushContext)
                .ConfigureAwait(false);

            return !receiveCancellationTokenSource.IsCancellationRequested;
        }
        catch (Exception e)
        {
            var errorContext = new ErrorContext(e, headers, messageId, body, transportTransaction, processingAttempt);
            var errorHandlingResult = await onError(errorContext)
                .ConfigureAwait(false);

            if (errorHandlingResult == ErrorHandleResult.RetryRequired)
            {
                return await HandleMessageWithRetries(messageId, headers, body, transportTransaction, ++processingAttempt)
                    .ConfigureAwait(false);
            }

            return true;
        }
    }
}

Transport tests

NServiceBus provides a test suite targeting transport implementations to verify the implementation.

Pulling in the tests

The tests are shipped in the NServiceBus.TransportTests.Sources NuGet package. This package can be installed into a dedicated test project. In this sample, CustomTransport.TransportTests contains the transport tests.

Configuring the tests

The transport tests need to be configured to use the custom transport by implementing IConfigureTransportInfrastructure:

public class ConfigureFileTransportInfrastructure : IConfigureTransportInfrastructure
{
    public TransportConfigurationResult Configure(SettingsHolder settings, TransportTransactionMode transactionMode)
    {
        return new TransportConfigurationResult
        {
            PurgeInputQueueOnStartup = true,
            TransportInfrastructure = new FileTransportInfrastructure()
        };
    }

    public Task Cleanup()
    {
        return Task.CompletedTask;
    }
}

Running the tests

The transport tests can be run with all test runners that support NUnit.

Related Articles


Last modified