Customizing the data bus

Component: NServiceBus
NuGet Package: NServiceBus (8.1)

Endpoints can send and receive payloads that are too large to travel in the message itself by storing them via the data bus.
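For context, a minimal sketch of a message that routes one property through the data bus. The message type and property names are hypothetical; `DataBusProperty<T>` is the NServiceBus wrapper type that marks a property as data bus content.

```csharp
using NServiceBus;

// Hypothetical message type used only for illustration.
public class MessageWithLargePayload : ICommand
{
    // Small values travel inside the message as usual.
    public string Description { get; set; }

    // The wrapped value is stored via the data bus instead of in the message body.
    public DataBusProperty<byte[]> LargeBlob { get; set; }
}
```

A sender would populate the property with `new DataBusProperty<byte[]>(bytes)`; whichever data bus implementation is configured on the endpoint decides where those bytes are stored.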

A custom data bus implementation can be created and plugged into the endpoint through the Features extension point.

Implement the IDataBus interface

This new class provides the custom implementations of the Get and Put methods, which retrieve and store the data bus payload, and of the Start method.

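using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.DataBus;

class CustomDataBus :
    IDataBus
{
    // Returns a stream over the payload that Put stored under the given key.
    public Task<Stream> Get(string key, CancellationToken cancellationToken)
    {
        // In this sample the key is the full path of the stored file.
        Stream stream = File.OpenRead(key);
        return Task.FromResult(stream);
    }

    // Stores the payload and returns the key that Get receives later.
    // This sample ignores timeToBeReceived and never cleans up stored files.
    public async Task<string> Put(Stream stream, TimeSpan timeToBeReceived, CancellationToken cancellationToken)
    {
        // Write each payload to its own file so payloads do not overwrite each other.
        var path = Path.GetFullPath($"{Guid.NewGuid():N}.dat");
        // File.Create truncates any existing file; File.OpenWrite would leave stale trailing bytes.
        using (var destination = File.Create(path))
        {
            await stream.CopyToAsync(destination, 81920, cancellationToken);
        }
        return path;
    }

    // Called once when the data bus starts; this sample needs no initialization.
    public Task Start(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}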
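The contract between Put and Get can be exercised outside an endpoint. The following is a minimal sketch, assuming the CustomDataBus class above is compiled in the same project; the RoundTripCheck class name and the sample payload are illustrative only.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical console harness; not part of NServiceBus.
static class RoundTripCheck
{
    static async Task Main()
    {
        var dataBus = new CustomDataBus();
        await dataBus.Start(CancellationToken.None);

        // Store a small payload and keep the key that Put hands back.
        using var payload = new MemoryStream(Encoding.UTF8.GetBytes("hello data bus"));
        var key = await dataBus.Put(payload, TimeSpan.MaxValue, CancellationToken.None);

        // Ask for the payload again using that key and print what comes back.
        using var stored = await dataBus.Get(key, CancellationToken.None);
        using var reader = new StreamReader(stored);
        Console.WriteLine(await reader.ReadToEndAsync());
    }
}
```

The point of the check is that whatever string Put returns must be enough for Get to locate the same payload, possibly from a different process or machine.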

This new implementation needs to be registered as a new feature.

Define a feature

Define a new feature that registers the custom data bus implementation class.

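class CustomDatabusFeature : Feature
{
    // Activate this feature only when the data bus feature itself is active.
    public CustomDatabusFeature()
        => DependsOn<DataBus>();

    // Register the custom implementation as the IDataBus service.
    protected override void Setup(FeatureConfigurationContext context)
        => context.Services.AddSingleton<IDataBus, CustomDataBus>();
}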

Define a DataBusDefinition

Define a new class that inherits from the DataBusDefinition class and returns the type of the feature above from ProvidedByFeature.

class CustomDatabusDefinition : DataBusDefinition
{
    protected override Type ProvidedByFeature()
        => typeof(CustomDatabusFeature);
}

Configure the endpoint

Configure the endpoint to use the custom data bus implementation instead of the default data bus:

endpointConfiguration.UseDataBus(svc => new CustomDataBus(), new SystemJsonDataBusSerializer());
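The call above passes a factory for the data bus instance directly. The definition and feature classes from the previous sections can instead be selected by type. This is a hedged sketch, assuming the generic `UseDataBus<TDataBusDefinition, TDataBusSerializer>()` overload in NServiceBus 8 is used with the custom definition; the DataBusConfiguration helper class is hypothetical.

```csharp
using NServiceBus;

// Hypothetical helper class; only the UseDataBus call matters here.
static class DataBusConfiguration
{
    public static void UseCustomDataBus(EndpointConfiguration endpointConfiguration)
    {
        // Selects the data bus through CustomDatabusDefinition, which points at
        // CustomDatabusFeature, which in turn registers CustomDataBus as IDataBus.
        endpointConfiguration.UseDataBus<CustomDatabusDefinition, SystemJsonDataBusSerializer>();
    }
}
```

Only one of the two registration styles should be used for a given endpoint.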
