Getting Started
Architecture
NServiceBus
Transports
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Oracle Scripts

Component: Sql Persistence
Target Version: NServiceBus 8.x

Scripts and SQL used when interacting with an Oracle database.

Build Time

Scripts created at build time and executed as part of a deployment or decommissioning of an endpoint.

Outbox

Create Table

-- Creates the outbox table (<PREFIX>OD) and its (dispatched, dispatchedat)
-- index, each only when no object with that name is listed for the user.
declare
  outboxTable varchar2(30) := UPPER(:tablePrefix) || 'OD';
  primaryKeyName varchar2(30) := outboxTable || '_PK';
  dispatchIndex varchar2(30) := outboxTable || '_IX';
  tableDdl varchar2(500);
  indexDdl varchar2(500);
  matchCount number(10);
begin
  -- Step 1: the table itself.
  select count(*) into matchCount
  from user_tables
  where table_name = outboxTable;

  if matchCount = 0 then
    tableDdl :=
       'create table "' || outboxTable || '"
        (
          messageid nvarchar2(200) not null,
          dispatched number(1,0) default 0 not null check
          (
            dispatched in (0,1)
          ),
          dispatchedat timestamp,
          operations clob not null,
          persistenceversion varchar2(23) not null,
          constraint "' || primaryKeyName || '" primary key
          (
            messageid
          )
          enable
        )';
    execute immediate tableDdl;
  end if;

  -- Step 2: the index used to find dispatched records by dispatch time.
  select count(*) into matchCount
  from user_indexes
  where index_name = dispatchIndex;

  if matchCount = 0 then
    indexDdl :=
      'create index "' || dispatchIndex || '" on "' || outboxTable || '" (dispatched, dispatchedat)';
    execute immediate indexDdl;
  end if;
end;

Drop Table

-- Drops the outbox table (<PREFIX>OD) when user_tables lists it.
declare
  targetTable varchar2(30) := UPPER(:tablePrefix) || 'OD';
  dropDdl varchar2(50);
  matchCount number(10);
begin
  select count(*) into matchCount
  from user_tables
  where table_name = targetTable;

  if matchCount = 1 then
    dropDdl := 'drop table "' || targetTable || '"';
    execute immediate dropDdl;
  end if;
end;

Saga

For a Saga with the following structure

// Example saga: messages are correlated on OrderNumber, while OrderId is
// declared as the transitional correlation property on the SqlSaga attribute.
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    // Maps StartSaga.OrderNumber onto the saga data's OrderNumber property.
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }

    // NOTE(review): no handler for StartSaga is shown in this snippet even
    // though IAmStartedByMessages<StartSaga> is declared; presumably omitted
    // for brevity - confirm against the full sample.

    // Persisted saga state; both properties appear as CORR_ columns in the
    // generated table script.
    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }
}

Create Table

/* TableNameVariable */

/* Initialize */

-- Creates the ORDERSAGA table when missing, then brings its correlation
-- columns and unique indexes in line with the saga definition:
--   * CORR_ORDERNUMBER NUMBER(19)   (correlation property)
--   * CORR_ORDERID     VARCHAR2(38) (transitional correlation property)
-- Raises application error -20000 when an existing correlation column has a
-- different data type. Any other CORR_ column on the table is dropped.
declare
  sqlStatement varchar2(500);
  dataType varchar2(30);
  n number(10);
  currentSchema varchar2(500);
begin
  select sys_context('USERENV','CURRENT_SCHEMA') into currentSchema from dual;

/* CreateTable */

  select count(*) into n from user_tables where table_name = 'ORDERSAGA';
  if(n = 0)
  then

    sqlStatement :=
       'create table "ORDERSAGA"
       (
          id varchar2(38) not null,
          metadata clob not null,
          data clob not null,
          persistenceversion varchar2(23) not null,
          sagatypeversion varchar2(23) not null,
          concurrency number(9) not null,
          constraint "ORDERSAGA_PK" primary key
          (
            id
          )
          enable
        )';

    execute immediate sqlStatement;

  end if;

/* AddProperty OrderNumber */

  select count(*) into n from all_tab_columns where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERNUMBER' and owner = currentSchema;
  if(n = 0)
  then
    sqlStatement := 'alter table "ORDERSAGA" add ( CORR_ORDERNUMBER NUMBER(19) )';

    execute immediate sqlStatement;
  end if;

