MySql Scripts

Component: Sql Persistence
NuGet Package NServiceBus.Persistence.Sql (1.x)
Target NServiceBus Version: 6.x

Scripts and SQL used when interacting with a MySql database.

Build Time

Scripts created at build time and executed as part of a deployment or decommissioning of an endpoint.

Outbox

Create Table

set @tableName = concat(@tablePrefix, 'OutboxData');
set @createTable =  concat('
    create table if not exists ', @tableName, '(
        MessageId nvarchar(200) not null,
        Dispatched bit not null default 0,
        DispatchedAt datetime,
        PersistenceVersion varchar(23) not null,
        Operations json not null,
        primary key (`MessageId`)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

Drop Table

set @tableName = concat(@tablePrefix, 'OutboxData');
set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;

Saga

For a Saga with the following structure

[SqlSaga(
    correlationProperty: "OrderNumber",
    transitionalCorrelationProperty: "OrderId")]
public class OrderSaga :
    SqlSaga<OrderSaga.OrderSagaData>
{
    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }

Create Table

/* TableNameVariable */

set @tableName = concat(@tablePrefix, 'OrderSaga');


/* Initialize */

drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
    'ERROR'
set
    message_text = message,
    mysql_errno = '45000';
end;

/* CreateTable */

set @createTable = concat('
    create table if not exists ', @tableName, '(
        Id varchar(38) not null,
        Metadata json not null,
        Data json not null,
        PersistenceVersion varchar(23) not null,
        SagaTypeVersion varchar(23) not null,
        Concurrency int not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

/* AddProperty OrderNumber */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderNumber' and
      table_name = @tableName;

set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableName, ' add column Correlation_OrderNumber bigint(20)'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Int */

set @column_type_OrderNumber = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableName and
    column_name = 'Correlation_OrderNumber'
);

set @query = IF(
    @column_type_OrderNumber <> 'bigint(20)',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderNumber. Expected bigint(20) got \', @column_type_OrderNumber, \'.\'));',
    'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* WriteCreateIndex OrderNumber */

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderNumber' and
    table_name = @tableName;

set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderNumber on ', @tableName, '(Correlation_OrderNumber)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* AddProperty OrderId */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderId' and
      table_name = @tableName;

set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableName, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Guid */

set @column_type_OrderId = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableName and
    column_name = 'Correlation_OrderId'
);

set @query = IF(
    @column_type_OrderId <> 'varchar(38) character set ascii',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
    'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* CreateIndex OrderId */

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderId' and
    table_name = @tableName;

set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderId on ', @tableName, '(Correlation_OrderId)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* PurgeObsoleteIndex */

select concat('drop index ', index_name, ' on ', @tableName, ';')
from information_schema.statistics
where
    table_schema = database() and
    table_name = @tableName and
    index_name like 'Index_Correlation_%' and
    index_name <> 'Index_Correlation_OrderNumber' and
    index_name <> 'Index_Correlation_OrderId' and
    table_schema = database()
into @dropIndexQuery;
select if (
    @dropIndexQuery is not null,
    @dropIndexQuery,
    'select ''no index to delete'';')
    into @dropIndexQuery;

prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;

/* PurgeObsoleteProperties */

select concat('alter table ', @tableName, ' drop column ', column_name, ';')
from information_schema.columns
where
    table_schema = database() and
    table_name = @tableName and
    column_name like 'Correlation_%' and
    column_name <> 'Correlation_OrderNumber' and
    column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;

select if (
    @dropPropertiesQuery is not null,
    @dropPropertiesQuery,
    'select ''no property to delete'';')
    into @dropPropertiesQuery;

prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;

Drop Table

/* TableNameVariable */

set @tableName = concat(@tablePrefix, 'OrderSaga');


/* DropTable */

set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;

Subscription

Create Table

set @tableName = concat(@tablePrefix, 'SubscriptionData');
set @createTable = concat('
    create table if not exists ', @tableName, '(
        Subscriber nvarchar(200) not null,
        Endpoint nvarchar(200) null,
        MessageType nvarchar(200) not null,
        PersistenceVersion varchar(23) not null,
        primary key clustered (Subscriber, MessageType)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

Drop Table

set @tableName = concat(@tablePrefix, 'SubscriptionData');
set @dropTable = concat('drop table if exists ', @tableName);
prepare script from @dropTable;
execute script;
deallocate prepare script;

Timeout

Create Table

set @tableName = concat(@tablePrefix, 'TimeoutData');
set @createTable = concat('
    create table if not exists ', @tableName, '(
        Id varchar(38) not null,
        Destination nvarchar(200),
        SagaId varchar(38),
        State longblob,
        Time datetime,
        Headers json not null,
        PersistenceVersion varchar(23) not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

Drop Table

set @tableName = concat(@tablePrefix, 'TimeoutData');
set @dropTable = concat('drop table if exists ', @tableName, '');
prepare script from @dropTable;
execute script;
deallocate prepare script;

Run Time

SQL used at runtime to query and update data.

Outbox

Used at intervals to cleanup old outbox records.

delete from EndpointNameOutboxData where Dispatched = 1 And DispatchedAt < @Date

Get

Used by IOutboxStorage.SetAsDispatched.

select
    Dispatched,
    Operations
from EndpointNameOutboxData
where MessageId = @MessageId

SetAsDispatched

Used by IOutboxStorage.SetAsDispatched.

update EndpointNameOutboxData
set
    Dispatched = 1,
    DispatchedAt = @DispatchedAt,
    Operations = '[]'
where MessageId = @MessageId

Store

Used by IOutboxStorage.Store.

insert into EndpointNameOutboxData
(
    MessageId,
    Operations,
    PersistenceVersion
)
values
(
    @MessageId,
    @Operations,
    @PersistenceVersion
)

Saga

Complete

Used by ISagaPersister.Complete.

delete from EndpointNameSagaName
where Id = @Id AND Concurrency = @Concurrency

Save

Used by ISagaPersister.Save.

insert into EndpointNameSagaName
(
    Id,
    Metadata,
    Data,
    PersistenceVersion,
    SagaTypeVersion,
    Concurrency,
    Correlation_CorrelationProperty,
    Correlation_TransitionalCorrelationProperty
)
values
(
    @Id,
    @Metadata,
    @Data,
    @PersistenceVersion,
    @SagaTypeVersion,
    1,
    @CorrelationId,
    @TransitionalCorrelationId
)

GetByProperty

Used by ISagaPersister.Get(propertyName...).

select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointNameSagaName
where Correlation_PropertyName = @propertyValue

GetBySagaId

Used by ISagaPersister.Get(sagaId...).

select
    Id,
    SagaTypeVersion,
    Concurrency,
    Metadata,
    Data
from EndpointNameSagaName
where Id = @Id

Update

Used by ISagaPersister.Update.

update EndpointNameSagaName
set
    Data = @Data,
    PersistenceVersion = @PersistenceVersion,
    SagaTypeVersion = @SagaTypeVersion,
    Concurrency = @Concurrency + 1,
    Correlation_TransitionalCorrelationProperty = @TransitionalCorrelationId
where
    Id = @Id AND Concurrency = @Concurrency

Subscription

GetSubscribers

Used by ISubscriptionStorage.GetSubscriberAddressesForMessage.

select distinct Subscriber, Endpoint
from EndpointNameSubscriptionData
where MessageType in (@type0)

Subscribe

Used by ISubscriptionStorage.Subscribe.

insert into EndpointNameSubscriptionData
(
    Subscriber,
    MessageType,
    Endpoint,
    PersistenceVersion
)
values
(
    @Subscriber,
    @MessageType,
    @Endpoint,
    @PersistenceVersion
)
on duplicate key update
    Endpoint = @Endpoint,
    PersistenceVersion = @PersistenceVersion

Unsubscribe

Used by ISubscriptionStorage.Unsubscribe.

delete from EndpointNameSubscriptionData
where
    Subscriber = @Subscriber and
    MessageType = @MessageType

Timeout

Peek

Used by IPersistTimeouts.Peek.

select
    Destination,
    SagaId,
    State,
    Time,
    Headers
from EndpointNameTimeoutData
where Id = @Id

Add

Used by IPersistTimeouts.Add.

insert into EndpointNameTimeoutData
(
    Id,
    Destination,
    SagaId,
    State,
    Time,
    Headers,
    PersistenceVersion
)
values
(
    @Id,
    @Destination,
    @SagaId,
    @State,
    @Time,
    @Headers,
    @PersistenceVersion
)

GetNextChunk

Used by IQueryTimeouts.GetNextChunk.

select Time from EndpointNameTimeoutData
where Time > @EndTime
order by Time
limit 1
select Id, Time
from EndpointNameTimeoutData
where Time between @StartTime and @EndTime

TryRemove

Used by IPersistTimeouts.TryRemove.

set @sagaId := (select SagaId from EndpointNameTimeoutData where Id = @Id);
delete from EndpointNameTimeoutData
where Id = @Id;
select @sagaId;

RemoveTimeoutBy

Used by IPersistTimeouts.RemoveTimeoutBy.

delete from EndpointNameTimeoutData
where SagaId = @SagaId

Related Articles


Last modified