Scripts and SQL used when interacting with a MySQL database.
Build Time
Scripts are created at build time and are executed as part of a deployment or decommissioning of an endpoint.
Outbox
Create Table
-- Outbox deployment script. Expects @tablePrefix to be set by the caller.
-- The table name is kept in two forms: back-tick quoted for DDL statements and
-- raw for comparisons against information_schema.
set
  @tableNameQuoted = concat('`', @tablePrefix, 'OutboxData`'),
  @tableNameNonQuoted = concat(@tablePrefix, 'OutboxData');

-- Create the outbox table when it does not exist yet.
set @createTable = concat('
create table if not exists ', @tableNameQuoted, '(
MessageId nvarchar(200) not null,
Dispatched bit not null default 0,
DispatchedAt datetime,
PersistenceVersion varchar(23) not null,
Operations json not null,
primary key (MessageId)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

-- Create Index_DispatchedAt unless the catalog already lists it for this table.
set @exist = (
  select count(*)
  from information_schema.statistics
  where
    table_name = @tableNameNonQuoted and
    index_name = 'Index_DispatchedAt' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('create index Index_DispatchedAt on ', @tableNameQuoted, '(DispatchedAt)')
  else
    'select \'Index Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;

-- Create Index_Dispatched unless the catalog already lists it for this table.
set @exist = (
  select count(*)
  from information_schema.statistics
  where
    table_name = @tableNameNonQuoted and
    index_name = 'Index_Dispatched' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('create index Index_Dispatched on ', @tableNameQuoted, '(Dispatched)')
  else
    'select \'Index Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;
Drop Table
-- Outbox decommission script: drops the outbox table when it exists.
-- Expects @tablePrefix to be set by the caller.
set @tableName = concat('`', @tablePrefix, 'OutboxData`');
set @dropTable = concat(
  'drop table if exists ',
  @tableName
);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Saga
For a Saga with the following structure:
// Sample saga used to generate the scripts that follow.
// OrderNumber is the correlation property and OrderId the transitional
// correlation property; the generated script adds a Correlation_OrderNumber
// and a Correlation_OrderId column for them.
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    // Maps an incoming StartSaga message to a saga instance through OrderNumber.
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }

    // Saga state persisted in the saga table.
    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }
}
Create Table
/* TableNameVariable */
-- Saga table name in back-tick quoted form (for DDL) and raw form (for
-- information_schema comparisons). Expects @tablePrefix to be set by the caller.
set
  @tableNameQuoted = concat('`', @tablePrefix, 'OrderSaga`'),
  @tableNameNonQuoted = concat(@tablePrefix, 'OrderSaga');
/* Initialize */
-- Recreates the helper procedure sqlpersistence_raiseerror(message), which
-- raises an error carrying the given message text. The VerifyColumnType steps
-- of this script call it to abort when a correlation column has the wrong type.
-- NOTE(review): the SQLSTATE is 'ERROR' and the errno is '45000'. MySQL's
-- documented generic user-defined SQLSTATE is '45000', so these two values look
-- swapped - confirm before changing, since callers may observe the error code.
drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
'ERROR'
set
message_text = message,
mysql_errno = '45000';
end;
/* CreateTable */
-- Creates the saga table when it does not exist yet. Only the fixed columns are
-- declared here; the Correlation_* columns are added by the AddProperty steps.
-- The DDL is assembled as a string because the table name comes from a variable.
set @createTable = concat('
create table if not exists ', @tableNameQuoted, '(
Id varchar(38) not null,
Metadata json not null,
Data json not null,
PersistenceVersion varchar(23) not null,
SagaTypeVersion varchar(23) not null,
Concurrency int not null,
primary key (Id)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
/* AddProperty OrderNumber */
-- Add the Correlation_OrderNumber column unless the catalog already lists it.
set @exist = (
  select count(*)
  from information_schema.columns
  where
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderNumber' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderNumber bigint(20)')
  else
    'select \'Column Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;
/* VerifyColumnType Int */
-- Describe the current type of Correlation_OrderNumber as
-- '<column_type>[ character set <name>]'.
-- concat_ws is used instead of concat because character_set_name is NULL for
-- numeric columns: concat would turn the whole description into NULL, the
-- comparison below would evaluate to NULL, and the check would never fire.
set @column_type_OrderNumber = (
  select concat_ws(' character set ', column_type, character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderNumber'
);
-- Raise an error when the column is not a bigint. Plain 'bigint' is accepted
-- next to 'bigint(20)' because newer MySQL 8.0 servers no longer report the
-- display width of integer types. A missing column yields NULL and is treated
-- as OK, as before.
set @query = IF(
  @column_type_OrderNumber not in ('bigint(20)', 'bigint'),
  'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderNumber. Expected bigint(20) got \', @column_type_OrderNumber, \'.\'));',
  'select \'Column Type OK\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* WriteCreateIndex OrderNumber */
-- Create the unique index on Correlation_OrderNumber unless it already exists.
set @exist = (
  select count(*)
  from information_schema.statistics
  where
    table_name = @tableNameNonQuoted and
    index_name = 'Index_Correlation_OrderNumber' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('create unique index Index_Correlation_OrderNumber on ', @tableNameQuoted, '(Correlation_OrderNumber)')
  else
    'select \'Index Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;
/* AddProperty OrderId */
-- Add the Correlation_OrderId column unless the catalog already lists it.
set @exist = (
  select count(*)
  from information_schema.columns
  where
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderId' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii')
  else
    'select \'Column Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;
/* VerifyColumnType Guid */
-- Describe the current type of Correlation_OrderId as
-- '<column_type>[ character set <name>]'.
-- concat_ws is used instead of concat so that a column without a character set
-- (for example a numeric column created by mistake) still produces a
-- description. With concat the result would be NULL and the mismatch would
-- pass silently.
set @column_type_OrderId = (
  select concat_ws(' character set ', column_type, character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderId'
);
-- Raise an error when the column is not an ascii varchar(38). A missing column
-- yields NULL and is treated as OK, as before.
set @query = IF(
  @column_type_OrderId <> 'varchar(38) character set ascii',
  'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
  'select \'Column Type OK\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* CreateIndex OrderId */
-- Create the unique index on Correlation_OrderId unless it already exists.
set @exist = (
  select count(*)
  from information_schema.statistics
  where
    table_name = @tableNameNonQuoted and
    index_name = 'Index_Correlation_OrderId' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)')
  else
    'select \'Index Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;
/* PurgeObsoleteIndex */
-- Drops every Index_Correlation_* index that does not belong to a current
-- correlation property.
-- The statement is built with a scalar subquery plus group_concat so that:
--   * @dropIndexQuery is always assigned. "select ... into" leaves the variable
--     untouched when no row matches, so a statement left over from an earlier
--     script in the same session could be executed again.
--   * several obsolete indexes are removed by one alter table, instead of
--     "select ... into" failing because it returned more than one row.
-- distinct is needed because information_schema.statistics has one row per
-- indexed column.
set @dropIndexQuery = (
  select concat(
    'alter table ', @tableNameQuoted, ' ',
    group_concat(distinct concat('drop index `', index_name, '`') separator ', '),
    ';')
  from information_schema.statistics
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    index_name like 'Index_Correlation_%' and
    index_name <> 'Index_Correlation_OrderNumber' and
    index_name <> 'Index_Correlation_OrderId'
);
-- No obsolete index: group_concat yields NULL, so fall back to a no-op select.
set @dropIndexQuery = coalesce(@dropIndexQuery, 'select ''no index to delete'';');
prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;
/* PurgeObsoleteProperties */
-- Drops every Correlation_* column that does not belong to a current
-- correlation property.
-- The statement is built with a scalar subquery plus group_concat so that:
--   * @dropPropertiesQuery is always assigned. "select ... into" leaves the
--     variable untouched when no row matches, so a statement left over from an
--     earlier script in the same session could be executed again.
--   * several obsolete columns are removed by one alter table, instead of
--     "select ... into" failing because it returned more than one row.
-- The quoted table name is used, like every other DDL statement in this script,
-- so table prefixes that need back-tick quoting keep working.
set @dropPropertiesQuery = (
  select concat(
    'alter table ', @tableNameQuoted, ' ',
    group_concat(concat('drop column `', column_name, '`') separator ', '),
    ';')
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name like 'Correlation_%' and
    column_name <> 'Correlation_OrderNumber' and
    column_name <> 'Correlation_OrderId'
);
-- No obsolete column: group_concat yields NULL, so fall back to a no-op select.
set @dropPropertiesQuery = coalesce(@dropPropertiesQuery, 'select ''no property to delete'';');
prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;
/* CompleteSagaScript */
Drop Table
/* TableNameVariable */
-- Saga table name in quoted and raw form. Expects @tablePrefix to be set by the caller.
set
  @tableNameQuoted = concat('`', @tablePrefix, 'OrderSaga`'),
  @tableNameNonQuoted = concat(@tablePrefix, 'OrderSaga');
/* DropTable */
-- Drop the saga table when it exists.
set @dropTable = concat(
  'drop table if exists ',
  @tableNameQuoted
);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Subscription
Create Table
-- Subscription deployment script: creates the subscription table when it does
-- not exist yet. Expects @tablePrefix to be set by the caller.
-- Each row is identified by the (Subscriber, MessageType) pair.
-- NOTE(review): "clustered" is not part of MySQL's documented PRIMARY KEY
-- syntax - confirm how the server interprets it.
set @tableName = concat('`', @tablePrefix, 'SubscriptionData`');
set @createTable = concat('
create table if not exists ', @tableName, '(
Subscriber nvarchar(200) not null,
Endpoint nvarchar(200),
MessageType nvarchar(200) not null,
PersistenceVersion varchar(23) not null,
primary key clustered (Subscriber, MessageType)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
Drop Table
-- Subscription decommission script: drops the subscription table when it exists.
-- Expects @tablePrefix to be set by the caller.
set @tableName = concat('`', @tablePrefix, 'SubscriptionData`');
set @dropTable = concat(
  'drop table if exists ',
  @tableName
);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Timeout
Create Table
-- Timeout deployment script. Expects @tablePrefix to be set by the caller.
-- The table name is kept in two forms: back-tick quoted for DDL statements and
-- raw for comparisons against information_schema.
set
  @tableNameQuoted = concat('`', @tablePrefix, 'TimeoutData`'),
  @tableNameNonQuoted = concat(@tablePrefix, 'TimeoutData');

-- Create the timeout table when it does not exist yet.
set @createTable = concat('
create table if not exists ', @tableNameQuoted, '(
Id varchar(38) not null,
Destination nvarchar(200),
SagaId varchar(38),
State longblob,
Time datetime,
Headers json not null,
PersistenceVersion varchar(23) not null,
primary key (Id)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

-- Create Index_SagaId unless the catalog already lists it for this table.
set @exist = (
  select count(*)
  from information_schema.statistics
  where
    table_name = @tableNameNonQuoted and
    index_name = 'Index_SagaId' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('create index Index_SagaId on ', @tableNameQuoted, '(SagaId)')
  else
    'select \'Index Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;

-- Create Index_Time unless the catalog already lists it for this table.
set @exist = (
  select count(*)
  from information_schema.statistics
  where
    table_name = @tableNameNonQuoted and
    index_name = 'Index_Time' and
    table_schema = database()
);
set @query = case
  when @exist <= 0 then
    concat('create index Index_Time on ', @tableNameQuoted, '(Time)')
  else
    'select \'Index Exists\' status'
end;
prepare script from @query;
execute script;
deallocate prepare script;
Drop Table
-- Timeout decommission script: drops the timeout table when it exists.
-- Expects @tablePrefix to be set by the caller.
set @tableName = concat('`', @tablePrefix, 'TimeoutData`');
set @dropTable = concat(
  'drop table if exists ',
  @tableName
);
prepare script from @dropTable;
execute script;
deallocate prepare script;
Run Time
SQL used at runtime to query and update data.
Outbox
Used at intervals to clean up old outbox records.
-- Removes up to @BatchSize outbox rows that were dispatched before @DispatchedBefore.
delete from `EndpointNameOutboxData`
where
    DispatchedAt < @DispatchedBefore and
    Dispatched = true
limit @BatchSize
Get
Used by IOutboxStorage.
-- Reads the dispatch flag and the stored operations of one outbox record.
select Dispatched, Operations
from `EndpointNameOutboxData`
where MessageId = @MessageId
SetAsDispatched
Used by IOutboxStorage.
-- Marks one outbox record as dispatched, stamps the dispatch time and
-- replaces its stored operations with an empty JSON array.
update `EndpointNameOutboxData`
set Dispatched = 1,
    DispatchedAt = @DispatchedAt,
    Operations = '[]'
where MessageId = @MessageId
Store
Used by IOutboxStorage.
-- Inserts a new outbox record. Dispatched is not listed and therefore takes
-- the column default.
insert into `EndpointNameOutboxData`
    (MessageId, Operations, PersistenceVersion)
values
    (@MessageId, @Operations, @PersistenceVersion)
Saga
Complete
Used by ISagaPersister.
-- Deletes the saga row, but only while its Concurrency value still matches
-- the one supplied by the caller.
delete from EndpointName_SagaName
where
    Id = @Id and
    Concurrency = @Concurrency
Save
Used by ISagaPersister.
-- Inserts a new saga row. Concurrency starts at 1; both correlation columns
-- are written together with the saga state.
insert into EndpointName_SagaName
    (
        Id, Metadata, Data,
        PersistenceVersion, SagaTypeVersion, Concurrency,
        Correlation_CorrelationProperty,
        Correlation_TransitionalCorrelationProperty
    )
values
    (
        @Id, @Metadata, @Data,
        @PersistenceVersion, @SagaTypeVersion, 1,
        @CorrelationId,
        @TransitionalCorrelationId
    )
GetByProperty
Used by ISagaPersister.
-- Loads a saga by the value of a correlation column and locks the matching
-- row(s) for update.
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName
where Correlation_PropertyName = @propertyValue
for update
GetBySagaId
Used by ISagaPersister.
-- Loads a saga by its Id and locks the row for update.
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName
where Id = @Id
for update
Update
Used by ISagaPersister.
-- Writes new saga state and increments Concurrency. The row is only touched
-- while its Concurrency still equals the value supplied by the caller.
update EndpointName_SagaName
set Data = @Data,
    PersistenceVersion = @PersistenceVersion,
    SagaTypeVersion = @SagaTypeVersion,
    Concurrency = @Concurrency + 1,
    Correlation_TransitionalCorrelationProperty = @TransitionalCorrelationId
where
    Id = @Id and
    Concurrency = @Concurrency
Select used by Saga Finder
-- Select used by a saga finder; matching rows are locked for update.
-- NOTE(review): "1 = 1" is presumably a placeholder for the finder's own
-- where clause - confirm against the caller.
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName
where 1 = 1
for update
Subscription
GetSubscribers
Used by ISubscriptionStorage.
-- Lists the distinct (Subscriber, Endpoint) pairs subscribed to the given message type(s).
-- NOTE(review): the in-list presumably grows to @type0, @type1, ... when
-- several message types are queried - confirm against the caller.
select distinct
    Subscriber,
    Endpoint
from `EndpointNameSubscriptionData`
where MessageType in (@type0)
Subscribe
Used by ISubscriptionStorage.
-- Adds a subscription. When the key already exists the row is refreshed
-- instead: PersistenceVersion is overwritten, and Endpoint is only replaced
-- when a non-null @Endpoint is supplied.
insert into `EndpointNameSubscriptionData`
    (Subscriber, MessageType, Endpoint, PersistenceVersion)
values
    (@Subscriber, @MessageType, @Endpoint, @PersistenceVersion)
on duplicate key update
    Endpoint = coalesce(@Endpoint, Endpoint),
    PersistenceVersion = @PersistenceVersion
Unsubscribe
Used by ISubscriptionStorage.
-- Removes the subscription of one subscriber to one message type.
delete from `EndpointNameSubscriptionData`
where Subscriber = @Subscriber and MessageType = @MessageType
Timeout
Peek
Used by IPersistTimeouts.
-- Reads one timeout record by Id without removing it.
select Destination, SagaId, State, Time, Headers
from `EndpointNameTimeoutData`
where Id = @Id
Add
Used by IPersistTimeouts.
-- Inserts a new timeout record.
insert into `EndpointNameTimeoutData`
    (Id, Destination, SagaId, State, Time, Headers, PersistenceVersion)
values
    (@Id, @Destination, @SagaId, @State, @Time, @Headers, @PersistenceVersion)
GetNextChunk
Used by IQueryTimeouts.
-- Two separate statements. They are terminated explicitly so they cannot be
-- read or executed as one malformed statement.

-- 1) Earliest timeout that falls after the current window.
select Time from `EndpointNameTimeoutData`
where Time > @EndTime
order by Time
limit 1;

-- 2) Every timeout inside the window (@StartTime, @EndTime].
select Id, Time
from `EndpointNameTimeoutData`
where Time > @StartTime and Time <= @EndTime;
TryRemove
Used by IPersistTimeouts.
-- Removes one timeout record by Id.
delete
from `EndpointNameTimeoutData`
where
    Id = @Id;
RemoveTimeoutBy
Used by IPersistTimeouts.
-- Removes every timeout record that belongs to the given saga.
delete
from `EndpointNameTimeoutData`
where
    SagaId = @SagaId