/* VerifyColumnType Int */

  -- Rebuild the declared type text, e.g. NUMBER(19) or VARCHAR2(38), from the
  -- dictionary columns so it can be compared with the expected type.
  select data_type ||
    case when char_length > 0 then
      '(' || char_length || ')'
    else
      case when data_precision is not null then
        '(' || data_precision ||
          case when data_scale is not null and data_scale > 0 then
            ',' || data_scale
          end || ')'
      end
    end into dataType
  from all_tab_columns
  where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERNUMBER' and owner = currentSchema;

  if(dataType <> 'NUMBER(19)')
  then
    raise_application_error(-20000, 'Incorrect data type for Correlation_CORR_ORDERNUMBER.  Expected "NUMBER(19)" got "' || dataType || '".');
  end if;

/* WriteCreateIndex OrderNumber */

  select count(*) into n from user_indexes where table_name = 'ORDERSAGA' and index_name = 'SAGAIDX_599F57BA89CF9D164E3CFF';
  if(n = 0)
  then
    sqlStatement := 'create unique index "SAGAIDX_599F57BA89CF9D164E3CFF" on "ORDERSAGA" (CORR_ORDERNUMBER ASC)';

    execute immediate sqlStatement;
  end if;

/* AddProperty OrderId */

  select count(*) into n from all_tab_columns where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERID' and owner = currentSchema;
  if(n = 0)
  then
    sqlStatement := 'alter table "ORDERSAGA" add ( CORR_ORDERID VARCHAR2(38) )';

    execute immediate sqlStatement;
  end if;

/* VerifyColumnType Guid */

  select data_type ||
    case when char_length > 0 then
      '(' || char_length || ')'
    else
      case when data_precision is not null then
        '(' || data_precision ||
          case when data_scale is not null and data_scale > 0 then
            ',' || data_scale
          end || ')'
      end
    end into dataType
  from all_tab_columns
  where table_name = 'ORDERSAGA' and column_name = 'CORR_ORDERID' and owner = currentSchema;

  if(dataType <> 'VARCHAR2(38)')
  then
    raise_application_error(-20000, 'Incorrect data type for Correlation_CORR_ORDERID.  Expected "VARCHAR2(38)" got "' || dataType || '".');
  end if;

/* CreateIndex OrderId */

  select count(*) into n from user_indexes where table_name = 'ORDERSAGA' and index_name = 'SAGAIDX_FD8BAD844CFBBE419E43FE';
  if(n = 0)
  then
    sqlStatement := 'create unique index "SAGAIDX_FD8BAD844CFBBE419E43FE" on "ORDERSAGA" (CORR_ORDERID ASC)';

    execute immediate sqlStatement;
  end if;

/* PurgeObsoleteIndex */

/* PurgeObsoleteProperties */

  -- Drop every correlation column that is no longer part of the saga.
  -- A cursor loop is used because a single-row "select ... into" raises
  -- ORA-01422 (TOO_MANY_ROWS) as soon as two or more obsolete columns exist.
  -- The underscore is escaped so that it matches literally instead of acting
  -- as the single-character LIKE wildcard.
  for obsoleteColumn in
  (
    select column_name
    from all_tab_columns
    where table_name = 'ORDERSAGA' and column_name like 'CORR\_%' escape '\' and
          column_name <> 'CORR_ORDERNUMBER' and
          column_name <> 'CORR_ORDERID' and owner = currentSchema
  )
  loop
    sqlStatement := 'alter table "ORDERSAGA" drop column ' || obsoleteColumn.column_name;

    execute immediate sqlStatement;
  end loop;

/* CompleteSagaScript */
end;

Drop Table

/* TableNameVariable */

/* DropTable */

-- Drops the ORDERSAGA table when user_tables lists it.
declare
  tableCount number(10);
begin
  select count(*) into tableCount
  from user_tables
  where table_name = 'ORDERSAGA';

  if tableCount > 0 then
    execute immediate 'drop table "ORDERSAGA"';
  end if;
end;

Subscription

Create Table

-- Creates the subscription table (<PREFIX>SS) as an index-organized table
-- keyed on (messagetype, subscriber), unless user_tables already lists it.
declare
  subscriptionTable varchar2(30) := UPPER(:tablePrefix) || 'SS';
  tableDdl varchar2(500);
  matchCount number(10);
begin
  select count(*) into matchCount
  from user_tables
  where table_name = subscriptionTable;

  if matchCount = 0 then
    tableDdl :=
       'create table "' || subscriptionTable || '"
        (
          messagetype nvarchar2(200) not null,
          subscriber nvarchar2(200) not null,
          endpoint varchar2(200),
          persistenceversion varchar2(23),
          constraint "' || subscriptionTable || '_PK" primary key
          (
            messagetype
          , subscriber
          )
          enable
        )
        organization index';
    execute immediate tableDdl;
  end if;
end;

Drop Table

-- Drops the subscription table (<PREFIX>SS) when user_tables lists it.
declare
  targetTable varchar2(30) := UPPER(:tablePrefix) || 'SS';
  dropDdl varchar2(50);
  matchCount number(10);
