Getting Started
Architecture
NServiceBus
Transports
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

PostgreSQL Scripts

Component: Sql Persistence
Target Version: NServiceBus 8.x

Scripts and SQL statements used when interacting with a PostgreSQL database.

Build Time

Scripts are created at build time and can be executed manually as part of a deployment or decommissioning of an endpoint.

Outbox

Create Table

create or replace function pg_temp.create_outbox_table(tablePrefix varchar, schema varchar)
  returns integer as
  $body$
    declare
      tableNameNonQuoted varchar;
      createTable text;
    begin
        tableNameNonQuoted := tablePrefix || 'OutboxData';
        createTable = 'create table if not exists "' || schema || '"."' || tableNameNonQuoted || '"
    (
        "MessageId" character varying(200),
        "Dispatched" boolean not null default false,
        "DispatchedAt" timestamp,
        "PersistenceVersion" character varying(23),
        "Operations" jsonb not null,
        primary key ("MessageId")
    );
    create index if not exists "Index_DispatchedAt" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("DispatchedAt" asc nulls last);
    create index if not exists "Index_Dispatched" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("Dispatched" asc nulls last);
';
        execute createTable;
        return 0;
    end;
  $body$
  language 'plpgsql';

select pg_temp.create_outbox_table(@tablePrefix, @schema);

Drop Table

create or replace function pg_temp.drop_outbox_table(tablePrefix varchar, schema varchar)
  returns integer as
  $body$
    declare
      tableNameNonQuoted varchar;
      dropTable text;
    begin
        tableNameNonQuoted := tablePrefix || 'OutboxData';
        dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
        execute dropTable;
        return 0;
    end;
  $body$
  language 'plpgsql';

select pg_temp.drop_outbox_table(@tablePrefix, @schema);

Saga

For a Saga with the following structure

[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
    IAmStartedByMessages<StartSaga>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
    }

    public class OrderSagaData :
        ContainSagaData
    {
        public int OrderNumber { get; set; }
        public Guid OrderId { get; set; }
    }

Create Table

/* TableNameVariable */

/* Initialize */

/* CreateTable */

create or replace function pg_temp.create_saga_table_OrderSaga(tablePrefix varchar, schema varchar)
    returns integer as
    $body$
    declare
        tableNameNonQuoted varchar;
        script text;
        count int;
        columnType varchar;
        columnToDelete text;
    begin
        tableNameNonQuoted := tablePrefix || 'OrderSaga';
        script = 'create table if not exists "' || schema || '"."' || tableNameNonQuoted || '"
(
    "Id" uuid not null,
    "Metadata" text not null,
    "Data" jsonb not null,
    "PersistenceVersion" character varying(23),
    "SagaTypeVersion" character varying(23),
    "Concurrency" int not null,
    primary key("Id")
);';
        execute script;

/* AddProperty OrderNumber */

        script = 'alter table "' || schema || '"."' || tableNameNonQuoted || '" add column if not exists "Correlation_OrderNumber" integer';
        execute script;

/* VerifyColumnType Int */

        columnType := (
            select data_type
            from information_schema.columns
            where
            table_schema = schema and
            table_name = tableNameNonQuoted and
            column_name = 'Correlation_OrderNumber'
        );
        if columnType <> 'integer' then
            raise exception 'Incorrect data type for Correlation_OrderNumber. Expected "integer" got "%"', columnType;
        end if;

/* WriteCreateIndex OrderNumber */

        script = 'create unique index if not exists "' || tablePrefix || '_i_919CC583A044D760C1FA433AD451B9E67BFD7C07" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("Correlation_OrderNumber" asc);';
        execute script;
/* AddProperty OrderId */

        script = 'alter table "' || schema || '"."' || tableNameNonQuoted || '" add column if not exists "Correlation_OrderId" uuid';
        execute script;

/* VerifyColumnType Guid */

        columnType := (
            select data_type
            from information_schema.columns
            where
            table_schema = schema and
            table_name = tableNameNonQuoted and
            column_name = 'Correlation_OrderId'
        );
        if columnType <> 'uuid' then
            raise exception 'Incorrect data type for Correlation_OrderId. Expected "uuid" got "%"', columnType;
        end if;

/* CreateIndex OrderId */

        script = 'create unique index if not exists "' || tablePrefix || '_i_9ACA1354611B1EEE42F739F02F62E0AB88D036F3" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("Correlation_OrderId" asc);';
        execute script;
/* PurgeObsoleteIndex */

/* PurgeObsoleteProperties */

for columnToDelete in
(
    select column_name
    from information_schema.columns
    where
        table_name = tableNameNonQuoted and
        column_name LIKE 'Correlation_%' and
        column_name <> 'Correlation_OrderNumber' and
        column_name <> 'Correlation_OrderId'
)
loop
	script = '
