Scheduling with NServiceBus

The NServiceBus Scheduler is a lightweight/non-durable API that helps to schedule a task that needs to be executed repeatedly based on a specified interval. The scheduling infrastructure leverages the reliable messaging approach and the NServiceBus core functionality. This allows scheduling to include features such as built in retries and forwarding to the error queue.

How the scheduler works

The scheduler holds a list of tasks scheduled in a non-durable in-memory dictionary. In version 3 and 4 tasks are scoped per AppDomain. In version 5 they are scoped per Bus instance.

When a new scheduled task is created it is given a unique identifier and stored in the endpoint's in-memory dictionary. The identifier for the task is sent in a message to the Timeout Manager for the message to be deferred for the specified time interval. When the specified time elapses, the Timeouts dispatcher returns the message containing the identifier to the endpoint with the scheduled task identifier, the endpoint uses that identifier to fetch and invoke the task from its internal list of tasks and executes them.

Example usage

The difference between these examples is that in the latter a name is given for the task. The name is used for logging.

// 'Schedule' is an instance class that can be resolved from the container.
// To send a message every 5 minutes
schedule.Every(TimeSpan.FromMinutes(5), () => bus.Send(new CallLegacySystem()));

// Name a schedule task and invoke it every 5 minutes
schedule.Every(TimeSpan.FromMinutes(5), "MyCustomTask", SomeCustomMethod);
// 'Schedule' is a static class that can be accessed anywhere. 
// To send a message every 5 minutes
Schedule.Every(TimeSpan.FromMinutes(5))
    .Action(() => bus.Send(new CallLegacySystem()));

// Name a schedule task and invoke it every 5 minutes
Schedule.Every(TimeSpan.FromMinutes(5))
    .Action("MyCustomTask", SomeCustomMethod);
// 'Schedule' is a static class that can be accessed anywhere. 
// To send a message every 5 minutes
Schedule.Every(TimeSpan.FromMinutes(5))
    .Action(() => bus.Send(new CallLegacySystem()));

// Name a schedule task and invoke it every 5 minutes
Schedule.Every(TimeSpan.FromMinutes(5))
    .Action("MyCustomTask", SomeCustomMethod);

When not to use it

  • As soon as your task starts to get some branching logic (if or switch statements) or you start to add business logic, instead of a simple Bus.Send() or a Bus.SendLocal(), you should consider moving to a saga .

  • Often times, rather than poll for a certain state using the Scheduler API, you can use an eventing model instead, where an event is published when that state transition occurs and take the necessary action in that event message handler.

  • When you have requirements that are not currently supported by the Scheduler API. For example, scaling out the tasks, canceling or deleting a scheduled task, doing a side-by-side deployment of a scheduled task as outlined by the following section.

Current Limitations

  • Since the scheduler is non-durable, if the process restarts, all scheduled tasks (that are created during the endpoint's startup) are recreated and given new identifiers. Tasks scheduled before the restart that arrive at the endpoint queue will not be found and a message is written to the log as information.
  • Already created scheduled tasks cannot be canceled or changed.
  • While the task repeat interval can be specified, does not support a task to be started at a specific time.
  • Each scheduled task will be executed in a new Task generated by the Task Parallel Library using Task.Factory.StartNew(Action), which means that there will be no transaction scope by default and it is up to you to create one if needed;
  • Since the scheduler uses the queuing mechanism, it does have some side effects on the timeliness of scheduled tasks. When a task is scheduled to be run at a given time it is not "executed at that time", it is instead "queued at that time" to be executed. In most cases this distinction will have no noticeable effect on the behavior of the scheduling API. However in high load systems the fact that a scheduled task is added to the back of the queue can result in a noticeable delay between the "time the task has been request to be run" and the "time the task is actually executed".
  • The scheduler API does not support scaling out the endpoint or doing a side-by-side deployment of an endpoint. When you have multiple instances of the endpoint that's running on the same machine when using a non-broker transport such as MSMQ, or scaling out the endpoint instances when using a broker transport such as RabbitMQ, these endpoint instances share the same input queue. Since each endpoint maintains its own created tasks in memory, when the specified time is up and the task is queued at the endpoint, any of the endpoint instance that is currently running can dequeue that message. If an endpoint that did not originally create this task happened to dequeue this message in order to be execute it, it will not find the task in its list.
    This will result in the task to not get executed and the task will also not be rescheduled again.

Converting a scheduled task into a saga

To store these tasks durably in order to support either a scale-out scenario or a side-by-side deployment scenario, you can use a saga instead. You can think of the scheduler as a never ending saga.

Using the same example usage of the scheduler API shown above, in order to convert it to a saga:

  1. Create a saga for the specified task name that is started by a message. Create a new message if necessary. In this example, the message that starts the saga is StartMyCustomTaskSaga. Send this message on startup, using Bus.SendLocal(new StartMyCustomTaskSaga{TaskName = "StartupTask1"});. If you did not use the API with the task name, name your saga appropriately. In this example, the saga is called MyCustomTaskSaga.
  2. Create the required saga data and make the task name as a unique identifier for the saga.
  3. Setup the mapping for the saga. This is done so that there will only be one instance of the saga for the specified task name, to avoid duplicate tasks to be scheduled if the endpoint is restarted.
  4. In the Handle() method of your startup message, request a timeout. This would be the interval that was specified in the Schedule API. Request the timeout only if it hasn't been requested already to avoid duplicate tasks that can be scheduled if the endpoint is restarted.
  5. Create a timeout class and add the IHandleTimeouts<T> in the saga definition.
  6. Define the action that needs to occur in the Timeout method.

In the following example, the endpoint upon startup will send itself a StartMyCustomSaga message to initiate the saga. The saga will request a timeout for 5 minutes if the task hasn't already been scheduled and in the timeout handler will send the CallLegacySystem message that will execute some task and also request another timeout for the specified interval.

class MyCustomTaskSaga : Saga<MyCustomTaskSagaData>,
   IAmStartedByMessages<StartMyCustomTaskSaga>, // Saga is started by a message at endpoint startup
   IHandleTimeouts<ExecuteTask> // task that gets executed when the scheduled time is up.
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MyCustomTaskSagaData> mapper)
    {
        // To ensure that there is only one saga instance per the task name, 
        // regardless of if the endpoint is restarted or not.
        mapper.ConfigureMapping<StartMyCustomTaskSaga>(m => m.TaskName).ToSaga(s => s.TaskName);
    }

    public void Handle(StartMyCustomTaskSaga message)
    {
        Data.TaskName = message.TaskName;
        // Check to avoid that if the saga is already started, we don't initiate any more tasks 
        // as those timeout messages will arrive when the specified time is up.
        if (!Data.IsTaskAlreadyScheduled)
        {
            // Setup a timeout for the specified interval for the task to be executed.
            RequestTimeout<ExecuteTask>(TimeSpan.FromMinutes(5)); 
            Data.IsTaskAlreadyScheduled = true;
        }
    }

    public void Timeout(ExecuteTask state)
    {
        // Action that gets executed when the specified time is up
        Bus.Send(new CallLegacySystem());
        // Reschedule the task
        RequestTimeout<ExecuteTask>(TimeSpan.FromMinutes(5));
    }
}