begin
  select count(*) into matchCount
  from user_tables
  where table_name = targetTable;

  if matchCount = 1 then
    dropDdl := 'drop table "' || targetTable || '"';
    execute immediate dropDdl;
  end if;
end;

Timeout

Create Table

-- Creates the timeout table (<PREFIX>TO) plus its two lookup indexes:
--   <PREFIX>TO_TK on expiretime and <PREFIX>TO_SK on sagaid.
-- Each object is created only when no object with that name is listed yet.
declare
  tableName varchar2(30) := UPPER(:tablePrefix) || 'TO';
  timeIndex varchar2(30) := tableName || '_TK';
  sagaIndex varchar2(30) := tableName || '_SK';
  n number(10);
begin
  select count(*) into n from user_tables where table_name = tableName;
  if(n = 0)
  then

    execute immediate
       'create table "' || tableName || '"
       (
         id varchar2(38) not null,
         destination nvarchar2(200) not null,
         sagaid varchar2(38),
         state blob,
         expiretime date,
         headers clob not null,
         persistenceversion varchar2(23) not null,
         constraint "' || tableName || '_PK" primary key
         (
           id
         )
         enable
       )';

  end if;

  -- Index on the expiry time column.
  select count(*) into n from user_indexes where index_name = timeIndex;
  if(n = 0)
  then
    execute immediate 'create index "' || timeIndex || '" on "' || tableName || '" (expiretime asc)';
  end if;

  -- Index on the saga id column.
  select count(*) into n from user_indexes where index_name = sagaIndex;
  if(n = 0)
  then
    execute immediate 'create index "' || sagaIndex || '" on "' || tableName || '" (sagaid asc)';
  end if;
end;

Drop Table

-- Drops the timeout table (<PREFIX>TO) when user_tables lists it.
declare
  targetTable varchar2(30) := UPPER(:tablePrefix) || 'TO';
  dropDdl varchar2(50);
  matchCount number(10);
begin
  select count(*) into matchCount
  from user_tables
  where table_name = targetTable;

  if matchCount = 1 then
    dropDdl := 'drop table "' || targetTable || '"';
    execute immediate dropDdl;
  end if;
end;

Run Time

SQL used at runtime to query and update data.

Outbox

Used at intervals to clean up old outbox records.

-- Deletes at most :BatchSize dispatched outbox rows whose DispatchedAt is
-- earlier than :DispatchedBefore.
delete from "ENDPOINTNAMEOD"
where
    Dispatched = 1
    and DispatchedAt < :DispatchedBefore
    and rownum <= :BatchSize

Get

Used by IOutboxStorage.Get.

-- Reads the dispatch flag and stored operations of one outbox record.
select Dispatched, Operations
from "ENDPOINTNAMEOD"
where MessageId = :MessageId

SetAsDispatched

Used by IOutboxStorage.SetAsDispatched.

-- Flags the record as dispatched, stamps the dispatch time and replaces the
-- stored operations with '[]'.
update "ENDPOINTNAMEOD"
set Dispatched = 1,
    DispatchedAt = :DispatchedAt,
    Operations = '[]'
where MessageId = :MessageId

Store

Used by IOutboxStorage.Store.

Optimistic (default) mode
-- Inserts a new outbox record together with its operations.
insert into "ENDPOINTNAMEOD" (MessageId, Operations, PersistenceVersion)
values (:MessageId, :Operations, :PersistenceVersion)
Pessimistic mode
-- Statement 1: insert the outbox record with '[]' in place of the operations.
-- NOTE(review): the two statements below have no separator between them;
-- presumably they are issued as separate commands - confirm.
insert into "ENDPOINTNAMEOD"
(
    MessageId,
    Operations,
    PersistenceVersion
)
values
(
    :MessageId,
    '[]',
    :PersistenceVersion
)
-- Statement 2: write the supplied operations onto the record for :MessageId.
update "ENDPOINTNAMEOD"
set
    Operations = :Operations
where MessageId = :MessageId

Saga

Complete

Used by ISagaPersister.Complete.

-- Deletes the saga row only when its Concurrency still equals the supplied
-- value.
delete from EndpointName_SagaName
where
    Id = :Id
    and Concurrency = :Concurrency

Save

Used by ISagaPersister.Save.

-- Inserts a new saga row; Concurrency is written as the literal 1.
insert into EndpointName_SagaName
(
    Id, Metadata, Data,
    PersistenceVersion, SagaTypeVersion, Concurrency,
    CORR_CORRELATIONPROPERTY, CORR_TRANSITIONALCORRELATIONPR
)
values
(
    :Id, :Metadata, :Data,
    :PersistenceVersion, :SagaTypeVersion, 1,
    :CorrelationId, :TransitionalCorrelationId
)

