OWIN HTTP Message Pass Through

Component: NServiceBus
NuGet Package NServiceBus (7-pre)
This page targets a pre-release version and is subject to change prior to the final release.

Introduction

This sample leverages OWIN (Open Web Interface for .NET) to add light weight HTTP message pass through middleware (general term for an extension to OWIN) that can be re-used in a variety of web technologies. This middleware provides the bridge between a HTTP stream (via JavaScript on a web page) and the queue used by NServiceBus.

The flow of this samples is as follows

  • User performs some action on a webpage that triggers some JavaScript
  • JavaScript posts the message body to a specific URL
  • OWIN intercepts that post and passes to the bus middleware
  • The middleware takes the HTTP request, optionally deserializes it, and places it on the queue

What is OWIN

OWIN defines a standard interface between .NET web servers and web applications. The goal of the OWIN interface is to decouple server and application, encourage the development of simple modules for .NET web development, and, by being an open standard, stimulate the open source ecosystem of .NET web development tools.

So extensions to NServiceBus that plug into OWIN can be easily applied to many .NET web server technologies.

The purpose of this sample

The primary purpose of this sample is to illustrate how simple it is to bridge the world of HTTP with the world of a service bus. The secondary purpose is to illustrate, as well as compare and contrast, two ways of communicating with the NServiceBus. i.e. using the NServiceBus API and using the native queue.

Comparisons with the NServiceBus Gateway

Performance

  • The Gateway uses a HttpListener while this sample allows leveraging the full power of the choice of webserver.
  • The Gateway is limited to run in a single endpoint while this samples supports the use any well known web scale out technologies.

Full Control of the incoming message

This sample allow the customization of the http-to-message handling code that places the message on the queue. As such this allows:

  • Write custom validation rules on the message
  • Return custom errors to the HTTP client
  • Apply custom authentication and authorization
  • Perform custom serialization

Hosting

The gateway is designed to run inside a NServiceBus endpoint. This sample code can run with the selection of technologies e.g. it will work with IIS, self-hosted, asp.mvc, NancyFX or within a NServiceBus endpoint in the same way as the Gateway.

Cross Site effects

When using the gateway to perform a HTTP pass through the call will most likely be hosted on a different domain. As such a normal HTTP request will be blocked. To work around this a JSONP request is required.

With the OWIN approach full control over the HTTP pipeline is possible and hence can leverage CORS to avoid the need for JSONP.

CORS

For layout and simplicity reasons this sample hosts the client side HTML/JavaScript parts in their own WebApplication. This means CORS need to be enabled on the HTTP server hosted in the endpoint. This is done using another OWIN Middleware from the Katana project called Microsoft.Owin.Cors.

CORS is enabled for all via builder.UseCors(CorsOptions.AllowAll);. Most likely this should be changed any real world usage.

WebServer/OWIN Hosting

This sample uses a Self Hosted instance of Katana to serve up HTTP and provide an OWIN pipeline.

The Endpoint Configuration

The endpoint configuration is fairly standard. The one exception is that the endpoint instance is passed in to the OWIN configuration code.

var endpointConfiguration = new EndpointConfiguration("Samples.OwinPassThrough");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.UseTransport<MsmqTransport>();
endpointConfiguration.EnableInstallers();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);
using (StartOwinHost(endpointInstance))
{
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}
await endpointInstance.Stop()
    .ConfigureAwait(false);

HTTP Hosting and OWIN middleware

A self-hosted instance of Katana is started and then different middleware are injected into the OWIN pipeline. Note that they are wired to map to specific URL suffixes.

  • /to-bus for the Bus based interception
  • /to-msmq for the direct to MSMQ interception
static IDisposable StartOwinHost(IEndpointInstance endpointInstance)
{
    var baseUrl = "http://localhost:12345/";
    var startOptions = new StartOptions(baseUrl)
    {
        ServerFactory = "Microsoft.Owin.Host.HttpListener",
    };

    return WebApp.Start(startOptions, builder =>
    {
        builder.UseCors(CorsOptions.AllowAll);
        MapToBus(builder, endpointInstance);
        MapToMsmq(builder);
    });
}

static void MapToBus(IAppBuilder builder, IEndpointInstance endpointInstance)
{
    var owinToBus = new OwinToBus(endpointInstance);
    builder.Map("/to-bus", app => { app.Use(owinToBus.Middleware()); });
}

static void MapToMsmq(IAppBuilder builder)
{
    var queue = Environment.MachineName + @"\private$\Samples.OwinPassThrough";
    var owinToMsmq = new OwinToMsmq(queue);
    builder.Map("/to-msmq", app => { app.Use(owinToMsmq.Middleware()); });
}

Bus based middleware

The Bus based approach takes the following steps

  • Reads text for the message body from the HTTP request
  • Reads the message type name from the required headers
  • Converts the message type name to a .NET Type
  • Uses Type and message body, in conjunction with Json.net, to deserialize an instance of the real message
  • Places that message on the bus via a SendLocal