// Associated saga data
public class MyCustomTaskSagaData : ContainSagaData
{
    [Unique]
    public string TaskName { get; set; }
    public bool IsTaskAlreadyScheduled { get; set; }
}

// Message that starts the saga
public class StartMyCustomTaskSaga : ICommand
{
    public string TaskName { get; set; }
}

// timeout class
class ExecuteTask
{
}
class MyCustomTaskSaga : Saga<MyCustomTaskSagaData>,
    IAmStartedByMessages<StartMyCustomTaskSaga>, // Saga is started by a message at endpoint startup
    IHandleTimeouts<ExecuteTask> // task that gets executed when the scheduled time is up.
{
    public override void ConfigureHowToFindSaga()
    {
        // To ensure that there is only one saga instance per the task name, regardless of if the endpoint is restarted or not.
        ConfigureMapping<StartMyCustomTaskSaga>(m => m.TaskName).ToSaga(s => s.TaskName);
    }

    public void Handle(StartMyCustomTaskSaga message)
    {
        Data.TaskName = message.TaskName;
        // Check to avoid that if the saga is already started, we don't initiate any more tasks 
        // as those timeout messages will arrive when the specified time is up.
        if (!Data.IsTaskAlreadyScheduled)
        {
            // Setup a timeout for the specified interval for the task to be executed.
            RequestTimeout<ExecuteTask>(TimeSpan.FromMinutes(5));
            Data.IsTaskAlreadyScheduled = true;
        }
    }

    public void Timeout(ExecuteTask state)
    {
        // Action that gets executed when the specified time is up
        Bus.Send(new CallLegacySystem());
        // Reschedule the task
        RequestTimeout<ExecuteTask>(TimeSpan.FromMinutes(5));
    }
}

// Associated saga data
public class MyCustomTaskSagaData : ContainSagaData
{
    [Unique]
    public string TaskName { get; set; }
    public bool IsTaskAlreadyScheduled { get; set; }
}

// Message that starts the saga
public class StartMyCustomTaskSaga : ICommand
{
    public string TaskName { get; set; }
}

// timeout class
class ExecuteTask
{
}
class MyCustomTaskSaga : Saga<MyCustomTaskSagaData>,
    IAmStartedByMessages<StartMyCustomTaskSaga>, // Saga is started by a message at endpoint startup
    IHandleTimeouts<ExecuteTask> // task that gets executed when the scheduled time is up.
{
    public override void ConfigureHowToFindSaga()
    {
        // To ensure that there is only one saga instance per the task name, regardless of if the endpoint is restarted or not.
        ConfigureMapping<StartMyCustomTaskSaga>(s => s.TaskName, m => m.TaskName);
    }

    public void Handle(StartMyCustomTaskSaga message)
    {
        Data.TaskName = message.TaskName;
        // Check to avoid that if the saga is already started, we don't initiate any more tasks 
        // as those timeout messages will arrive when the specified time is up.
        if (!Data.IsTaskAlreadyScheduled)
        {
            // Setup a timeout for the specified interval for the task to be executed.
            RequestUtcTimeout<ExecuteTask>(TimeSpan.FromMinutes(5));
            Data.IsTaskAlreadyScheduled = true;
        }
    }

    public void Timeout(ExecuteTask state)
    {
        // Action that gets executed when the specified time is up
        Bus.Send(new CallLegacySystem());
        // Reschedule the task
        RequestUtcTimeout<ExecuteTask>(TimeSpan.FromMinutes(5));
    }
}

// Associated saga data
public class MyCustomTaskSagaData : IContainSagaData
{
    [Unique]
    public string TaskName { get; set; }
    public bool IsTaskAlreadyScheduled { get; set; }

    public Guid Id { get; set; }
    public string OriginalMessageId { get; set; }
    public string Originator { get; set; }
}

// Message that starts the saga
public class StartMyCustomTaskSaga : ICommand
{
    public string TaskName { get; set; }
}

// timeout class
class ExecuteTask
{
}
If you need to cancel a scheduled task, create a new message, e.g. CancelMyCustomSaga and modify the above saga to also handle this message, which will end the saga using MarkAsComplete() method.


Last modified 2015-02-26 23:13:26Z