Customization with MEF or Reflection

Component: NServiceBus
NuGet Package NServiceBus (7-pre)
This page targets a pre-release version and is subject to change prior to the final release.

Shared

The Shared project defines all the messages used in the sample.

It also contains the custom extension point definitions that both the endpoints and the implementations use to communicate. For example, the extension point for running code after the endpoint has started is:

/// <summary>
/// Extension point for running code after the endpoint has started.
/// </summary>
public interface IRunAfterEndpointStart
{
    /// <summary>
    /// Executes the extension.
    /// </summary>
    /// <param name="endpoint">The endpoint instance handed to the extension.</param>
    /// <returns>A task representing the extension's work.</returns>
    Task Run(IEndpointInstance endpoint);
}

The complete list of extension points in this solution is:

  • ICustomizeConfiguration
  • IRunBeforeEndpointStart
  • IRunAfterEndpointStart
  • IRunBeforeEndpointStop
  • IRunAfterEndpointStop

Approaches to executing extension points

Both approaches have similar parts:

  • An endpoint project that starts the endpoint and loads + executes the specific extension points in the Shared project.
  • An extensions project that contains the implementations for the extension points in the Shared project.

Custom Reflection

This approach uses directory scanning and Reflection to discover and execute the extension points.

Endpoint Startup

/// <summary>
/// Configures, starts and stops the endpoint, invoking each group of
/// extension points at the matching stage of the endpoint lifetime.
/// </summary>
static async Task AsyncMain()
{
    const string endpointName = "Samples.CustomExtensionEndpoint";
    Console.Title = endpointName;

    var configuration = new EndpointConfiguration(endpointName);
    configuration.UsePersistence<LearningPersistence>();
    configuration.UseTransport<LearningTransport>();

    // Extensions run against the configuration before the endpoint is started.
    await RunCustomizeConfiguration(configuration).ConfigureAwait(false);
    await RunBeforeEndpointStart().ConfigureAwait(false);

    var endpoint = await Endpoint.Start(configuration).ConfigureAwait(false);
    await RunAfterEndpointStart(endpoint).ConfigureAwait(false);

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();

    // Shutdown mirrors startup: before-stop extensions, stop, after-stop extensions.
    await RunBeforeEndpointStop(endpoint).ConfigureAwait(false);
    await endpoint.Stop().ConfigureAwait(false);
    await RunAfterEndpointStop().ConfigureAwait(false);
}

/// <summary>
/// Invokes <c>Run</c> on every <see cref="IRunBeforeEndpointStart"/> implementation
/// that <see cref="Resolver"/> finds.
/// </summary>
static Task RunBeforeEndpointStart() =>
    Resolver.Execute<IRunBeforeEndpointStart>(extension => extension.Run());

// Other injection points excluded, but follow the same pattern as above

Helpers

Some common scanning and reflection helpers.

/// <summary>
/// Scanning and reflection helpers that locate extension implementations in the
/// assemblies sitting next to this one and execute them.
/// </summary>
public static class Resolver
{
    // Assemblies from the scanned directory that reference the Shared assembly.
    static readonly List<Assembly> assemblies;

    /// <summary>
    /// Loads every "*.dll" found in the directory of this assembly and keeps those
    /// that reference the assembly defining <see cref="ICustomizeConfiguration"/>.
    /// </summary>
    static Resolver()
    {
        // Use Location rather than stripping a fixed 8-character "file:///" prefix from
        // CodeBase: CodeBase is a URI, so on Unix ("file:///usr/...") the strip drops the
        // leading '/' and produces a relative path, and UNC code bases are mangled too.
        var location = typeof(Resolver).Assembly.Location;
        var currentDirectory = Path.GetDirectoryName(location);
        assemblies = Directory.GetFiles(currentDirectory, "*.dll")
            .Select(Assembly.LoadFrom)
            .Where(ReferencesShared)
            .ToList();
    }

    /// <summary>
    /// Returns true when <paramref name="assembly"/> lists the assembly that defines
    /// <see cref="ICustomizeConfiguration"/> among its referenced assemblies.
    /// </summary>
    static bool ReferencesShared(Assembly assembly)
    {
        var sharedAssembly = typeof(ICustomizeConfiguration).Assembly.GetName().Name;
        return assembly.GetReferencedAssemblies()
            .Any(name => name.Name == sharedAssembly);
    }

