Getting Started
Architecture
NServiceBus
Transports
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

MySql Scripts

Component: Sql Persistence
Target Version: NServiceBus 9.x

Scripts and SQL used when interacting with a MySql database.

Build Time

Scripts are created at build time and are executed as part of a deployment or decommissioning of an endpoint.

Outbox

Create Table

set @tableNameQuoted = concat('`', @tablePrefix, 'OutboxData`');
set @tableNameNonQuoted = concat(@tablePrefix, 'OutboxData');

set @createTable =  concat('
    create table if not exists ', @tableNameQuoted, '(
        MessageId nvarchar(200) not null,
        Dispatched bit not null default 0,
        DispatchedAt datetime,
        PersistenceVersion varchar(23) not null,
        Operations json not null,
        primary key (MessageId)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_DispatchedAt' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create index Index_DispatchedAt on ', @tableNameQuoted, '(DispatchedAt)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Dispatched' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create index Index_Dispatched on ', @tableNameQuoted, '(Dispatched)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

Drop Table

set @tableName = concat('`', @tablePrefix, 'OutboxData`');

set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;

Saga

For a Saga with the following structure

[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }

    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }

Create Table

/* TableNameVariable */

set @tableNameQuoted = concat('`', @tablePrefix, 'OrderSaga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'OrderSaga');

/* Initialize */

drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
    'ERROR'
set
    message_text = message,
    mysql_errno = '45000';
end;

/* CreateTable */

set @createTable = concat('
    create table if not exists ', @tableNameQuoted, '(
        Id varchar(38) not null,
        Metadata json not null,
        Data json not null,
        PersistenceVersion varchar(23) not null,
        SagaTypeVersion varchar(23) not null,
        Concurrency int not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

/* AddProperty OrderNumber */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderNumber' and
      table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderNumber bigint(20)'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Int */

set @column_type_OrderNumber = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderNumber'
);

set @query = IF(
    @column_type_OrderNumber <> 'bigint(20)',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderNumber. Expected bigint(20) got \', @column_type_OrderNumber, \'.\'));',
    'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* WriteCreateIndex OrderNumber */

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderNumber' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderNumber on ', @tableNameQuoted, '(Correlation_OrderNumber)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* AddProperty OrderId */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderId' and
      table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Guid */

set @column_type_OrderId = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderId'
);

set @query = IF(
    @column_type_OrderId <> 'varchar(38) character set ascii',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
    'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* CreateIndex OrderId */

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderId' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* PurgeObsoleteIndex */

select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';')
from information_schema.statistics
where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    index_name like 'Index_Correlation_%' and
    index_name <> 'Index_Correlation_OrderNumber' and
    index_name <> 'Index_Correlation_OrderId' and
    table_schema = database()
into @dropIndexQuery;
select if (
    @dropIndexQuery is not null,
    @dropIndexQuery,
    'select ''no index to delete'';')
    into @dropIndexQuery;

prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;

/* PurgeObsoleteProperties */

select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name like 'Correlation_%' and
    column_name <> 'Correlation_OrderNumber' and
    column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;

select if (
    @dropPropertiesQuery is not null,
    @dropPropertiesQuery,
    'select ''no property to delete'';')
    into @dropPropertiesQuery;

prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;

/* CompleteSagaScript */

Drop Table

/* TableNameVariable */

set @tableNameQuoted = concat('`', @tablePrefix, 'OrderSaga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'OrderSaga');

/* DropTable */

set @dropTable = concat('drop table if exists ', @tableNameQuoted);
prepare script from @dropTable;
execute script;
deallocate prepare script;

Subscription

Create Table

set @tableName = concat('`', @tablePrefix, 'SubscriptionData`');

set @createTable = concat('
    create table if not exists ', @tableName, '(
        Subscriber nvarchar(200) not null,
        Endpoint nvarchar(200),
        MessageType nvarchar(200) not null,
        PersistenceVersion varchar(23) not null,
        primary key clustered (Subscriber, MessageType)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

Drop Table

set @tableName = concat('`', @tablePrefix, 'SubscriptionData`');

set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;

Timeout

Create Table

set @tableNameQuoted = concat('`', @tablePrefix, 'TimeoutData`');
set @tableNameNonQuoted = concat(@tablePrefix, 'TimeoutData');

set @createTable = concat('
    create table if not exists ', @tableNameQuoted, '(
        Id varchar(38) not null,
        Destination nvarchar(200),
        SagaId varchar(38),
        State longblob,
        Time datetime,
        Headers json not null,
        PersistenceVersion varchar(23) not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_SagaId' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create index Index_SagaId on ', @tableNameQuoted, '(SagaId)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Time' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create index Index_Time on ', @tableNameQuoted, '(Time)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

Drop Table

set @tableName = concat('`', @tablePrefix, 'TimeoutData`');

set @dropTable = concat('drop table if exists ', @tableName, '');
prepare script from @dropTable;
execute script;
deallocate prepare script;

Run Time

SQL used at runtime to query and update data.

Outbox

Used at intervals to cleanup old outbox records.

delete from `EndpointNameOutboxData`
where Dispatched = true and
      DispatchedAt < @DispatchedBefore
limit @BatchSize

Get

Used by IOutboxStorage.SetAsDispatched.

select
    Dispatched,
    Operations
from `EndpointNameOutboxData`
where MessageId = @MessageId

SetAsDispatched

Used by IOutboxStorage.SetAsDispatched.

update `EndpointNameOutboxData`
set
    Dispatched = 1,
    DispatchedAt = @DispatchedAt,
    Operations = '[]'
where MessageId = @MessageId

Store

Used by IOutboxStorage.Store.

Optimistic (default) mode
insert into `EndpointNameOutboxData`
(
    MessageId,
    Operations,
    PersistenceVersion
)
values
(
    @MessageId,
    @Operations,
    @PersistenceVersion
)
Pessimistic mode
insert into `EndpointNameOutboxData`
(
    MessageId,
    Operations,
    PersistenceVersion
)
values
(
    @MessageId,
    '[]',
    @PersistenceVersion
)
update `EndpointNameOutboxData`
set
    Operations = @Operations
where MessageId = @MessageId

Saga

Complete

Used by ISagaPersister.Complete.

delete from EndpointName_SagaName
where Id = @Id and Concurrency = @Concurrency

Save

Used by ISagaPersister.Save.

insert into EndpointName_SagaName
(
    Id,
    Metadata,
    Data,
    PersistenceVersion,
    SagaTypeVersion,
    Concurrency,
    Correlation_CorrelationProperty,
    Correlation_TransitionalCorrelationProperty
)
values
(
    @Id,
    @Metadata,
    @Data,
    @PersistenceVersion,
    @SagaTypeVersion,
    1,
    @CorrelationId,
    @TransitionalCorrelationId
)

GetByProperty

Used by ISagaPersister.Get(propertyName...).

select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointName_SagaName
where Correlation_PropertyName = @propertyValue
for update

GetBySagaId

Used by ISagaPersister.Get(sagaId...).

select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointName_SagaName
where Id = @Id
for update

Update

Used by ISagaPersister.Update.

update EndpointName_SagaName
set
    Data = @Data,
    PersistenceVersion = @PersistenceVersion,
    SagaTypeVersion = @SagaTypeVersion,
    Concurrency = @Concurrency + 1,
    Correlation_TransitionalCorrelationProperty = @TransitionalCorrelationId
where
    Id = @Id and Concurrency = @Concurrency

Select used by Saga Finder

select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointName_SagaName
where 1 = 1
for update

Subscription

GetSubscribers

Used by ISubscriptionStorage.GetSubscriberAddressesForMessage.

select distinct Subscriber, Endpoint
from `EndpointNameSubscriptionData`
where MessageType in (@type0)

Subscribe

Used by ISubscriptionStorage.Subscribe.

insert into `EndpointNameSubscriptionData`
(
    Subscriber,
    MessageType,
    Endpoint,
    PersistenceVersion
)
values
(
    @Subscriber,
    @MessageType,
    @Endpoint,
    @PersistenceVersion
)
on duplicate key update
    Endpoint = coalesce(@Endpoint, Endpoint),
    PersistenceVersion = @PersistenceVersion

Unsubscribe

Used by ISubscriptionStorage.Unsubscribe.

delete from `EndpointNameSubscriptionData`
where
    Subscriber = @Subscriber and
    MessageType = @MessageType

Timeout

Peek

Used by IPersistTimeouts.Peek.

select
    Destination,
    SagaId,
    State,
    Time,
    Headers
from `EndpointNameTimeoutData`
where Id = @Id

Add

Used by IPersistTimeouts.Add.

insert into `EndpointNameTimeoutData`
(
    Id,
    Destination,
    SagaId,
    State,
    Time,
    Headers,
    PersistenceVersion
)
values
(
    @Id,
    @Destination,
    @SagaId,
    @State,
    @Time,
    @Headers,
    @PersistenceVersion
)

GetNextChunk

Used by IQueryTimeouts.GetNextChunk.

select Time from `EndpointNameTimeoutData`
where Time > @EndTime
order by Time
limit 1
select Id, Time
from `EndpointNameTimeoutData`
where Time > @StartTime and Time <= @EndTime

TryRemove

Used by IPersistTimeouts.TryRemove.

delete from `EndpointNameTimeoutData`
where Id = @Id;

RemoveTimeoutBy

Used by IPersistTimeouts.RemoveTimeoutBy.

delete from `EndpointNameTimeoutData`
where SagaId = @SagaId

Related Articles