alter table "' || schema || '"."' || tableNameNonQuoted || '"
drop column "' || columnToDelete || '"';
    execute script;
end loop;

/* CompleteSagaScript */

        return 0;
    end;
    $body$
language 'plpgsql';

select pg_temp.create_saga_table_OrderSaga(@tablePrefix, @schema);

Drop Table

/* TableNameVariable */

/* DropTable */
create or replace function pg_temp.drop_saga_table_OrderSaga(tablePrefix varchar, schema varchar)
    returns integer as
    $body$
    declare
        tableNameNonQuoted varchar;
        dropTable text;
    begin
        tableNameNonQuoted := tablePrefix || 'OrderSaga';
        dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
        execute dropTable;
        return 0;
    end;
    $body$
    language 'plpgsql';

select pg_temp.drop_saga_table_OrderSaga(@tablePrefix, @schema);

Subscription

Create Table

create or replace function pg_temp.create_subscription_table(tablePrefix varchar, schema varchar)
  returns integer as
  $body$
    declare
      tableNameNonQuoted varchar;
      createTable text;
    begin
        tableNameNonQuoted := tablePrefix || 'SubscriptionData';
        createTable = 'create table if not exists "' || schema || '"."' || tableNameNonQuoted || '"
    (
        "Id" character varying(400) not null,
        "Subscriber" character varying(200) not null,
        "Endpoint" character varying(200),
        "MessageType" character varying(200) not null,
        "PersistenceVersion" character varying(200) not null,
        primary key ("Id")
    );
';
        execute createTable;
        return 0;
    end;
  $body$
  language 'plpgsql';

select pg_temp.create_subscription_table(@tablePrefix, @schema);

Drop Table

create or replace function pg_temp.drop_subscription_table(tablePrefix varchar, schema varchar)
  returns integer as
  $body$
    declare
      tableNameNonQuoted varchar;
      dropTable text;
    begin
        tableNameNonQuoted := tablePrefix || 'SubscriptionData';
        dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
        execute dropTable;
        return 0;
    end;
  $body$
  language 'plpgsql';

select pg_temp.drop_subscription_table(@tablePrefix, @schema);

Timeout

Create Table

create or replace function pg_temp.create_timeouts_table(tablePrefix varchar, schema varchar)
  returns integer as
  $body$
    declare
      tableName varchar;
      timeIndexName varchar;
      sagaIndexName varchar;
      createTable text;
    begin
        tableName := tablePrefix || 'TimeoutData';
        timeIndexName := tableName || '_TimeIdx';
        sagaIndexName := tableName || '_SagaIdx';
        createTable = 'create table if not exists "' || schema || '"."' || tableName || '"
    (
        "Id" uuid not null,
        "Destination" character varying(200),
        "SagaId" uuid,
        "State" bytea,
        "Time" timestamp,
        "Headers" text,
        "PersistenceVersion" character varying(23),
        primary key ("Id")
    );
    create index if not exists "' || timeIndexName || '" on "' || schema || '"."' || tableName || '" using btree ("Time" asc nulls last);
    create index if not exists "' || sagaIndexName || '" on "' || schema || '"."' || tableName || '" using btree ("SagaId" asc nulls last);
';
        execute createTable;
        return 0;
    end;
  $body$
  language 'plpgsql';

select pg_temp.create_timeouts_table(@tablePrefix, @schema);

Drop Table

create or replace function pg_temp.drop_timeouts_table(tablePrefix varchar, schema varchar)
  returns integer as
  $body$
    declare
      tableNameNonQuoted varchar;
      dropTable text;
    begin
        tableNameNonQuoted := tablePrefix || 'TimeoutData';
        dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
        execute dropTable;
        return 0;
    end;
  $body$
  language 'plpgsql';

select pg_temp.drop_timeouts_table(@tablePrefix, @schema);

Run Time

These are the SQL scripts used at runtime to query and update data.

Outbox

Used at intervals to cleanup old outbox records.

delete from "public"."EndpointNameOutboxData"
where ctid in
(
    select ctid
    from "public"."EndpointNameOutboxData"
    where
        "Dispatched" = true and
        "DispatchedAt" < @DispatchedBefore
    limit @BatchSize
)

Get

Used by IOutboxStorage.SetAsDispatched.

select
    "Dispatched",
    "Operations"
from "public"."EndpointNameOutboxData"
where "MessageId" = @MessageId

SetAsDispatched

Used by IOutboxStorage.SetAsDispatched.

update "public"."EndpointNameOutboxData"
set
    "Dispatched" = true,
    "DispatchedAt" = @DispatchedAt,
    "Operations" = '[]'
where "MessageId" = @MessageId

Store

Used by IOutboxStorage.Store.