    /// <summary>
    /// Creates an instance of every concrete class assignable to <typeparamref name="T"/>
    /// in the scanned assemblies and awaits <paramref name="action"/> on each, one at a time.
    /// </summary>
    /// <typeparam name="T">The extension point type to look for.</typeparam>
    /// <param name="action">The call to make on each created instance.</param>
    /// <returns>A task that completes once every instance has been processed.</returns>
    public static async Task Execute<T>(Func<T, Task> action)
    {
        foreach (var assembly in assemblies)
        {
            foreach (var type in assembly.GetImplementationTypes<T>())
            {
                // Instances are created through the type's parameterless constructor.
                var instance = (T)Activator.CreateInstance(type);
                await action(instance)
                    .ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Enumerates the types in <paramref name="assembly"/> that are concrete
    /// implementations of <typeparamref name="TInterface"/>.
    /// </summary>
    static IEnumerable<Type> GetImplementationTypes<TInterface>(this Assembly assembly)
    {
        return assembly.GetTypes().Where(IsConcreteClass<TInterface>);
    }

    /// <summary>
    /// Returns true when <paramref name="type"/> is a non-abstract class without open
    /// generic parameters that is assignable to <typeparamref name="TInterface"/>.
    /// </summary>
    static bool IsConcreteClass<TInterface>(Type type)
    {
        return typeof(TInterface).IsAssignableFrom(type) &&
               !type.IsAbstract &&
               !type.ContainsGenericParameters &&
               type.IsClass;
    }
}

Example extension implementations

/// <summary>
/// Sample <see cref="ICustomizeConfiguration"/> extension that selects the
/// <see cref="XmlSerializer"/> on the endpoint configuration.
/// </summary>
public class CustomizeConfiguration : ICustomizeConfiguration
{
    static ILog logger = LogManager.GetLogger<CustomizeConfiguration>();

    /// <summary>
    /// Logs the change and applies the serializer to <paramref name="endpointConfiguration"/>.
    /// </summary>
    public Task Run(EndpointConfiguration endpointConfiguration)
    {
        logger.Info("Setting serializer to XML in an extension");
        endpointConfiguration.UseSerialization<XmlSerializer>();

        // Nothing asynchronous to wait for.
        return Task.CompletedTask;
    }
}
/// <summary>
/// Sample <see cref="IRunAfterEndpointStart"/> extension that only writes a log entry.
/// </summary>
public class RunAfterEndpointStart : IRunAfterEndpointStart
{
    static ILog logger = LogManager.GetLogger<RunAfterEndpointStart>();

    /// <summary>
    /// Logs an informational message; <paramref name="endpoint"/> is not used.
    /// </summary>
    public Task Run(IEndpointInstance endpoint)
    {
        logger.Info("Endpoint Started.");

        // Nothing asynchronous to wait for.
        return Task.CompletedTask;
    }
}
/// <summary>
/// Sample <see cref="IRunAfterEndpointStart"/> extension that sends a
/// <see cref="MyMessage"/> to the local endpoint.
/// </summary>
public class SendMessageAfterEndpointStart : IRunAfterEndpointStart
{
    static ILog logger = LogManager.GetLogger<SendMessageAfterEndpointStart>();

    /// <summary>
    /// Logs an informational message and returns the task of the local send.
    /// </summary>
    public Task Run(IEndpointInstance endpoint)
    {
        logger.Info("Sending Message.");
        return endpoint.SendLocal(new MyMessage());
    }
}

Managed Extensibility Framework (MEF)

This approach uses MEF to discover and execute the extension points.

Endpoint Startup

/// <summary>
/// Builds a MEF container from the assemblies next to the executable, then configures,
/// starts and stops the endpoint, invoking the exported extension points at each stage.
/// </summary>
static async Task AsyncMain()
{
    Console.Title = "Samples.MefExtensionEndpoint";

    var containerConfiguration = new ContainerConfiguration();
    var location = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
    var assemblies = Directory.EnumerateFiles(location, "*.dll")
        .Select(Assembly.LoadFrom);
    containerConfiguration.WithAssemblies(assemblies);

    // CompositionHost is IDisposable; dispose it once the last extension point has run
    // so the container is released deterministically.
    using (var compositionHost = containerConfiguration.CreateContainer())
    {
        var endpointConfiguration = new EndpointConfiguration("Samples.MefExtensionEndpoint");
        endpointConfiguration.UsePersistence<LearningPersistence>();
        endpointConfiguration.UseTransport<LearningTransport>();
        await RunCustomizeConfiguration(compositionHost, endpointConfiguration)
            .ConfigureAwait(false);
        await RunBeforeEndpointStart(compositionHost)
            .ConfigureAwait(false);
        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);
        await RunAfterEndpointStart(compositionHost, endpointInstance)
            .ConfigureAwait(false);
        Console.WriteLine("Press any key to exit");
        Console.ReadKey();
        await RunBeforeEndpointStop(compositionHost, endpointInstance)
            .ConfigureAwait(false);
        await endpointInstance.Stop()
            .ConfigureAwait(false);
        await RunAfterEndpointStop(compositionHost)
            .ConfigureAwait(false);
    }
}

/// <summary>
/// Invokes <c>Run</c> on every <see cref="IRunBeforeEndpointStart"/> export
/// obtained from <paramref name="compositionHost"/>.
/// </summary>
static Task RunBeforeEndpointStart(CompositionHost compositionHost) =>
    compositionHost.ExecuteExports<IRunBeforeEndpointStart>(extension => extension.Run());

// Other injection points excluded, but follow the same pattern as above

Helpers

Some common MEF helpers.

/// <summary>
/// Helper extension methods for working with a <see cref="CompositionHost"/>.
/// </summary>
public static class MefExtensions
{
    /// <summary>
    /// Awaits <paramref name="action"/> on each export of <typeparamref name="T"/>
    /// returned by <paramref name="compositionHost"/>, one at a time.
    /// </summary>
    /// <typeparam name="T">The exported contract type to request.</typeparam>
    /// <param name="compositionHost">The container the exports are requested from.</param>
    /// <param name="action">The call to make on each export.</param>
    /// <returns>A task that completes once every export has been processed.</returns>
    public static async Task ExecuteExports<T>(this CompositionHost compositionHost, Func<T, Task> action)
    {
        var exports = compositionHost.GetExports<T>();
        foreach (var extension in exports)
        {
            var pending = action(extension);
            await pending.ConfigureAwait(false);
        }
    }
}

Example extension implementations

/// <summary>
/// Sample <see cref="ICustomizeConfiguration"/> extension, exported through MEF, that
/// selects the <see cref="XmlSerializer"/> on the endpoint configuration.
/// </summary>
[Export(typeof(ICustomizeConfiguration))]
public class CustomizeConfiguration : ICustomizeConfiguration
{
    static ILog logger = LogManager.GetLogger<CustomizeConfiguration>();

    /// <summary>
    /// Logs the change and applies the serializer to <paramref name="endpointConfiguration"/>.
    /// </summary>
    public Task Run(EndpointConfiguration endpointConfiguration)
    {
        logger.Info("Setting serializer to Xml in an extension");
        endpointConfiguration.UseSerialization<XmlSerializer>();

        // Nothing asynchronous to wait for.
        return Task.CompletedTask;
    }
}
/// <summary>
/// Sample <see cref="IRunAfterEndpointStart"/> extension, exported through MEF,
/// that only writes a log entry.
/// </summary>
[Export(typeof(IRunAfterEndpointStart))]
public class RunAfterEndpointStart : IRunAfterEndpointStart
{
    static ILog logger = LogManager.GetLogger<RunAfterEndpointStart>();

    /// <summary>
    /// Logs an informational message; <paramref name="endpoint"/> is not used.
    /// </summary>
    public Task Run(IEndpointInstance endpoint)
    {
        logger.Info("Endpoint Started.");

        // Nothing asynchronous to wait for.
        return Task.CompletedTask;
    }
}
/// <summary>
/// Sample <see cref="IRunAfterEndpointStart"/> extension, exported through MEF,
/// that sends a <see cref="MyMessage"/> to the local endpoint.
/// </summary>
[Export(typeof(IRunAfterEndpointStart))]
public class SendMessageAfterEndpointStart : IRunAfterEndpointStart
{
    static ILog logger = LogManager.GetLogger<SendMessageAfterEndpointStart>();

    /// <summary>
    /// Logs an informational message and returns the task of the local send.
    /// </summary>
    public Task Run(IEndpointInstance endpoint)
    {
        logger.Info("Sending Message.");
        return endpoint.SendLocal(new MyMessage());
    }
}

Other notes

Using an IOC container

Both of the above approaches could be made more extensible and versatile by leveraging a container. In the case of MEF, most containers have custom integrations, for example Autofac.Mef. For the custom reflection approach, the standard reflection-based APIs of a given container would be used.

Adding more extension points

More extension points could be defined based on requirements. The extensions could be plugged into part of the NServiceBus lifecycle or the pipeline. Also, more context could be passed, as parameters, to any given extension point.

Related Articles


Last modified