Getting Started
Architecture
NServiceBus
Transports
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

MS SQL Server Scripts

Component: Sql Persistence
Target Version: NServiceBus 7.x

Scripts and SQL used when interacting with a SQL Server database.

Build Time

Scripts are created at build time and are executed as part of a deployment or decommissioning of an endpoint.

Outbox

Create Table

-- Creates the outbox table and its DispatchedAt index when they do not exist yet.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'OutboxData]';

-- Only create the table when no user table ('U') with that name exists.
if not exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    declare @createTableSql nvarchar(max) = '
    create table ' + @tableName + '(
        MessageId nvarchar(200) not null primary key nonclustered,
        Dispatched bit not null default 0,
        DispatchedAt datetime,
        PersistenceVersion varchar(23) not null,
        Operations nvarchar(max) not null
    )
';
    execute (@createTableSql);
end

-- Add the filtered index over dispatched rows when it is missing.
if not exists
(
    select 1
    from sys.indexes
    where
        object_id = object_id(@tableName) and
        name = 'Index_DispatchedAt'
)
begin
    declare @createDispatchedAtIndexSql nvarchar(max) = '
  create index Index_DispatchedAt
  on ' + @tableName + '(DispatchedAt) where Dispatched = 1;';
    execute (@createDispatchedAtIndexSql);
end

Drop Table

-- Drops the outbox table when it exists.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'OutboxData]';

if exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    declare @dropTableSql nvarchar(max) = 'drop table ' + @tableName;
    execute (@dropTableSql);
end

Saga

For a saga with the following structure:

// Sample saga that the scripts below are generated for.
// OrderNumber is the correlation property mapped in ConfigureHowToFindSaga;
// OrderId is named as the transitional correlation property on the attribute.
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    // Maps StartSaga.OrderNumber onto the saga data's OrderNumber.
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }

    // Saga state; both properties get a Correlation_* column in the generated table script.
    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }

    // Message handlers are omitted from this sample.
}

Create Table

/* TableNameVariable */

-- @tableName is the bracket-quoted, schema-qualified saga table name;
-- @tableNameWithoutSchema is the bare name used for INFORMATION_SCHEMA lookups.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare
    @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + N'OrderSaga]',
    @tableNameWithoutSchema nvarchar(max) = @tablePrefix + N'OrderSaga';

/* Initialize */

/* CreateTable */

-- Create the saga table with its fixed columns when no user table ('U') with that name exists.
-- Correlation columns are added by the later sections of the script.
if not exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    declare @createSagaTableSql nvarchar(max) = '
    create table ' + @tableName + '(
        Id uniqueidentifier not null primary key,
        Metadata nvarchar(max) not null,
        Data nvarchar(max) not null,
        PersistenceVersion varchar(23) not null,
        SagaTypeVersion varchar(23) not null,
        Concurrency int not null
    )
';
    execute (@createSagaTableSql);
end

/* AddProperty OrderNumber */

-- Add the nullable Correlation_OrderNumber column when the table does not have it yet.
if not exists
(
  select 1
  from sys.columns
  where
    object_id = object_id(@tableName) and
    name = N'Correlation_OrderNumber'
)
begin
  declare @addColumnSql_OrderNumber nvarchar(max) = '
  alter table ' + @tableName + N'
    add Correlation_OrderNumber bigint;';
  execute (@addColumnSql_OrderNumber);
end

/* VerifyColumnType Int */

-- Read the column's current data type and fail the script when it is not bigint.
-- When the lookup finds no row the variable is null, the comparison is unknown and nothing is thrown.
declare @actualType_OrderNumber nvarchar(max) =
(
  select data_type
  from INFORMATION_SCHEMA.COLUMNS
  where
    table_schema = @schema and
    table_name = @tableNameWithoutSchema and
    column_name = 'Correlation_OrderNumber'
);

if (@actualType_OrderNumber <> 'bigint')
begin
  declare @typeError_OrderNumber nvarchar(max) = N'Incorrect data type for Correlation_OrderNumber. Expected bigint got ' + @actualType_OrderNumber + '.';
  throw 50000, @typeError_OrderNumber, 0
end

/* WriteCreateIndex OrderNumber */

-- Create the unique index on Correlation_OrderNumber when it is missing.
-- The index is filtered to non-null values, so rows with a null correlation value are not indexed.
if not exists
(
    select 1
    from sys.indexes
    where
        object_id = object_id(@tableName) and
        name = N'Index_Correlation_OrderNumber'
)
begin
  declare @createIndexSql_OrderNumber nvarchar(max) = N'
  create unique index Index_Correlation_OrderNumber
  on ' + @tableName + N'(Correlation_OrderNumber)
  where Correlation_OrderNumber is not null;';
  execute (@createIndexSql_OrderNumber);