using AppFunc = System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>;

public class OwinToBus
{
    IEndpointInstance endpointInstance;
    Newtonsoft.Json.JsonSerializer serializer;

    public OwinToBus(IEndpointInstance endpointInstance)
    {
        this.endpointInstance = endpointInstance;
        serializer = new Newtonsoft.Json.JsonSerializer();
    }

    public Func<AppFunc, AppFunc> Middleware()
    {
        return _ => Invoke;
    }

    async Task Invoke(IDictionary<string, object> environment)
    {
        var messageBody = await GetMessageBody(environment)
            .ConfigureAwait(false);
        var requestHeaders = (IDictionary<string, string[]>) environment["owin.RequestHeaders"];
        var typeName = requestHeaders["MessageType"].Single();
        var objectType = Type.GetType(typeName);
        var deserialize = Deserialize(messageBody, objectType);
        await endpointInstance.SendLocal(deserialize)
            .ConfigureAwait(false);
        environment["owin.ResponseStatusCode"] = (int)HttpStatusCode.Accepted;
    }

    object Deserialize(string messageBody, Type objectType)
    {
        using (var textReader = new StringReader(messageBody))
        {
            return serializer.Deserialize(textReader, objectType);
        }
    }

    static async Task<string> GetMessageBody(IDictionary<string, object> environment)
    {
        using (var requestStream = (Stream) environment["owin.RequestBody"])
        using (var streamReader = new StreamReader(requestStream))
        {
            return await streamReader.ReadToEndAsync()
                .ConfigureAwait(false);
        }
    }
}

MSMQ based middleware

The MSMQ based approach takes the following steps:

  • Reads text for the message body from the HTTP request.
  • Reads the message type name from the required headers.
  • Uses the message type name to create a MSMQ transport compatible header string.
  • Places that body and header directly onto MSMQ.
using AppFunc = System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>;

public class OwinToMsmq
{
    string queuePath;

    public OwinToMsmq(string queuePath)
    {
        this.queuePath = queuePath;
    }

    public Func<AppFunc, AppFunc> Middleware()
    {
        return _ => Invoke;
    }

    async Task Invoke(IDictionary<string, object> environment)
    {
        using (var memoryStream = await RequestAsStream(environment)
            .ConfigureAwait(false))
        using (var queue = new MessageQueue(queuePath))
        using (var message = new Message())
        {
            message.BodyStream = memoryStream;
            var requestHeaders = (IDictionary<string, string[]>) environment["owin.RequestHeaders"];
            var messageType = requestHeaders["MessageType"].Single();
            message.Extension = MsmqHeaderSerializer.CreateHeaders(messageType);
            queue.Send(message, MessageQueueTransactionType.Single);
        }
        environment["owin.ResponseStatusCode"] = (int)HttpStatusCode.Accepted;
    }

    static async Task<Stream> RequestAsStream(IDictionary<string, object> environment)
    {
        var memoryStream = new MemoryStream();
        using (var requestStream = (Stream)environment["owin.RequestBody"])
        {
            await requestStream.CopyToAsync(memoryStream)
                .ConfigureAwait(false);
        }
        return memoryStream;
    }
}

Header Helper

A helper method for creating an header string that is compatible with the NServiceBus MSMQ transport:

static class MsmqHeaderSerializer
{
    static XmlSerializer serializer = new XmlSerializer(typeof(List<HeaderInfo>));
    public static byte[] CreateHeaders(string messageType)
    {
        var headerInfos = new List<HeaderInfo>
            {
                new HeaderInfo
                {
                    Key = "NServiceBus.EnclosedMessageTypes",
                    Value = messageType
                }
            };
        using (var stream = new MemoryStream())
        {
            serializer.Serialize(stream, headerInfos);
            return stream.ToArray();
        }
    }
}

Comparing the two approaches

Bus BasedNative MSMQ
Code ComplexitySimple.Slightly more complicated since knowledge of the transport is required.
PerformanceSlightly slower and uses more memory since every message needs to be deserialized before being sent to the Bus.Slightly faster and less memory since no deserialization or translation needs to occur.
Transport compatibilityWorks with any NServiceBus transport.Requires some native code for each transport.
Serialization errorsErrors in deserialization will result in the request failing and an error being returned to the client.Errors in deserialization will occur when the bus attempts to process the message and hence will leverage the built in error handling functionality of NServiceBus.

JavaScript HTTP Post

var request = new XMLHttpRequest();
request.open('POST', 'http://localhost:12345/to-msmq', true);
request.setRequestHeader('MessageType', 'Messages.MyMessage, Messages');
var message = new Object();
message.Property1 = "Property1 Value";
message.Property2 = "Property2 Value";
var jsonString = JSON.stringify(message);
request.send(jsonString);

Samples

Related Articles

  • Gateway
    Durable fire-and-forget messaging across physically separated IT infrastructure.

Last modified