Scripts and SQL used when interacting with a MySql database.
Build Time
Scripts are created at build time and are executed as part of a deployment or decommissioning of an endpoint.
Outbox
Create Table
set @tableNameQuoted = concat('`', @tablePrefix, 'OutboxData`');
set @tableNameNonQuoted = concat(@tablePrefix, 'OutboxData');
set @createTable =  concat('
    create table if not exists ', @tableNameQuoted, '(
        MessageId nvarchar(200) not null,
        Dispatched bit not null default 0,
        DispatchedAt datetime,
        PersistenceVersion varchar(23) not null,
        Operations json not null,
        primary key (MessageId)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_DispatchedAt' and
    table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('create index Index_DispatchedAt on ', @tableNameQuoted, '(DispatchedAt)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Dispatched' and
    table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('create index Index_Dispatched on ', @tableNameQuoted, '(Dispatched)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
Drop Table
set @tableName = concat('`', @tablePrefix, 'OutboxData`');
set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Saga
For a Saga with the following structure
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }
    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }
Create Table
/* TableNameVariable */
set @tableNameQuoted = concat('`', @tablePrefix, 'OrderSaga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'OrderSaga');
/* Initialize */
drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
    'ERROR'
set
    message_text = message,
    mysql_errno = '45000';
end;
/* CreateTable */
set @createTable = concat('
    create table if not exists ', @tableNameQuoted, '(
        Id varchar(38) not null,
        Metadata json not null,
        Data json not null,
        PersistenceVersion varchar(23) not null,
        SagaTypeVersion varchar(23) not null,
        Concurrency int not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
/* AddProperty OrderNumber */
select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderNumber' and
      table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderNumber bigint(20)'), 'select \'Column Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* VerifyColumnType Int */
set @column_type_OrderNumber = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderNumber'
);
set @query = IF(
    @column_type_OrderNumber <> 'bigint(20)',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderNumber. Expected bigint(20) got \', @column_type_OrderNumber, \'.\'));',
    'select \'Column Type OK\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* WriteCreateIndex OrderNumber */
select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderNumber' and
    table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderNumber on ', @tableNameQuoted, '(Correlation_OrderNumber)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* AddProperty OrderId */
select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderId' and
      table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* VerifyColumnType Guid */
set @column_type_OrderId = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderId'
);
set @query = IF(
    @column_type_OrderId <> 'varchar(38) character set ascii',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
    'select \'Column Type OK\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* CreateIndex OrderId */
select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderId' and
    table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* PurgeObsoleteIndex */
select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';')
from information_schema.statistics
where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    index_name like 'Index_Correlation_%' and
    index_name <> 'Index_Correlation_OrderNumber' and
    index_name <> 'Index_Correlation_OrderId' and
    table_schema = database()
into @dropIndexQuery;
select if (
    @dropIndexQuery is not null,
    @dropIndexQuery,
    'select ''no index to delete'';')
    into @dropIndexQuery;
prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;
/* PurgeObsoleteProperties */
select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name like 'Correlation_%' and
    column_name <> 'Correlation_OrderNumber' and
    column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;
select if (
    @dropPropertiesQuery is not null,
    @dropPropertiesQuery,
    'select ''no property to delete'';')
    into @dropPropertiesQuery;
prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;
/* CompleteSagaScript */
Drop Table
/* TableNameVariable */
set @tableNameQuoted = concat('`', @tablePrefix, 'OrderSaga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'OrderSaga');
/* DropTable */
set @dropTable = concat('drop table if exists ', @tableNameQuoted);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Subscription
Create Table
set @tableName = concat('`', @tablePrefix, 'SubscriptionData`');
set @createTable = concat('
    create table if not exists ', @tableName, '(
        Subscriber nvarchar(200) not null,
        Endpoint nvarchar(200),
        MessageType nvarchar(200) not null,
        PersistenceVersion varchar(23) not null,
        primary key clustered (Subscriber, MessageType)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
Drop Table
set @tableName = concat('`', @tablePrefix, 'SubscriptionData`');
set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Timeout
Create Table
set @tableNameQuoted = concat('`', @tablePrefix, 'TimeoutData`');
set @tableNameNonQuoted = concat(@tablePrefix, 'TimeoutData');
set @createTable = concat('
    create table if not exists ', @tableNameQuoted, '(
        Id varchar(38) not null,
        Destination nvarchar(200),
        SagaId varchar(38),
        State longblob,
        Time datetime,
        Headers json not null,
        PersistenceVersion varchar(23) not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_SagaId' and
    table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('create index Index_SagaId on ', @tableNameQuoted, '(SagaId)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Time' and
    table_name = @tableNameNonQuoted;
set @query = IF(
    @exist <= 0,
    concat('create index Index_Time on ', @tableNameQuoted, '(Time)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
Drop Table
set @tableName = concat('`', @tablePrefix, 'TimeoutData`');
set @dropTable = concat('drop table if exists ', @tableName, '');
prepare script from @dropTable;
execute script;
deallocate prepare script;
Run Time
SQL used at runtime to query and update data.
Outbox
Used at intervals to cleanup old outbox records.
delete from `EndpointNameOutboxData`
where Dispatched = true and
      DispatchedAt < @DispatchedBefore
limit @BatchSize
Get
Used by IOutboxStorage..
select
    Dispatched,
    Operations
from `EndpointNameOutboxData`
where MessageId = @MessageId
SetAsDispatched
Used by IOutboxStorage..
update `EndpointNameOutboxData`
set
    Dispatched = 1,
    DispatchedAt = @DispatchedAt,
    Operations = '[]'
where MessageId = @MessageId
Store
Used by IOutboxStorage..
Optimistic (default) mode
insert into `EndpointNameOutboxData`
(
    MessageId,
    Operations,
    PersistenceVersion
)
values
(
    @MessageId,
    @Operations,
    @PersistenceVersion
)
Pessimistic mode
insert into `EndpointNameOutboxData`
(
    MessageId,
    Operations,
    PersistenceVersion
)
values
(
    @MessageId,
    '[]',
    @PersistenceVersion
)
update `EndpointNameOutboxData`
set
    Operations = @Operations
where MessageId = @MessageId
Saga
Complete
Used by ISagaPersister..
delete from EndpointName_SagaName
where Id = @Id and Concurrency = @Concurrency
Save
Used by ISagaPersister..
insert into EndpointName_SagaName
(
    Id,
    Metadata,
    Data,
    PersistenceVersion,
    SagaTypeVersion,
    Concurrency,
    Correlation_CorrelationProperty,
    Correlation_TransitionalCorrelationProperty
)
values
(
    @Id,
    @Metadata,
    @Data,
    @PersistenceVersion,
    @SagaTypeVersion,
    1,
    @CorrelationId,
    @TransitionalCorrelationId
)
GetByProperty
Used by ISagaPersister..
select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointName_SagaName
where Correlation_PropertyName = @propertyValue
for update
GetBySagaId
Used by ISagaPersister..
select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointName_SagaName
where Id = @Id
for update
Update
Used by ISagaPersister..
update EndpointName_SagaName
set
    Data = @Data,
    PersistenceVersion = @PersistenceVersion,
    SagaTypeVersion = @SagaTypeVersion,
    Concurrency = @Concurrency + 1,
    Correlation_TransitionalCorrelationProperty = @TransitionalCorrelationId
where
    Id = @Id and Concurrency = @Concurrency
Select used by Saga Finder
select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointName_SagaName
where 1 = 1
for update
Subscription
GetSubscribers
Used by ISubscriptionStorage..
select distinct Subscriber, Endpoint
from `EndpointNameSubscriptionData`
where MessageType in (@type0)
Subscribe
Used by ISubscriptionStorage..
insert into `EndpointNameSubscriptionData`
(
    Subscriber,
    MessageType,
    Endpoint,
    PersistenceVersion
)
values
(
    @Subscriber,
    @MessageType,
    @Endpoint,
    @PersistenceVersion
)
on duplicate key update
    Endpoint = coalesce(@Endpoint, Endpoint),
    PersistenceVersion = @PersistenceVersion
Unsubscribe
Used by ISubscriptionStorage..
delete from `EndpointNameSubscriptionData`
where
    Subscriber = @Subscriber and
    MessageType = @MessageType
Timeout
Peek
Used by IPersistTimeouts..
select
    Destination,
    SagaId,
    State,
    Time,
    Headers
from `EndpointNameTimeoutData`
where Id = @Id
Add
Used by IPersistTimeouts..
insert into `EndpointNameTimeoutData`
(
    Id,
    Destination,
    SagaId,
    State,
    Time,
    Headers,
    PersistenceVersion
)
values
(
    @Id,
    @Destination,
    @SagaId,
    @State,
    @Time,
    @Headers,
    @PersistenceVersion
)
GetNextChunk
Used by IQueryTimeouts..
select Time from `EndpointNameTimeoutData`
where Time > @EndTime
order by Time
limit 1
select Id, Time
from `EndpointNameTimeoutData`
where Time > @StartTime and Time <= @EndTime
TryRemove
Used by IPersistTimeouts..
delete from `EndpointNameTimeoutData`
where Id = @Id;
RemoveTimeoutBy
Used by IPersistTimeouts..
delete from `EndpointNameTimeoutData`
where SagaId = @SagaId