end

/* AddProperty OrderId */

-- Add the nullable Correlation_OrderId column when the table does not have it yet.
if not exists
(
  select 1
  from sys.columns
  where
    object_id = object_id(@tableName) and
    name = N'Correlation_OrderId'
)
begin
  declare @addColumnSql_OrderId nvarchar(max) = '
  alter table ' + @tableName + N'
    add Correlation_OrderId uniqueidentifier;';
  execute (@addColumnSql_OrderId);
end

/* VerifyColumnType Guid */

-- Read the column's current data type and fail the script when it is not uniqueidentifier.
-- When the lookup finds no row the variable is null, the comparison is unknown and nothing is thrown.
declare @actualType_OrderId nvarchar(max) =
(
  select data_type
  from INFORMATION_SCHEMA.COLUMNS
  where
    table_schema = @schema and
    table_name = @tableNameWithoutSchema and
    column_name = 'Correlation_OrderId'
);

if (@actualType_OrderId <> 'uniqueidentifier')
begin
  declare @typeError_OrderId nvarchar(max) = N'Incorrect data type for Correlation_OrderId. Expected uniqueidentifier got ' + @actualType_OrderId + '.';
  throw 50000, @typeError_OrderId, 0
end

/* CreateIndex OrderId */

-- Create the unique index on Correlation_OrderId when it is missing.
-- The index is filtered to non-null values, so rows with a null correlation value are not indexed.
if not exists
(
    select 1
    from sys.indexes
    where
        object_id = object_id(@tableName) and
        name = N'Index_Correlation_OrderId'
)
begin
  declare @createIndexSql_OrderId nvarchar(max) = N'
  create unique index Index_Correlation_OrderId
  on ' + @tableName + N'(Correlation_OrderId)
  where Correlation_OrderId is not null;';
  execute (@createIndexSql_OrderId);
end

/* PurgeObsoleteIndex */

-- Drops every Index_Correlation_* index on the saga table except the two that
-- belong to the current correlation properties.
-- All drop statements are concatenated into one batch (for xml path) so that
-- several obsolete indexes can be removed in one run; a plain scalar subquery
-- fails with "Subquery returned more than 1 value" as soon as two rows match.
-- sys.indexes is used, with its actual column casing, to match the other
-- sections of this script.
declare @dropIndexQuery nvarchar(max);
select @dropIndexQuery =
(
    select N'drop index ' + name + N' on ' + @tableName + N';'
    from sys.indexes
    where
        object_id = object_id(@tableName) and
        name is not null and
        name like 'Index_Correlation_%' and
        name <> N'Index_Correlation_OrderNumber' and
        name <> N'Index_Correlation_OrderId'
    for xml path(''), type
).value('.', 'nvarchar(max)');
-- @dropIndexQuery is null when no obsolete index exists.
exec sp_executesql @dropIndexQuery

/* PurgeObsoleteProperties */

-- Drops every Correlation_* column on the saga table except the two that
-- belong to the current correlation properties.
-- All alter statements are concatenated into one batch (for xml path) so that
-- several obsolete columns can be removed in one run; a plain scalar subquery
-- fails with "Subquery returned more than 1 value" as soon as two rows match.
declare @dropPropertiesQuery nvarchar(max);
select @dropPropertiesQuery =
(
    select N'alter table ' + @tableName + N' drop column ' + column_name + N';'
    from INFORMATION_SCHEMA.COLUMNS
    where
        table_name = @tableNameWithoutSchema and
        table_schema = @schema and
        column_name like 'Correlation_%' and
        column_name <> N'Correlation_OrderNumber' and
        column_name <> N'Correlation_OrderId'
    for xml path(''), type
).value('.', 'nvarchar(max)');
-- @dropPropertiesQuery is null when no obsolete column exists.
exec sp_executesql @dropPropertiesQuery

/* CompleteSagaScript */

Drop Table

/* TableNameVariable */

-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
-- @tableNameWithoutSchema is declared but not referenced by the drop logic below.
declare
    @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + N'OrderSaga]',
    @tableNameWithoutSchema nvarchar(max) = @tablePrefix + N'OrderSaga';

/* DropTable */

-- Drop the saga table when a user table ('U') with that name exists.
if exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    declare @dropSagaTableSql nvarchar(max) = 'drop table ' + @tableName;
    execute (@dropSagaTableSql);
end

Subscription

Create Table

-- Creates the subscription table when it does not exist yet.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'SubscriptionData]';

