SQL HTTP Passthrough

Component: SQL Server - HTTP Passthrough
NuGet Package: NServiceBus.SqlServer.HttpPassthrough (2.x)
This is a community maintained project
Target NServiceBus Version: 7.x

This sample uses the SQL Server - HTTP Passthrough to bridge HTTP requests (sent by JavaScript on a web page) and the SQL Server Transport.

The flow of this sample is:

  • User performs an action on a web page that triggers JavaScript.
  • JavaScript posts the message body to a specific URL.
  • A controller takes the HTTP request and places the message in a queue.
  • An endpoint receives the message and logs all contextual information.

Prerequisites

An instance of SQL Server Express is installed and accessible as .\SqlExpress.

At startup each endpoint will create its required SQL assets including databases, tables and schemas.

The database created by this sample is SqlHttpPassthroughSample.

Running the sample

When the solution is started two projects will start:

  • Endpoint
  • Web (as a console and browser)

In the browser, press the button and a message will be received by the endpoint.

Code walk-through

SampleEndpoint

This is a standard NServiceBus endpoint that will receive the message.

EndpointConfiguration

The endpoint is configured as follows:

endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
endpointConfiguration.EnableAttachments(SqlHelper.ConnectionString, TimeToKeep.Default);
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(SqlHelper.ConnectionString);
#if NETCOREAPP2_0
// Since Transaction scope mode is not supported in .NET Core 2.0
// https://docs.particular.net/transports/sql/transactions#transaction-scope-distributed-transaction
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
#endif
endpointConfiguration.EnableInstallers();

Handler

The receiving handler outputs all incoming headers and attachments.

class MyHandler : IHandleMessages<SampleMessage>
{
    static ILog log = LogManager.GetLogger<MyHandler>();

    public Task Handle(SampleMessage message, IMessageHandlerContext context)
    {
        log.Info("SampleMessage received");
        log.Info($"Property={message.Property}");
        foreach (var header in context.MessageHeaders)
        {
            var headerSuffix = header.Key.Replace("NServiceBus.", "");
            log.Info($"{headerSuffix}={header.Value}");
        }

        return context.Attachments().ProcessStreams(WriteAttachment);
    }

    async Task WriteAttachment(string name, Stream stream)
    {
        using (var reader = new StreamReader(stream))
        {
            var contents = await reader.ReadToEndAsync()
                .ConfigureAwait(false);
            log.Info($"Attachment: {name}. Contents:{contents}");
        }
    }
}

Message contract

There is a single message with a property to illustrate the data being passed through:

namespace SampleNamespace
{
    public class SampleMessage : IMessage
    {
        public string Property { get; set; }
    }
}

Note that the message type exists only in this endpoint; the Web project does not need to reference it.

SampleWeb

Startup

At ASP.NET Core Startup several actions are taken:

  • AddSqlHttpPassthrough is called on IServiceCollection which makes the ISqlPassthrough interface available via dependency injection.
  • AddSqlHttpPassthroughBadRequestMiddleware is called on IApplicationBuilder, which adds Middleware to the pipeline. This means that if the request parsing code of the SQL HTTP Passthrough throws a BadRequestException, that exception can be gracefully handled and an HTTP BadRequest (400) can be sent as a response. This is optional, and a controller can choose to explicitly catch and handle BadRequestException in a different way.

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        var configuration = new PassthroughConfiguration(
            connectionFunc: OpenConnection,
            callback: AmendMessage,
            dedupCriticalError: exception =>
            {
                Environment.FailFast("Dedup failure", exception);
            });
        services.AddSqlHttpPassthrough(configuration);
        services.AddMvcCore();
    }

    Task<Table> AmendMessage(HttpContext context, PassthroughMessage message)
    {
        message.ExtraHeaders = new Dictionary<string, string>
        {
            {"CustomHeader", "CustomHeaderValue"}
        };
        //TODO: validate that the destination is allowed
        var destinationTable = new Table(message.Destination);
        return Task.FromResult(destinationTable);
    }

    public void Configure(IApplicationBuilder builder)
    {
        builder.AddSqlHttpPassthroughBadRequestMiddleware();
        builder.UseMvc();
    }

    Task<SqlConnection> OpenConnection(CancellationToken cancellation)
    {
        return ConnectionHelpers.OpenConnection(SqlHelper.ConnectionString, cancellation);
    }
}

PassthroughController

The PassthroughController consists of two parts:

  • ISqlPassthrough injected through dependency injection.
  • The controller handling the HTTP Post and passing that information to ISqlPassthrough.Send.

[Route("SendMessage")]
public class PassthroughController : ControllerBase
{
    ISqlPassthrough sender;

    public PassthroughController(ISqlPassthrough sender)
    {
        this.sender = sender;
    }

    [HttpPost]
    public Task Post(CancellationToken cancellation)
    {
        return sender.Send(HttpContext, cancellation);
    }
}

In a production application, the controller would perform authentication and authorization on the incoming request.

SampleClientController

The SampleClientController serves up the HTML UI for this sample.

public class SampleClientController : ControllerBase
{
    [HttpGet]
    [Route("SampleClient")]
    public IActionResult SampleClient()
    {
        var file = Path.Combine(Directory.GetCurrentDirectory(), "SampleClient.html");
        return PhysicalFile(file, "text/html");
    }
}

HTML form

The HTML captures the data that will be submitted to the PassthroughController.

<body>
    Select files: <input type="file" id="files" multiple /><br />
    Property: <input type="text" id="property" /><br />
    <button onclick="PostToBus()">PostToBus</button>
</body>

Form submission

The JavaScript submits the data by building up a FormData object and POSTing it via the Fetch API.

function PostToBus() {
    var message = new Object();
    message.Property = document.getElementById("property").value;
    var jsonString = JSON.stringify(message);

    var data = new FormData();
    var files = document.getElementById("files").files;
    for (var i = 0; i < files.length; i++) {
        data.append('files[]', files[i], files[i].name);
    }
    data.append("message", jsonString);

    var postSettings = {
        method: 'POST',
        credentials: 'include',
        mode: 'cors',
        headers: {
            'MessageType': 'SampleMessage',
            'MessageNamespace': 'SampleNamespace',
            'MessageId': newGuid(),
            'Destination': 'SampleEndpoint'
        },
        body: data
    };

    return fetch('SendMessage', postSettings);
}
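
The Fetch API resolves its promise even when the server answers with an error status, such as the 400 produced by the bad request middleware, so a caller that wants to surface failures needs to inspect the response. The following is a minimal sketch of one way to do that; the helper name ensureAccepted and its error message are hypothetical and not part of the sample.

```javascript
// Hypothetical helper, not part of the sample: turns a non-2xx
// response into a rejected promise so the caller can react to it.
async function ensureAccepted(response) {
    if (response.ok) {
        return response;
    }
    // Read the body as text for diagnostics; it may be empty.
    const detail = await response.text();
    const text = `Passthrough rejected the request: ${response.status} ${detail}`;
    throw new Error(text.trim());
}
```

Because PostToBus returns the promise from fetch, the helper could be chained as PostToBus().then(ensureAccepted).catch(error => console.error(error)).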

MessageId generation

For deduplication to operate, the client must generate a MessageId, so that any retries can be ignored. The sample uses a helper method to generate a GUID rather than relying on a built-in function (newer browsers also offer crypto.randomUUID() in secure contexts).

function newGuid() {
    return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, c =>
        (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4)
            .toString(16)
            .toUpperCase()
    );
}
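
Deduplication only helps if a retry carries the same MessageId as the original attempt. The sketch below illustrates that idea under stated assumptions: sendWithRetry, its parameters, and the default attempt count are hypothetical and not part of the sample. It receives the send function as an argument so that the ID is generated once and reused on every attempt.

```javascript
// Hypothetical sketch: retry a send while reusing one MessageId.
// `send` is any function that accepts a message ID and returns a promise,
// for example a variant of PostToBus that takes the ID as a parameter.
async function sendWithRetry(send, messageId, maxAttempts = 3) {
    let lastError;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            // Same ID on every attempt, so a duplicate can be detected.
            return await send(messageId);
        } catch (error) {
            lastError = error;
        }
    }
    // All attempts failed; surface the most recent error.
    throw lastError;
}
```

A caller might generate the ID with newGuid() and pass it in, for example sendWithRetry(id => postWithId(id), newGuid()), where postWithId is an assumed variant of PostToBus. A real client would probably also wait between attempts.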

Testing

The solution includes an integration test that verifies that a submitted HTTP request results in a message being received by the SampleEndpoint.

[TestFixture]
public class IntegrationTests
{
    static ManualResetEvent resetEvent = new ManualResetEvent(false);

    [Test]
    public async Task Integration()
    {
        await Installation.Run();
        var endpoint = await Program.StartEndpoint();

        await SubmitMultipartForm();

        if (!resetEvent.WaitOne(TimeSpan.FromSeconds(5)))
        {
            throw new Exception("OutgoingMessage not received");
        }

        await endpoint.Stop();
    }

    static async Task SubmitMultipartForm()
    {
        var hostBuilder = new WebHostBuilder();
        hostBuilder.UseStartup<Startup>();
        using (var server = new TestServer(hostBuilder))
        using (var client = server.CreateClient())
        {
            client.DefaultRequestHeaders.Referrer = new Uri("http://TheReferrer");
            await new ClientFormSender(client).Send(
                route: "/SendMessage",
                message: "{\"Property\": \"Value\"}",
                typeName: "SampleMessage",
                typeNamespace: "SampleNamespace",
                destination: "SampleEndpoint",
                attachments: new Dictionary<string, byte[]>
                {
                    {"fileName", Encoding.UTF8.GetBytes("fileContents")}
                });
        }
    }

    class Handler : IHandleMessages<SampleMessage>
    {
        public Task Handle(SampleMessage message, IMessageHandlerContext context)
        {
            resetEvent.Set();
            return Task.CompletedTask;
        }
    }
}

MARS

All SqlConnections must have Multiple Active Result Sets (MARS) enabled (for example, by adding MultipleActiveResultSets=True to the connection string), because multiple concurrent async requests can be performed.
