Scripts and SQL used when interacting with an Oracle database.
Build Time
Scripts created at build time and executed as part of a deployment or decommissioning of an endpoint.
Outbox
Create Table
/*
  Outbox deployment script.
  Builds the object names from :tablePrefix (upper-cased) plus the 'OD' suffix, then creates
  the outbox table and its (dispatched, dispatchedat) index, each only when it is missing.
*/
declare
  outboxTable varchar2(30) := UPPER(:tablePrefix) || 'OD';
  primaryKeyName varchar2(30) := outboxTable || '_PK';
  dispatchIndex varchar2(30) := outboxTable || '_IX';
  tableDdl varchar2(500);
  indexDdl varchar2(500);
  existingCount number(10);
begin
  /* Table: skipped when the current user already owns one with this name. */
  select count(*) into existingCount from user_tables where table_name = outboxTable;
  if existingCount = 0 then
    tableDdl :=
      'create table "' || outboxTable || '"
(
messageid nvarchar2(200) not null,
dispatched number(1,0) default 0 not null check
(
dispatched in (0,1)
),
dispatchedat timestamp,
operations clob not null,
persistenceversion varchar2(23) not null,
constraint "' || primaryKeyName || '" primary key
(
messageid
)
enable
)';
    execute immediate tableDdl;
  end if;

  /* Index: checked separately so it is added even when the table pre-existed without it. */
  select count(*) into existingCount from user_indexes where index_name = dispatchIndex;
  if existingCount = 0 then
    indexDdl :=
      'create index "' || dispatchIndex || '" on "' || outboxTable || '" (dispatched, dispatchedat)';
    execute immediate indexDdl;
  end if;
end;
Drop Table
/* Outbox decommissioning script: drops the <PREFIX>OD table when the current user owns it. */
declare
  outboxTable varchar2(30) := UPPER(:tablePrefix) || 'OD';
  dropDdl varchar2(50);
  tableCount number(10);
begin
  select count(*) into tableCount from user_tables where table_name = outboxTable;
  if tableCount = 1 then
    dropDdl := 'drop table "' || outboxTable || '"';
    execute immediate dropDdl;
  end if;
end;
Saga
For a saga with the following structure:
// Example saga: correlated on OrderNumber, with OrderId declared as the
// transitional correlation property through the SqlSaga attribute.
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    // Maps StartSaga.OrderNumber onto the saga data's OrderNumber property.
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }

    // Message handler members are not shown in this excerpt.

    // Saga state. The create script that follows adds one CORR_ column for each
    // of these two properties (CORR_ORDERNUMBER and CORR_ORDERID).
    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }
}
Create Table
/* TableNameVariable */
/* Initialize */
/*
  Saga deployment script for ORDERSAGA. Each step is skipped when already applied:
    1. create the table when the current user does not own it;
    2. add the CORR_ORDERNUMBER / CORR_ORDERID correlation columns when missing;
    3. verify each correlation column has the expected data type (raises -20000 otherwise);
    4. create a unique index on each correlation column when missing;
    5. drop every other CORR_ column left over from earlier saga definitions.
*/
declare
  sqlStatement varchar2(500);
  dataType varchar2(30);
  n number(10);
  currentSchema varchar2(500);
begin
  /* Column lookups below go through all_tab_columns, so they are filtered by this schema. */
  select sys_context('USERENV','CURRENT_SCHEMA') into currentSchema from dual;

/* CreateTable */
  select count(*) into n from user_tables where table_name = 'ORDERSAGA';
  if(n = 0)
  then
    sqlStatement :=
      'create table "ORDERSAGA"
(
id varchar2(38) not null,
metadata clob not null,
data clob not null,
persistenceversion varchar2(23) not null,
sagatypeversion varchar2(23) not null,
concurrency number(9) not null,
constraint "ORDERSAGA_PK" primary key
(
id
)
enable
)';
    execute immediate sqlStatement;
  end if;

/* AddProperty OrderNumber */
  select count(*) into n
  from all_tab_columns
  where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERNUMBER' and owner = currentSchema;
  if(n = 0)
  then
    sqlStatement := 'alter table "ORDERSAGA" add ( CORR_ORDERNUMBER NUMBER(19) )';
    execute immediate sqlStatement;
  end if;

/* VerifyColumnType Int */
  /* Rebuilds the declared type as TYPE(length) or TYPE(precision[,scale]) for comparison. */
  select data_type ||
    case when char_length > 0 then
      '(' || char_length || ')'
    else
      case when data_precision is not null then
        '(' || data_precision ||
          case when data_scale is not null and data_scale > 0 then
            ',' || data_scale
          end || ')'
      end
    end into dataType
  from all_tab_columns
  where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERNUMBER' and owner = currentSchema;
  if(dataType <> 'NUMBER(19)')
  then
    raise_application_error(-20000, 'Incorrect data type for Correlation_CORR_ORDERNUMBER. Expected "NUMBER(19)" got "' || dataType || '".');
  end if;

