Scripts and SQL used when interacting with a SQL Server database.
Build Time
Scripts are created at build time and are executed as part of a deployment or decommissioning of an endpoint.
Outbox
Create Table
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'OutboxData]';
if not exists (
select * from sys.objects
where
object_id = object_id(@tableName)
and type in ('U')
)
begin
declare @createTable nvarchar(max);
set @createTable = '
create table ' + @tableName + '(
MessageId nvarchar(200) not null primary key nonclustered,
Dispatched bit not null default 0,
DispatchedAt datetime,
PersistenceVersion varchar(23) not null,
Operations nvarchar(max) not null
)
';
exec(@createTable);
end
if not exists
(
select *
from sys.indexes
where
name = 'Index_DispatchedAt' and
object_id = object_id(@tableName)
)
begin
declare @createDispatchedAtIndex nvarchar(max);
set @createDispatchedAtIndex = '
create index Index_DispatchedAt
on ' + @tableName + '(DispatchedAt) where Dispatched = 1;';
exec(@createDispatchedAtIndex);
end
Drop Table
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'OutboxData]';
if exists
(
select *
from sys.objects
where
object_id = object_id(@tableName) and
type in ('U')
)
begin
declare @dropTable nvarchar(max);
set @dropTable = 'drop table ' + @tableName;
exec(@dropTable);
end
Saga
For a Saga with the following structure
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
IAmStartedByMessages<StartSaga>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
}
public class OrderSagaData :
ContainSagaData
{
public int OrderNumber { get; set; }
public Guid OrderId { get; set; }
}
Create Table
/* TableNameVariable */
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + N'OrderSaga]';
declare @tableNameWithoutSchema nvarchar(max) = @tablePrefix + N'OrderSaga';
/* Initialize */
/* CreateTable */
if not exists
(
select *
from sys.objects
where
object_id = object_id(@tableName) and
type in ('U')
)
begin
declare @createTable nvarchar(max);
set @createTable = '
create table ' + @tableName + '(
Id uniqueidentifier not null primary key,
Metadata nvarchar(max) not null,
Data nvarchar(max) not null,
PersistenceVersion varchar(23) not null,
SagaTypeVersion varchar(23) not null,
Concurrency int not null
)
';
exec(@createTable);
end
/* AddProperty OrderNumber */
if not exists
(
select * from sys.columns
where
name = N'Correlation_OrderNumber' and
object_id = object_id(@tableName)
)
begin
declare @createColumn_OrderNumber nvarchar(max);
set @createColumn_OrderNumber = '
alter table ' + @tableName + N'
add Correlation_OrderNumber bigint;';
exec(@createColumn_OrderNumber);
end
/* VerifyColumnType Int */
declare @dataType_OrderNumber nvarchar(max);
set @dataType_OrderNumber = (
select data_type
from INFORMATION_SCHEMA.COLUMNS
where
table_name = @tableNameWithoutSchema and
table_schema = @schema and
column_name = 'Correlation_OrderNumber'
);
if (@dataType_OrderNumber <> 'bigint')
begin
declare @error_OrderNumber nvarchar(max) = N'Incorrect data type for Correlation_OrderNumber. Expected bigint got ' + @dataType_OrderNumber + '.';
throw 50000, @error_OrderNumber, 0
end
/* WriteCreateIndex OrderNumber */
if not exists
(
select *
from sys.indexes
where
name = N'Index_Correlation_OrderNumber' and
object_id = object_id(@tableName)
)
begin
declare @createIndex_OrderNumber nvarchar(max);
set @createIndex_OrderNumber = N'
create unique index Index_Correlation_OrderNumber
on ' + @tableName + N'(Correlation_OrderNumber)
where Correlation_OrderNumber is not null;';
exec(@createIndex_OrderNumber);
end
/* AddProperty OrderId */
if not exists
(
select * from sys.columns
where
name = N'Correlation_OrderId' and
object_id = object_id(@tableName)
)
begin
declare @createColumn_OrderId nvarchar(max);
set @createColumn_OrderId = '
alter table ' + @tableName + N'
add Correlation_OrderId uniqueidentifier;';
exec(@createColumn_OrderId);
end
/* VerifyColumnType Guid */
declare @dataType_OrderId nvarchar(max);
set @dataType_OrderId = (
select data_type
from INFORMATION_SCHEMA.COLUMNS
where
table_name = @tableNameWithoutSchema and
table_schema = @schema and
column_name = 'Correlation_OrderId'
);
if (@dataType_OrderId <> 'uniqueidentifier')
begin
declare @error_OrderId nvarchar(max) = N'Incorrect data type for Correlation_OrderId. Expected uniqueidentifier got ' + @dataType_OrderId + '.';
throw 50000, @error_OrderId, 0
end
/* CreateIndex OrderId */
if not exists
(
select *
from sys.indexes
where
name = N'Index_Correlation_OrderId' and
object_id = object_id(@tableName)
)
begin
declare @createIndex_OrderId nvarchar(max);
set @createIndex_OrderId = N'
create unique index Index_Correlation_OrderId
on ' + @tableName + N'(Correlation_OrderId)
where Correlation_OrderId is not null;';
exec(@createIndex_OrderId);
end
/* PurgeObsoleteIndex */
declare @dropIndexQuery nvarchar(max);
select @dropIndexQuery =
(
select 'drop index ' + name + ' on ' + @tableName + ';'
from sysindexes
where
Id = object_id(@tableName) and
Name is not null and
Name like 'Index_Correlation_%' and
Name <> N'Index_Correlation_OrderNumber' and
Name <> N'Index_Correlation_OrderId'
);
exec sp_executesql @dropIndexQuery
/* PurgeObsoleteProperties */
declare @dropPropertiesQuery nvarchar(max);
select @dropPropertiesQuery =
(
select 'alter table ' + @tableName + ' drop column ' + column_name + ';'
from INFORMATION_SCHEMA.COLUMNS
where
table_name = @tableNameWithoutSchema and
table_schema = @schema and
column_name like 'Correlation_%' and
column_name <> N'Correlation_OrderNumber' and
column_name <> N'Correlation_OrderId'
);
exec sp_executesql @dropPropertiesQuery
/* CompleteSagaScript */
Drop Table
/* TableNameVariable */
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + N'OrderSaga]';
declare @tableNameWithoutSchema nvarchar(max) = @tablePrefix + N'OrderSaga';
/* DropTable */
if exists
(
select *
from sys.objects
where
object_id = object_id(@tableName)
and type in ('U')
)
begin
declare @dropTable nvarchar(max);
set @dropTable = 'drop table ' + @tableName;
exec(@dropTable);
end
Subscription
Create Table
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'SubscriptionData]';
if not exists
(
select *
from sys.objects
where
object_id = object_id(@tableName) and
type in ('U')
)
begin
declare @createTable nvarchar(max);
set @createTable = '
create table ' + @tableName + '(
Subscriber nvarchar(200) not null,
Endpoint nvarchar(200),
MessageType nvarchar(200) not null,
PersistenceVersion varchar(23) not null,
primary key clustered
(
Subscriber,
MessageType
)
)
';
exec(@createTable);
end
Drop Table
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'SubscriptionData]';
if exists
(
select *
from sys.objects
where
object_id = object_id(@tableName) and
type in ('U')
)
begin
declare @dropTable nvarchar(max);
set @dropTable = 'drop table ' + @tableName;
exec(@dropTable);
end
Timeout
Create Table
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'TimeoutData]';
if not exists (
select * from sys.objects
where
object_id = object_id(@tableName)
and type in ('U')
)
begin
declare @createTable nvarchar(max);
set @createTable = '
create table ' + @tableName + '(
Id uniqueidentifier not null primary key,
Destination nvarchar(200),
SagaId uniqueidentifier,
State varbinary(max),
Time datetime,
Headers nvarchar(max) not null,
PersistenceVersion varchar(23) not null
)
';
exec(@createTable);
end
if not exists
(
select *
from sys.indexes
where
name = 'Index_SagaId' and
object_id = object_id(@tableName)
)
begin
declare @createSagaIdIndex nvarchar(max);
set @createSagaIdIndex = '
create index Index_SagaId
on ' + @tableName + '(SagaId);';
exec(@createSagaIdIndex);
end
if not exists
(
select *
from sys.indexes
where
name = 'Index_Time' and
object_id = object_id(@tableName)
)
begin
declare @createTimeIndex nvarchar(max);
set @createTimeIndex = '
create index Index_Time
on ' + @tableName + '(Time);';
exec(@createTimeIndex);
end
Drop Table
declare @tableName nvarchar(max) = '[' + @schema + '].[' + @tablePrefix + 'TimeoutData]';
if exists
(
select *
from sys.objects
where
object_id = object_id(@tableName) and
type in ('U')
)
begin
declare @dropTable nvarchar(max);
set @dropTable = 'drop table ' + @tableName;
exec(@dropTable);
end
Run Time
SQL used at runtime to query and update data.
Outbox
Used at intervals to cleanup old outbox records.
delete top (@BatchSize) from [dbo].[EndpointNameOutboxData]
where Dispatched = 'true' and
DispatchedAt < @DispatchedBefore
Get
Used by IOutboxStorage.
.
select
Dispatched,
Operations
from [dbo].[EndpointNameOutboxData]
where MessageId = @MessageId
SetAsDispatched
Used by IOutboxStorage.
.
update [dbo].[EndpointNameOutboxData]
set
Dispatched = 1,
DispatchedAt = @DispatchedAt,
Operations = '[]'
where MessageId = @MessageId
Store
Used by IOutboxStorage.
.
Optimistic (default) mode
insert into [dbo].[EndpointNameOutboxData]
(
MessageId,
Operations,
PersistenceVersion
)
values
(
@MessageId,
@Operations,
@PersistenceVersion
)
Pessimistic mode
insert into [dbo].[EndpointNameOutboxData]
(
MessageId,
Operations,
PersistenceVersion
)
values
(
@MessageId,
'[]',
@PersistenceVersion
)
update [dbo].[EndpointNameOutboxData]
set
Operations = @Operations
where MessageId = @MessageId
Saga
Complete
Used by ISagaPersister.
.
delete from EndpointName_SagaName
where Id = @Id and Concurrency = @Concurrency
Save
Used by ISagaPersister.
.
insert into EndpointName_SagaName
(
Id,
Metadata,
Data,
PersistenceVersion,
SagaTypeVersion,
Concurrency,
Correlation_CorrelationProperty,
Correlation_TransitionalCorrelationProperty
)
values
(
@Id,
@Metadata,
@Data,
@PersistenceVersion,
@SagaTypeVersion,
1,
@CorrelationId,
@TransitionalCorrelationId
)
GetByProperty
Used by ISagaPersister.
.
select
Id,
SagaTypeVersion,
Concurrency,
Metadata,
Data
from EndpointName_SagaName
with (updlock)
where Correlation_PropertyName = @propertyValue
GetBySagaId
Used by ISagaPersister.
.
select
Id,
SagaTypeVersion,
Concurrency,
Metadata,
Data
from EndpointName_SagaName
with (updlock)
where Id = @Id
Update
Used by ISagaPersister.
.
update EndpointName_SagaName
set
Data = @Data,
PersistenceVersion = @PersistenceVersion,
SagaTypeVersion = @SagaTypeVersion,
Concurrency = @Concurrency + 1,
Correlation_TransitionalCorrelationProperty = @TransitionalCorrelationId
where
Id = @Id and Concurrency = @Concurrency
Select used by Saga Finder
select
Id,
SagaTypeVersion,
Concurrency,
Metadata,
Data
from EndpointName_SagaName
with (updlock)
where 1 = 1
Subscription
GetSubscribers
Used by ISubscriptionStorage.
.
select distinct Subscriber, Endpoint
from [dbo].[EndpointNameSubscriptionData]
where MessageType in (@type0)
Subscribe
Used by ISubscriptionStorage.
.
declare @dummy int;
merge [dbo].[EndpointNameSubscriptionData] with (holdlock, tablock) as target
using(select @Endpoint as Endpoint, @Subscriber as Subscriber, @MessageType as MessageType) as source
on target.Subscriber = source.Subscriber
and target.MessageType = source.MessageType
when matched and source.Endpoint is not null and (target.Endpoint is null or target.Endpoint <> source.Endpoint) then
update set Endpoint = @Endpoint, PersistenceVersion = @PersistenceVersion
when not matched then
insert
(
Subscriber,
MessageType,
Endpoint,
PersistenceVersion
)
values
(
@Subscriber,
@MessageType,
@Endpoint,
@PersistenceVersion
);
Unsubscribe
Used by ISubscriptionStorage.
.
delete from [dbo].[EndpointNameSubscriptionData]
where
Subscriber = @Subscriber and
MessageType = @MessageType
Timeout
Peek
Used by IPersistTimeouts.
.
select
Destination,
SagaId,
State,
Time,
Headers
from [dbo].[EndpointNameTimeoutData]
where Id = @Id
Add
Used by IPersistTimeouts.
.
insert into [dbo].[EndpointNameTimeoutData]
(
Id,
Destination,
SagaId,
State,
Time,
Headers,
PersistenceVersion
)
values
(
@Id,
@Destination,
@SagaId,
@State,
@Time,
@Headers,
@PersistenceVersion
)
GetNextChunk
Used by IQueryTimeouts.
.
select top 1 Time from [dbo].[EndpointNameTimeoutData]
where Time > @EndTime
order by Time
select Id, Time
from [dbo].[EndpointNameTimeoutData]
where Time > @StartTime and Time <= @EndTime
TryRemove
Used by IPersistTimeouts.
.
delete from [dbo].[EndpointNameTimeoutData]
where Id = @Id
RemoveTimeoutBy
Used by IPersistTimeouts.
.
delete from [dbo].[EndpointNameTimeoutData]
where SagaId = @SagaId