if not exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    -- The clustered primary key is the (Subscriber, MessageType) pair.
    declare @createTableSql nvarchar(max) = '
    create table ' + @tableName + '(
        Subscriber nvarchar(200) not null,
        Endpoint nvarchar(200),
        MessageType nvarchar(200) not null,
        PersistenceVersion varchar(23) not null,
        primary key clustered
        (
            Subscriber,
            MessageType
        )
    )
';
    execute (@createTableSql);
end

Drop Table

-- Drops the subscription table when it exists.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'SubscriptionData]';

if exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    declare @dropTableSql nvarchar(max) = 'drop table ' + @tableName;
    execute (@dropTableSql);
end

Timeout

Create Table

-- Creates the timeout table and its SagaId and Time indexes when they do not exist yet.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'TimeoutData]';

-- Only create the table when no user table ('U') with that name exists.
if not exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    declare @createTableSql nvarchar(max) = '
    create table ' + @tableName + '(
        Id uniqueidentifier not null primary key,
        Destination nvarchar(200),
        SagaId uniqueidentifier,
        State varbinary(max),
        Time datetime,
        Headers nvarchar(max) not null,
        PersistenceVersion varchar(23) not null
    )
';
    execute (@createTableSql);
end

-- Index on SagaId.
if not exists
(
    select 1
    from sys.indexes
    where
        object_id = object_id(@tableName) and
        name = 'Index_SagaId'
)
begin
    declare @createSagaIdIndexSql nvarchar(max) = '
  create index Index_SagaId
  on ' + @tableName + '(SagaId);';
    execute (@createSagaIdIndexSql);
end

-- Index on Time.
if not exists
(
    select 1
    from sys.indexes
    where
        object_id = object_id(@tableName) and
        name = 'Index_Time'
)
begin
    declare @createTimeIndexSql nvarchar(max) = '
  create index Index_Time
  on ' + @tableName + '(Time);';
    execute (@createTimeIndexSql);
end

Drop Table

-- Drops the timeout table when it exists.
-- @schema and @tablePrefix are not declared here; whatever executes the script must supply them.
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'TimeoutData]';

if exists
(
    select 1
    from sys.objects
    where
        type = 'U' and
        object_id = object_id(@tableName)
)
begin
    -- Dynamic SQL: the table name is only known when the script runs.
    declare @dropTableSql nvarchar(max) = 'drop table ' + @tableName;
    execute (@dropTableSql);
end

Run Time

SQL used at runtime to query and update data.

Outbox

Used at intervals to clean up old outbox records.

-- Deletes up to @BatchSize outbox rows that were dispatched before @DispatchedBefore.
-- Dispatched is a bit column, so it is compared to 1 rather than the string 'true':
-- this avoids an implicit conversion and matches the SetAsDispatched statement
-- and the predicate of the Index_DispatchedAt filtered index.
delete top (@BatchSize) from [dbo].[EndpointNameOutboxData]
where Dispatched = 1 and
      DispatchedAt < @DispatchedBefore

Get

Used by IOutboxStorage.Get.

-- Reads the dispatch flag and the stored operations of the outbox row for @MessageId.
select Dispatched, Operations
from [dbo].[EndpointNameOutboxData]
where
    MessageId = @MessageId

SetAsDispatched

Used by IOutboxStorage.SetAsDispatched.

-- Marks the outbox row for @MessageId as dispatched, records when,
-- and replaces the stored operations with an empty JSON array.
update [dbo].[EndpointNameOutboxData]
set Dispatched = 1,
    DispatchedAt = @DispatchedAt,
    Operations = '[]'
where
    MessageId = @MessageId

Store

Used by IOutboxStorage.Store.

-- Inserts a new outbox row; Dispatched and DispatchedAt are not supplied here.
insert into [dbo].[EndpointNameOutboxData]
    (MessageId, Operations, PersistenceVersion)
values
    (@MessageId, @Operations, @PersistenceVersion)

Saga

Complete

Used by ISagaPersister.Complete.

-- Deletes the saga row, but only while its Concurrency still equals @Concurrency.
delete from EndpointName_SagaName
where
    Id = @Id and
    Concurrency = @Concurrency

Save

Used by ISagaPersister.Save.

-- Inserts a new saga row; Concurrency starts at the literal 1.
insert into EndpointName_SagaName
    (Id, Metadata, Data, PersistenceVersion, SagaTypeVersion, Concurrency,
     Correlation_CorrelationProperty, Correlation_TransitionalCorrelationProperty)
values
    (@Id, @Metadata, @Data, @PersistenceVersion, @SagaTypeVersion, 1,
     @CorrelationId, @TransitionalCorrelationId)