/* WriteCreateIndex OrderNumber */
  select count(*) into n from user_indexes where table_name = 'ORDERSAGA' and index_name = 'SAGAIDX_599F57BA89CF9D164E3CFF';
  if(n = 0)
  then
    sqlStatement := 'create unique index "SAGAIDX_599F57BA89CF9D164E3CFF" on "ORDERSAGA" (CORR_ORDERNUMBER ASC)';
    execute immediate sqlStatement;
  end if;

/* AddProperty OrderId */
  select count(*) into n
  from all_tab_columns
  where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERID' and owner = currentSchema;
  if(n = 0)
  then
    sqlStatement := 'alter table "ORDERSAGA" add ( CORR_ORDERID VARCHAR2(38) )';
    execute immediate sqlStatement;
  end if;

/* VerifyColumnType Guid */
  select data_type ||
    case when char_length > 0 then
      '(' || char_length || ')'
    else
      case when data_precision is not null then
        '(' || data_precision ||
          case when data_scale is not null and data_scale > 0 then
            ',' || data_scale
          end || ')'
      end
    end into dataType
  from all_tab_columns
  where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERID' and owner = currentSchema;
  if(dataType <> 'VARCHAR2(38)')
  then
    raise_application_error(-20000, 'Incorrect data type for Correlation_CORR_ORDERID. Expected "VARCHAR2(38)" got "' || dataType || '".');
  end if;

/* CreateIndex OrderId */
  select count(*) into n from user_indexes where table_name = 'ORDERSAGA' and index_name = 'SAGAIDX_FD8BAD844CFBBE419E43FE';
  if(n = 0)
  then
    sqlStatement := 'create unique index "SAGAIDX_FD8BAD844CFBBE419E43FE" on "ORDERSAGA" (CORR_ORDERID ASC)';
    execute immediate sqlStatement;
  end if;

/* PurgeObsoleteIndex */
/* PurgeObsoleteProperties */
  /*
    Drops every correlation column other than the two current ones. A cursor loop is used
    because more than one obsolete column may exist; a single-row "select ... into" would
    raise ORA-01422 in that case. The underscore is escaped so that only names starting
    with the literal prefix CORR_ match (an unescaped _ is a one-character wildcard).
  */
  for obsoleteColumn in
  (
    select column_name
    from all_tab_columns
    where table_name = 'ORDERSAGA' and column_name like 'CORR\_%' escape '\' and
      column_name <> 'CORR_ORDERNUMBER' and
      column_name <> 'CORR_ORDERID' and owner = currentSchema
  )
  loop
    sqlStatement := 'alter table "ORDERSAGA" drop column ' || obsoleteColumn.column_name;
    execute immediate sqlStatement;
  end loop;

/* CompleteSagaScript */
end;
Drop Table
/* TableNameVariable */
/* DropTable */
/* Saga decommissioning script: drops ORDERSAGA when the current user owns it. */
declare
  tableCount number(10);
begin
  select count(*) into tableCount from user_tables where table_name = 'ORDERSAGA';
  if tableCount > 0 then
    execute immediate 'drop table "ORDERSAGA"';
  end if;
end;
Subscription
Create Table
/*
  Subscription deployment script.
  Creates the <PREFIX>SS table, index-organized on its (messagetype, subscriber)
  primary key, when the current user does not already own it.
*/
declare
  subscriptionTable varchar2(30) := UPPER(:tablePrefix) || 'SS';
  tableDdl varchar2(500);
  existingCount number(10);
begin
  select count(*) into existingCount from user_tables where table_name = subscriptionTable;
  if existingCount = 0 then
    tableDdl :=
      'create table "' || subscriptionTable || '"
(
messagetype nvarchar2(200) not null,
subscriber nvarchar2(200) not null,
endpoint varchar2(200),
persistenceversion varchar2(23),
constraint "' || subscriptionTable || '_PK" primary key
(
messagetype
, subscriber
)
enable
)
organization index';
    execute immediate tableDdl;
  end if;
end;
Drop Table
/* Subscription decommissioning script: drops the <PREFIX>SS table when the current user owns it. */
declare
  subscriptionTable varchar2(30) := UPPER(:tablePrefix) || 'SS';
  dropDdl varchar2(50);
  tableCount number(10);
begin
  select count(*) into tableCount from user_tables where table_name = subscriptionTable;
  if tableCount = 1 then
    dropDdl := 'drop table "' || subscriptionTable || '"';
    execute immediate dropDdl;
  end if;