Optimistic (default) mode
insert into "public"."EndpointNameOutboxData"
(
    "MessageId",
    "Operations",
    "PersistenceVersion"
)
values
(
    @MessageId,
    @Operations,
    @PersistenceVersion
)
Pessimistic mode
insert into "public"."EndpointNameOutboxData"
(
    "MessageId",
    "Operations",
    "PersistenceVersion"
)
values
(
    @MessageId,
    '[]',
    @PersistenceVersion
)
update "public"."EndpointNameOutboxData"
set
    "Operations" = @Operations
where "MessageId" = @MessageId

Saga

Complete

Used by ISagaPersister.Complete.

delete from EndpointName_SagaName
where "Id" = @Id and "Concurrency" = @Concurrency

Save

Used by ISagaPersister.Save.

insert into EndpointName_SagaName
(
    "Id",
    "Metadata",
    "Data",
    "PersistenceVersion",
    "SagaTypeVersion",
    "Concurrency",
    "Correlation_CorrelationProperty",
    "Correlation_TransitionalCorrelationProperty"
)
values
(
    @Id,
    @Metadata,
    @Data,
    @PersistenceVersion,
    @SagaTypeVersion,
    1,
    @CorrelationId,
    @TransitionalCorrelationId
)

GetByProperty

Used by ISagaPersister.Get(propertyName...).

select
    "Id",
    "SagaTypeVersion",
    "Concurrency",
    "Metadata",
    "Data"
from EndpointName_SagaName
where "Correlation_PropertyName" = @propertyValue
for update

GetBySagaId

Used by ISagaPersister.Get(sagaId...).

select
    "Id",
    "SagaTypeVersion",
    "Concurrency",
    "Metadata",
    "Data"
from EndpointName_SagaName
where "Id" = @Id
for update

Update

Used by ISagaPersister.Update.

update EndpointName_SagaName
set
    "Data" = @Data,
    "PersistenceVersion" = @PersistenceVersion,
    "SagaTypeVersion" = @SagaTypeVersion,
    "Concurrency" = @Concurrency + 1,
    "Correlation_TransitionalCorrelationProperty" = @TransitionalCorrelationId
where
    "Id" = @Id and "Concurrency" = @Concurrency

Select used by Saga Finder

select
    "Id",
    "SagaTypeVersion",
    "Concurrency",
    "Metadata",
    "Data"
from EndpointName_SagaName
where 1 = 1
for update

Subscriptions

GetSubscribers

Used by ISubscriptionStorage.GetSubscriberAddressesForMessage.

select distinct "Subscriber", "Endpoint"
from "public"."EndpointNameSubscriptionData"
where "MessageType" in (@type0)

Subscribe

Used by ISubscriptionStorage.Subscribe.

insert into "public"."EndpointNameSubscriptionData"
(
    "Id",
    "Subscriber",
    "MessageType",
    "Endpoint",
    "PersistenceVersion"
)
values
(
    concat(@Subscriber, @MessageType),
    @Subscriber,
    @MessageType,
    @Endpoint,
    @PersistenceVersion
)
on conflict ("Id") do update
    set "Endpoint" = coalesce(@Endpoint, "public"."EndpointNameSubscriptionData"."Endpoint"),
        "PersistenceVersion" = @PersistenceVersion

Unsubscribe

Used by ISubscriptionStorage.Unsubscribe.

delete from "public"."EndpointNameSubscriptionData"
where
    "Subscriber" = @Subscriber and
    "MessageType" = @MessageType

Timeouts

Peek

Used by IPersistTimeouts.Peek.

select
    "Destination",
    "SagaId",
    "State",
    "Time",
    "Headers"
from "public"."EndpointNameTimeoutData"
where "Id" = @Id

Add

Used by IPersistTimeouts.Add.

insert into "public"."EndpointNameTimeoutData"
(
    "Id",
    "Destination",
    "SagaId",
    "State",
    "Time",
    "Headers",
    "PersistenceVersion"
)
values
(
    @Id,
    @Destination,
    @SagaId,
    @State,
    @Time,
    @Headers,
    @PersistenceVersion
)

GetNextChunk

Used by IQueryTimeouts.GetNextChunk.

select "Time" from "public"."EndpointNameTimeoutData"
where "Time" > @EndTime
order by "Time"
limit 1
select "Id", "Time"
from "public"."EndpointNameTimeoutData"
where "Time" > @StartTime and "Time" <= @EndTime

TryRemove

Used by IPersistTimeouts.TryRemove.

delete from "public"."EndpointNameTimeoutData"
where "Id" = @Id;

RemoveTimeoutBy

Used by IPersistTimeouts.RemoveTimeoutBy.

delete from "public"."EndpointNameTimeoutData"
where "SagaId" = @SagaId

Related Articles