GetByProperty

Used by ISagaPersister.Get(propertyName...).

-- Loads a saga by the value of a correlation column.
-- The updlock hint takes update locks on the rows that are read.
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName with (updlock)
where
    Correlation_PropertyName = @propertyValue

GetBySagaId

Used by ISagaPersister.Get(sagaId...).

-- Loads a saga by its Id.
-- The updlock hint takes update locks on the rows that are read.
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName with (updlock)
where
    Id = @Id

Update

Used by ISagaPersister.Update.

-- Updates the saga row and increments Concurrency, but only while the stored
-- Concurrency still equals @Concurrency.
update EndpointName_SagaName
set Data = @Data,
    PersistenceVersion = @PersistenceVersion,
    SagaTypeVersion = @SagaTypeVersion,
    Concurrency = @Concurrency + 1,
    Correlation_TransitionalCorrelationProperty = @TransitionalCorrelationId
where
    Id = @Id and
    Concurrency = @Concurrency

Select used by Saga Finder

-- Base select for saga finders.
-- NOTE(review): "where 1 = 1" is always true; presumably a placeholder that
-- finder-specific conditions are appended to - confirm.
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName with (updlock)
where
    1 = 1

Subscription

GetSubscribers

Used by ISubscriptionStorage.GetSubscriberAddressesForMessage.

-- Returns the distinct (Subscriber, Endpoint) pairs registered for the given message type(s).
select distinct
    Subscriber,
    Endpoint
from [dbo].[EndpointNameSubscriptionData]
where
    MessageType in (@type0)

Subscribe

Used by ISubscriptionStorage.Subscribe.

-- NOTE(review): @dummy is not referenced by the merge below; it is kept as-is
-- because its purpose cannot be determined from this snippet.
declare @dummy int;

-- Upserts a subscription keyed on (Subscriber, MessageType):
--   * existing row: Endpoint and PersistenceVersion are updated only when a
--     non-null @Endpoint differs from the stored one (or the stored one is null);
--   * no row: a new subscription is inserted.
merge [dbo].[EndpointNameSubscriptionData] with (holdlock, tablock) as target
using
(
    select
        @Endpoint as Endpoint,
        @Subscriber as Subscriber,
        @MessageType as MessageType
) as source
on
    target.Subscriber = source.Subscriber and
    target.MessageType = source.MessageType
when matched
    and source.Endpoint is not null
    and (target.Endpoint is null or target.Endpoint <> source.Endpoint)
then
    update set
        Endpoint = @Endpoint,
        PersistenceVersion = @PersistenceVersion
when not matched then
    insert (Subscriber, MessageType, Endpoint, PersistenceVersion)
    values (@Subscriber, @MessageType, @Endpoint, @PersistenceVersion);

Unsubscribe

Used by ISubscriptionStorage.Unsubscribe.

-- Removes the subscription identified by (Subscriber, MessageType).
delete from [dbo].[EndpointNameSubscriptionData]
where MessageType = @MessageType
  and Subscriber = @Subscriber

Timeout

Peek

Used by IPersistTimeouts.Peek.

-- Reads a single timeout row by Id without modifying it.
select Destination, SagaId, State, Time, Headers
from [dbo].[EndpointNameTimeoutData]
where
    Id = @Id

Add

Used by IPersistTimeouts.Add.

-- Inserts a new timeout row with every column supplied by the caller.
insert into [dbo].[EndpointNameTimeoutData]
    (Id, Destination, SagaId, State, Time, Headers, PersistenceVersion)
values
    (@Id, @Destination, @SagaId, @State, @Time, @Headers, @PersistenceVersion)

GetNextChunk

Used by IQueryTimeouts.GetNextChunk.

-- Result set 1: the earliest Time later than @EndTime (at most one row).
select top 1 Time
from [dbo].[EndpointNameTimeoutData]
where Time > @EndTime
order by Time;

-- Result set 2: the timeouts with @StartTime < Time <= @EndTime.
select
    Id,
    Time
from [dbo].[EndpointNameTimeoutData]
where
    Time > @StartTime and
    Time <= @EndTime;

TryRemove

Used by IPersistTimeouts.TryRemove.

-- Deletes the timeout row with the given Id, if there is one.
delete from [dbo].[EndpointNameTimeoutData]
where
    Id = @Id

RemoveTimeoutBy

Used by IPersistTimeouts.RemoveTimeoutBy.

-- Deletes every timeout row that belongs to the given saga.
delete from [dbo].[EndpointNameTimeoutData]
where
    SagaId = @SagaId

Related Articles