end;
Timeout
Create Table
/*
  Timeout deployment script.
  Builds the object names from :tablePrefix (upper-cased) plus the 'TO' suffix, then creates
  the timeout table, the expire-time index (_TK) and the saga-id index (_SK), each only
  when it is missing.
*/
declare
  tableName varchar2(30) := UPPER(:tablePrefix) || 'TO';
  timeIndex varchar2(30) := tableName || '_TK';
  sagaIndex varchar2(30) := tableName || '_SK';
  n number(10);
begin
  /* Table */
  select count(*) into n from user_tables where table_name = tableName;
  if(n = 0)
  then
    execute immediate
      'create table "' || tableName || '"
(
id varchar2(38) not null,
destination nvarchar2(200) not null,
sagaid varchar2(38),
state blob,
expiretime date,
headers clob not null,
persistenceversion varchar2(23) not null,
constraint "' || tableName || '_PK" primary key
(
id
)
enable
)';
  end if;

  /* Index on expiretime */
  select count(*) into n from user_indexes where index_name = timeIndex;
  if(n = 0)
  then
    execute immediate 'create index "' || timeIndex || '" on "' || tableName || '" (expiretime asc)';
  end if;

  /* Index on sagaid */
  select count(*) into n from user_indexes where index_name = sagaIndex;
  if(n = 0)
  then
    execute immediate 'create index "' || sagaIndex || '" on "' || tableName || '" (sagaid asc)';
  end if;
end;
Drop Table
/* Timeout decommissioning script: drops the <PREFIX>TO table when the current user owns it. */
declare
  timeoutTable varchar2(30) := UPPER(:tablePrefix) || 'TO';
  dropDdl varchar2(50);
  tableCount number(10);
begin
  select count(*) into tableCount from user_tables where table_name = timeoutTable;
  if tableCount = 1 then
    dropDdl := 'drop table "' || timeoutTable || '"';
    execute immediate dropDdl;
  end if;
end;
Run Time
SQL used at runtime to query and update data.
Outbox
Used at intervals to clean up old outbox records.
/* Deletes at most :BatchSize dispatched records whose DispatchedAt precedes :DispatchedBefore. */
DELETE FROM "ENDPOINTNAMEOD"
WHERE
    Dispatched = 1
    AND DispatchedAt < :DispatchedBefore
    AND rownum <= :BatchSize
Get
Used by IOutboxStorage.
/* Reads the dispatch flag and the stored operations for one message id. */
SELECT Dispatched, Operations
FROM "ENDPOINTNAMEOD"
WHERE MessageId = :MessageId
SetAsDispatched
Used by IOutboxStorage.
/* Flags the record as dispatched, stores the dispatch time and resets Operations to '[]'. */
UPDATE "ENDPOINTNAMEOD"
SET Dispatched = 1,
    DispatchedAt = :DispatchedAt,
    Operations = '[]'
WHERE MessageId = :MessageId
Store
Used by IOutboxStorage.
Optimistic (default) mode
/* Optimistic mode: the record is inserted together with its operations in one statement. */
INSERT INTO "ENDPOINTNAMEOD"
    (MessageId, Operations, PersistenceVersion)
VALUES
    (:MessageId, :Operations, :PersistenceVersion)
Pessimistic mode
/* Pessimistic mode, statement 1: insert the record with Operations set to '[]'. */
INSERT INTO "ENDPOINTNAMEOD"
    (MessageId, Operations, PersistenceVersion)
VALUES
    (:MessageId, '[]', :PersistenceVersion)

/* Pessimistic mode, statement 2: write the operations onto the existing record. */
UPDATE "ENDPOINTNAMEOD"
SET Operations = :Operations
WHERE MessageId = :MessageId
Saga
Complete
Used by ISagaPersister.
/* Removes the saga row; the Concurrency predicate makes the delete match only the expected version. */
DELETE FROM EndpointName_SagaName
WHERE
    Id = :Id
    AND Concurrency = :Concurrency
Save
Used by ISagaPersister.
/* Inserts a new saga row with Concurrency starting at 1 and both correlation columns populated. */
INSERT INTO EndpointName_SagaName
    (
        Id,
        Metadata,
        Data,
        PersistenceVersion,
        SagaTypeVersion,
        Concurrency,
        CORR_CORRELATIONPROPERTY,
        CORR_TRANSITIONALCORRELATIONPR
    )
VALUES
    (
        :Id,
        :Metadata,
        :Data,
        :PersistenceVersion,
        :SagaTypeVersion,
        1,
        :CorrelationId,
        :TransitionalCorrelationId
    )
