Scripts and SQL statements used when interacting with a PostgreSQL database.
Build Time
Scripts are created at build time and can be executed manually as part of a deployment or decommissioning of an endpoint.
Outbox
Create Table
create or replace function pg_temp.create_outbox_table(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
createTable text;
begin
tableNameNonQuoted := tablePrefix || 'OutboxData';
createTable = 'create table if not exists "' || schema || '"."' || tableNameNonQuoted || '"
(
"MessageId" character varying(200),
"Dispatched" boolean not null default false,
"DispatchedAt" timestamp,
"PersistenceVersion" character varying(23),
"Operations" jsonb not null,
primary key ("MessageId")
);
create index if not exists "Index_DispatchedAt" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("DispatchedAt" asc nulls last);
create index if not exists "Index_Dispatched" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("Dispatched" asc nulls last);
';
execute createTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.create_outbox_table(@tablePrefix, @schema);
Drop Table
create or replace function pg_temp.drop_outbox_table(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
dropTable text;
begin
tableNameNonQuoted := tablePrefix || 'OutboxData';
dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
execute dropTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.drop_outbox_table(@tablePrefix, @schema);
Saga
For a Saga with the following structure
[SqlSaga(transitionalCorrelationProperty: nameof(OrderSagaData.OrderId))]
public class OrderSaga : Saga<OrderSaga.OrderSagaData>,
IAmStartedByMessages<StartSaga>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.ConfigureMapping<StartSaga>(msg => msg.OrderNumber).ToSaga(saga => saga.OrderNumber);
}
public class OrderSagaData :
ContainSagaData
{
public int OrderNumber { get; set; }
public Guid OrderId { get; set; }
}
Create Table
/* TableNameVariable */
/* Initialize */
/* CreateTable */
create or replace function pg_temp.create_saga_table_OrderSaga(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
script text;
count int;
columnType varchar;
columnToDelete text;
begin
tableNameNonQuoted := tablePrefix || 'OrderSaga';
script = 'create table if not exists "' || schema || '"."' || tableNameNonQuoted || '"
(
"Id" uuid not null,
"Metadata" text not null,
"Data" jsonb not null,
"PersistenceVersion" character varying(23),
"SagaTypeVersion" character varying(23),
"Concurrency" int not null,
primary key("Id")
);';
execute script;
/* AddProperty OrderNumber */
script = 'alter table "' || schema || '"."' || tableNameNonQuoted || '" add column if not exists "Correlation_OrderNumber" integer';
execute script;
/* VerifyColumnType Int */
columnType := (
select data_type
from information_schema.columns
where
table_schema = schema and
table_name = tableNameNonQuoted and
column_name = 'Correlation_OrderNumber'
);
if columnType <> 'integer' then
raise exception 'Incorrect data type for Correlation_OrderNumber. Expected "integer" got "%"', columnType;
end if;
/* WriteCreateIndex OrderNumber */
script = 'create unique index if not exists "' || tablePrefix || '_i_919CC583A044D760C1FA433AD451B9E67BFD7C07" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("Correlation_OrderNumber" asc);';
execute script;
/* AddProperty OrderId */
script = 'alter table "' || schema || '"."' || tableNameNonQuoted || '" add column if not exists "Correlation_OrderId" uuid';
execute script;
/* VerifyColumnType Guid */
columnType := (
select data_type
from information_schema.columns
where
table_schema = schema and
table_name = tableNameNonQuoted and
column_name = 'Correlation_OrderId'
);
if columnType <> 'uuid' then
raise exception 'Incorrect data type for Correlation_OrderId. Expected "uuid" got "%"', columnType;
end if;
/* CreateIndex OrderId */
script = 'create unique index if not exists "' || tablePrefix || '_i_9ACA1354611B1EEE42F739F02F62E0AB88D036F3" on "' || schema || '"."' || tableNameNonQuoted || '" using btree ("Correlation_OrderId" asc);';
execute script;
/* PurgeObsoleteIndex */
/* PurgeObsoleteProperties */
for columnToDelete in
(
select column_name
from information_schema.columns
where
table_name = tableNameNonQuoted and
column_name LIKE 'Correlation_%' and
column_name <> 'Correlation_OrderNumber' and
column_name <> 'Correlation_OrderId'
)
loop
script = '
alter table "' || schema || '"."' || tableNameNonQuoted || '"
drop column "' || columnToDelete || '"';
execute script;
end loop;
/* CompleteSagaScript */
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.create_saga_table_OrderSaga(@tablePrefix, @schema);
Drop Table
/* TableNameVariable */
/* DropTable */
create or replace function pg_temp.drop_saga_table_OrderSaga(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
dropTable text;
begin
tableNameNonQuoted := tablePrefix || 'OrderSaga';
dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
execute dropTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.drop_saga_table_OrderSaga(@tablePrefix, @schema);
Subscription
Create Table
create or replace function pg_temp.create_subscription_table(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
createTable text;
begin
tableNameNonQuoted := tablePrefix || 'SubscriptionData';
createTable = 'create table if not exists "' || schema || '"."' || tableNameNonQuoted || '"
(
"Id" character varying(400) not null,
"Subscriber" character varying(200) not null,
"Endpoint" character varying(200),
"MessageType" character varying(200) not null,
"PersistenceVersion" character varying(200) not null,
primary key ("Id")
);
';
execute createTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.create_subscription_table(@tablePrefix, @schema);
Drop Table
create or replace function pg_temp.drop_subscription_table(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
dropTable text;
begin
tableNameNonQuoted := tablePrefix || 'SubscriptionData';
dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
execute dropTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.drop_subscription_table(@tablePrefix, @schema);
Timeout
Create Table
create or replace function pg_temp.create_timeouts_table(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableName varchar;
timeIndexName varchar;
sagaIndexName varchar;
createTable text;
begin
tableName := tablePrefix || 'TimeoutData';
timeIndexName := tableName || '_TimeIdx';
sagaIndexName := tableName || '_SagaIdx';
createTable = 'create table if not exists "' || schema || '"."' || tableName || '"
(
"Id" uuid not null,
"Destination" character varying(200),
"SagaId" uuid,
"State" bytea,
"Time" timestamp,
"Headers" text,
"PersistenceVersion" character varying(23),
primary key ("Id")
);
create index if not exists "' || timeIndexName || '" on "' || schema || '"."' || tableName || '" using btree ("Time" asc nulls last);
create index if not exists "' || sagaIndexName || '" on "' || schema || '"."' || tableName || '" using btree ("SagaId" asc nulls last);
';
execute createTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.create_timeouts_table(@tablePrefix, @schema);
Drop Table
create or replace function pg_temp.drop_timeouts_table(tablePrefix varchar, schema varchar)
returns integer as
$body$
declare
tableNameNonQuoted varchar;
dropTable text;
begin
tableNameNonQuoted := tablePrefix || 'TimeoutData';
dropTable = 'drop table if exists "' || schema || '"."' || tableNameNonQuoted || '";';
execute dropTable;
return 0;
end;
$body$
language 'plpgsql';
select pg_temp.drop_timeouts_table(@tablePrefix, @schema);
Run Time
These are the SQL scripts used at runtime to query and update data.
Outbox
Used at intervals to cleanup old outbox records.
delete from "public"."EndpointNameOutboxData"
where ctid in
(
select ctid
from "public"."EndpointNameOutboxData"
where
"Dispatched" = true and
"DispatchedAt" < @DispatchedBefore
limit @BatchSize
)
Get
Used by IOutboxStorage.
.
select
"Dispatched",
"Operations"
from "public"."EndpointNameOutboxData"
where "MessageId" = @MessageId
SetAsDispatched
Used by IOutboxStorage.
.
update "public"."EndpointNameOutboxData"
set
"Dispatched" = true,
"DispatchedAt" = @DispatchedAt,
"Operations" = '[]'
where "MessageId" = @MessageId
Store
Used by IOutboxStorage.
.
insert into "public"."EndpointNameOutboxData"
(
"MessageId",
"Operations",
"PersistenceVersion"
)
values
(
@MessageId,
@Operations,
@PersistenceVersion
)
Saga
Complete
Used by ISagaPersister.
.
delete from EndpointName_SagaName
where "Id" = @Id and "Concurrency" = @Concurrency
Save
Used by ISagaPersister.
.
insert into EndpointName_SagaName
(
"Id",
"Metadata",
"Data",
"PersistenceVersion",
"SagaTypeVersion",
"Concurrency",
"Correlation_CorrelationProperty",
"Correlation_TransitionalCorrelationProperty"
)
values
(
@Id,
@Metadata,
@Data,
@PersistenceVersion,
@SagaTypeVersion,
1,
@CorrelationId,
@TransitionalCorrelationId
)
GetByProperty
Used by ISagaPersister.
.
select
"Id",
"SagaTypeVersion",
"Concurrency",
"Metadata",
"Data"
from EndpointName_SagaName
where "Correlation_PropertyName" = @propertyValue
for update
GetBySagaId
Used by ISagaPersister.
.
select
"Id",
"SagaTypeVersion",
"Concurrency",
"Metadata",
"Data"
from EndpointName_SagaName
where "Id" = @Id
for update
Update
Used by ISagaPersister.
.
update EndpointName_SagaName
set
"Data" = @Data,
"PersistenceVersion" = @PersistenceVersion,
"SagaTypeVersion" = @SagaTypeVersion,
"Concurrency" = @Concurrency + 1,
"Correlation_TransitionalCorrelationProperty" = @TransitionalCorrelationId
where
"Id" = @Id and "Concurrency" = @Concurrency
Select used by Saga Finder
select
"Id",
"SagaTypeVersion",
"Concurrency",
"Metadata",
"Data"
from EndpointName_SagaName
where 1 = 1
for update
Subscriptions
GetSubscribers
Used by ISubscriptionStorage.
.
select distinct "Subscriber", "Endpoint"
from "public"."EndpointNameSubscriptionData"
where "MessageType" in (@type0)
Subscribe
Used by ISubscriptionStorage.
.
insert into "public"."EndpointNameSubscriptionData"
(
"Id",
"Subscriber",
"MessageType",
"Endpoint",
"PersistenceVersion"
)
values
(
concat(@Subscriber, @MessageType),
@Subscriber,
@MessageType,
@Endpoint,
@PersistenceVersion
)
on conflict ("Id") do update
set "Endpoint" = coalesce(@Endpoint, "public"."EndpointNameSubscriptionData"."Endpoint"),
"PersistenceVersion" = @PersistenceVersion
Unsubscribe
Used by ISubscriptionStorage.
.
delete from "public"."EndpointNameSubscriptionData"
where
"Subscriber" = @Subscriber and
"MessageType" = @MessageType
Timeouts
Peek
Used by IPersistTimeouts.
.
select
"Destination",
"SagaId",
"State",
"Time",
"Headers"
from "public"."EndpointNameTimeoutData"
where "Id" = @Id
Add
Used by IPersistTimeouts.
.
insert into "public"."EndpointNameTimeoutData"
(
"Id",
"Destination",
"SagaId",
"State",
"Time",
"Headers",
"PersistenceVersion"
)
values
(
@Id,
@Destination,
@SagaId,
@State,
@Time,
@Headers,
@PersistenceVersion
)
GetNextChunk
Used by IQueryTimeouts.
.
select "Time" from "public"."EndpointNameTimeoutData"
where "Time" > @EndTime
order by "Time"
limit 1
select "Id", "Time"
from "public"."EndpointNameTimeoutData"
where "Time" > @StartTime and "Time" <= @EndTime
TryRemove
Used by IPersistTimeouts.
.
delete from "public"."EndpointNameTimeoutData"
where "Id" = @Id;
RemoveTimeoutBy
Used by IPersistTimeouts.
.
delete from "public"."EndpointNameTimeoutData"
where "SagaId" = @SagaId