GetByProperty

Used by ISagaPersister.Get(propertyName...).

-- Reads the saga row whose correlation column equals :propertyValue and
-- locks it via "for update".
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName
where CORR_PROPERTYNAME = :propertyValue
for update

GetBySagaId

Used by ISagaPersister.Get(sagaId...).

-- Reads the saga row with the given Id and locks it via "for update".
select Id, SagaTypeVersion, Concurrency, Metadata, Data
from EndpointName_SagaName
where Id = :Id
for update

Update

Used by ISagaPersister.Update.

-- Rewrites the saga row and bumps Concurrency by one, but only when the
-- stored Concurrency still equals the supplied value.
update EndpointName_SagaName
set Data = :Data,
    PersistenceVersion = :PersistenceVersion,
    SagaTypeVersion = :SagaTypeVersion,
    Concurrency = :Concurrency + 1,
    CORR_TRANSITIONALCORRELATIONPR = :TransitionalCorrelationId
where Id = :Id
  and Concurrency = :Concurrency

Subscription

GetSubscribers

Used by ISubscriptionStorage.GetSubscriberAddressesForMessage.

-- Distinct subscriber/endpoint pairs stored for the listed message type(s).
select distinct
    Subscriber,
    Endpoint
from "ENDPOINTNAMESS"
where MessageType in (:type0)

Subscribe

Used by ISubscriptionStorage.Subscribe.

-- Inserts a subscription row and commits. When the insert hits a duplicate
-- key (DUP_VAL_ON_INDEX), the existing row for the same MessageType and
-- Subscriber gets its Endpoint and PersistenceVersion refreshed, provided an
-- Endpoint value was supplied; otherwise the work is rolled back.
begin
    insert into "ENDPOINTNAMESS"
    (
        MessageType,
        Subscriber,
        Endpoint,
        PersistenceVersion
    )
    values
    (
        :MessageType,
        :Subscriber,
        :Endpoint,
        :PersistenceVersion
    );
    commit;
exception
    when DUP_VAL_ON_INDEX then
    -- NOTE(review): unlike the insert path, the update below is not followed
    -- by a commit in this block; presumably the caller or auto-commit
    -- finishes the transaction - confirm.
    if :Endpoint is not null then
        update "ENDPOINTNAMESS" set
            Endpoint = :Endpoint,
            PersistenceVersion = :PersistenceVersion
        where 
            MessageType = :MessageType
            and Subscriber = :Subscriber;
    else
        ROLLBACK;
    end if;
end;

Unsubscribe

Used by ISubscriptionStorage.Unsubscribe.

-- Removes the subscription of one subscriber for one message type.
delete from "ENDPOINTNAMESS"
where Subscriber = :Subscriber
  and MessageType = :MessageType

Timeout

Peek

Used by IPersistTimeouts.Peek.

-- Reads the stored fields of a single timeout by Id.
select Destination, SagaId, State, ExpireTime, Headers
from "ENDPOINTNAMETO"
where Id = :Id

Add

Used by IPersistTimeouts.Add.

-- Inserts a new timeout row; :Time is stored in the ExpireTime column.
insert into "ENDPOINTNAMETO"
(
    Id, Destination, SagaId, State,
    ExpireTime, Headers, PersistenceVersion
)
values
(
    :Id, :Destination, :SagaId, :State,
    :Time, :Headers, :PersistenceVersion
)

GetNextChunk

Used by IQueryTimeouts.GetNextChunk.

-- Statement 1: the earliest ExpireTime later than :EndTime. The subquery is
-- ordered by ExpireTime and the outer query keeps only its first row.
-- NOTE(review): the two statements below have no separator between them;
-- presumably they are issued as separate commands - confirm.
select ExpireTime
from
(
    select ExpireTime from "ENDPOINTNAMETO"
    where ExpireTime > :EndTime
    order by ExpireTime
) subquery
where rownum <= 1
-- Statement 2: ids and expiry times of the timeouts whose ExpireTime lies
-- after :StartTime and up to and including :EndTime.
select Id, ExpireTime
from "ENDPOINTNAMETO"
where ExpireTime > :StartTime and ExpireTime <= :EndTime

TryRemove

Used by IPersistTimeouts.TryRemove.

-- Removes a single timeout by Id.
delete from "ENDPOINTNAMETO"
where
    Id = :Id

RemoveTimeoutBy

Used by IPersistTimeouts.RemoveTimeoutBy.

-- Removes every timeout that belongs to the given saga.
delete from "ENDPOINTNAMETO"
where
    SagaId = :SagaId

Related Articles


Last modified