GetByProperty
Used by ISagaPersister.
/* Loads a saga by its correlation column value and locks the selected row (FOR UPDATE). */
SELECT Id, SagaTypeVersion, Concurrency, Metadata, Data
FROM EndpointName_SagaName
WHERE CORR_PROPERTYNAME = :propertyValue
FOR UPDATE
GetBySagaId
Used by ISagaPersister.
/* Loads a saga by its Id and locks the selected row (FOR UPDATE). */
SELECT Id, SagaTypeVersion, Concurrency, Metadata, Data
FROM EndpointName_SagaName
WHERE Id = :Id
FOR UPDATE
Update
Used by ISagaPersister.
/* Rewrites the saga row and increments Concurrency; matches only when the stored Concurrency equals :Concurrency. */
UPDATE EndpointName_SagaName
SET Data = :Data,
    PersistenceVersion = :PersistenceVersion,
    SagaTypeVersion = :SagaTypeVersion,
    Concurrency = :Concurrency + 1,
    CORR_TRANSITIONALCORRELATIONPR = :TransitionalCorrelationId
WHERE
    Id = :Id
    AND Concurrency = :Concurrency
Subscription
GetSubscribers
Used by ISubscriptionStorage.
/* Lists the distinct (Subscriber, Endpoint) pairs registered for the given message type(s). */
SELECT DISTINCT
    Subscriber,
    Endpoint
FROM "ENDPOINTNAMESS"
WHERE MessageType IN (:type0)
Subscribe
Used by ISubscriptionStorage.
/*
  Registers a subscriber for a message type.
  A plain insert is attempted first and committed. When the insert hits a duplicate key,
  the handler either refreshes the existing row (when :Endpoint is supplied) or rolls back.
*/
begin
insert into "ENDPOINTNAMESS"
(
MessageType,
Subscriber,
Endpoint,
PersistenceVersion
)
values
(
:MessageType,
:Subscriber,
:Endpoint,
:PersistenceVersion
);
/* Commits the newly inserted row from inside the block. */
commit;
exception
/* Duplicate key on insert: a row for this (MessageType, Subscriber) pair already exists. */
when DUP_VAL_ON_INDEX then
/* Only overwrite the stored values when an endpoint was actually supplied. */
if :Endpoint is not null then
update "ENDPOINTNAMESS" set
Endpoint = :Endpoint,
PersistenceVersion = :PersistenceVersion
where
MessageType = :MessageType
and Subscriber = :Subscriber;
/* NOTE(review): unlike the insert path, this branch issues no commit of its own; presumably the caller or driver commits. Confirm. */
else
/* Nothing to change for the existing row; roll back. */
ROLLBACK;
end if;
end;
Unsubscribe
Used by ISubscriptionStorage.
/* Removes the subscription row for one (Subscriber, MessageType) pair. */
DELETE FROM "ENDPOINTNAMESS"
WHERE Subscriber = :Subscriber
  AND MessageType = :MessageType
Timeout
Peek
Used by IPersistTimeouts.
/* Reads one timeout by Id without modifying it. */
SELECT Destination, SagaId, State, ExpireTime, Headers
FROM "ENDPOINTNAMETO"
WHERE Id = :Id
Add
Used by IPersistTimeouts.
/* Inserts a new timeout row; the :Time bind is stored in the ExpireTime column. */
INSERT INTO "ENDPOINTNAMETO"
    (
        Id,
        Destination,
        SagaId,
        State,
        ExpireTime,
        Headers,
        PersistenceVersion
    )
VALUES
    (
        :Id,
        :Destination,
        :SagaId,
        :State,
        :Time,
        :Headers,
        :PersistenceVersion
    )
GetNextChunk
Used by IQueryTimeouts.
/* Statement 1: earliest ExpireTime later than :EndTime (inner query sorts, outer keeps the first row). */
SELECT ExpireTime
FROM (
    SELECT ExpireTime
    FROM "ENDPOINTNAMETO"
    WHERE ExpireTime > :EndTime
    ORDER BY ExpireTime
) subquery
WHERE rownum <= 1

/* Statement 2: timeouts whose ExpireTime is after :StartTime and no later than :EndTime. */
SELECT Id, ExpireTime
FROM "ENDPOINTNAMETO"
WHERE ExpireTime > :StartTime
  AND ExpireTime <= :EndTime
TryRemove
Used by IPersistTimeouts.
/* Deletes a single timeout by Id. */
DELETE FROM "ENDPOINTNAMETO"
WHERE
    Id = :Id
RemoveTimeoutBy
Used by IPersistTimeouts.
/* Deletes every timeout that belongs to the given saga. */
DELETE FROM "ENDPOINTNAMETO"
WHERE
    SagaId = :SagaId