The following includes SQL scripts, C# code examples, and PowerShell scripts to facilitate deployment and operations against the SQL Server Transport.
Inspecting messages in the queue
The following script returns messages waiting in a given queue:
SELECT TOP (1000)
[Id],
[Expires],
[Headers],
[Body],
cast([Body] as varchar(max)) as [BodyString]
FROM {0} WITH (READPAST)
Some columns have been removed for clarity as they are only required for wire-level compatibility with previous versions of SQL Server transport.
The BodyString
column is a computed value that allows inspecting of the message body when a text-based serializer is used (e.g. Json or XML).
In case a column containing the message body in a human-readable format is enabled, the same result can be achieved using the following script:
SELECT TOP (1000)
[Id],
[Expires],
[Headers],
[Body],
[BodyString]
FROM {0} WITH (READPAST)
Create queues
Queue creation can be done for a specific endpoint or queues shared between multiple endpoints.
The create queue helper methods
In C#
public static class QueueCreationUtils
{
public static void CreateQueue(SqlConnection connection, string schema, string queueName)
{
var sql = $@"
if not exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
begin
create table [{schema}].[{queueName}](
[Id] [uniqueidentifier] not null,
[CorrelationId] [varchar](255),
[ReplyToAddress] [varchar](255),
[Recoverable] [bit] not null,
[Expires] [datetime],
[Headers] [nvarchar](max) not null,
[Body] [varbinary](max),
[RowVersion] [bigint] identity(1,1) not null
);
create nonclustered index [Index_RowVersion] on [{schema}].[{queueName}]
(
[RowVersion]
)
create nonclustered index [Index_Expires] on [{schema}].[{queueName}]
(
[Expires]
)
include
(
[Id],
[RowVersion]
)
where [Expires] is not null
end";
using (var command = new SqlCommand(sql, connection))
{
command.ExecuteNonQuery();
}
}
public static void CreateDelayedQueue(SqlConnection connection, string schema, string queueName)
{
var sql = $@"
if not exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
begin
create table [{schema}].[{queueName}](
[Headers] nvarchar(max) not null,
[Body] varbinary(max),
[Due] datetime not null,
[RowVersion] bigint identity(1,1) not null
);
create nonclustered index [Index_Due] on [{schema}].[{queueName}]
(
[Due]
)
end";
using (var command = new SqlCommand(sql, connection))
{
command.ExecuteNonQuery();
}
}
}
In PowerShell
function CreateQueue {
param (
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[System.Data.SqlClient.SqlConnection] $connection,
[ValidateNotNullOrEmpty()]
[string] $schema = "dbo",
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $queueName
)
$sql = @"
if not exists (select * from sys.objects where object_id = object_id(N'[{0}].[{1}]') and type in (N'U'))
begin
create table [{0}].[{1}](
[Id] [uniqueidentifier] not null,
[CorrelationId] [varchar](255),
[ReplyToAddress] [varchar](255),
[Recoverable] [bit] not null,
[Expires] [datetime],
[Headers] [nvarchar](max) not null,
[Body] [varbinary](max),
[RowVersion] [bigint] identity(1,1) not null
);
create clustered index [Index_RowVersion] on [{0}].[{1}]
(
[RowVersion]
)
create nonclustered index [Index_Expires] on [{0}].[{1}]
(
[Expires]
)
include
(
[Id],
[RowVersion]
)
where
[Expires] is not null
end
"@ -f $schema, $queueName
$command = New-Object System.Data.SqlClient.SqlCommand($sql, $connection)
$command.ExecuteNonQuery()
$command.Dispose()
}
function CreateDelayedQueue {
param (
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[System.Data.SqlClient.SqlConnection] $connection,
[ValidateNotNullOrEmpty()]
[string] $schema = "dbo",
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $queueName
)
$sql = @"
if not exists (select * from sys.objects where object_id = object_id(N'[{0}].[{1}]') and type in (N'U'))
begin
create table [{0}].[{1}](
[Headers] nvarchar(max) not null,
[Body] varbinary(max),
[Due] datetime not null,
[RowVersion] bigint identity(1,1) not null
);
create nonclustered index [Index_Due] on [{0}].[{1}]
(
[Due]
)
end
"@ -f $schema, $queueName
$command = New-Object System.Data.SqlClient.SqlCommand($sql, $connection)
$command.ExecuteNonQuery()
$command.Dispose()
}
Creating queues for an endpoint
To create all queues for a given endpoint name.
In C#
public static void CreateQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
// main queue
QueueCreationUtils.CreateQueue(connection, schema, endpointName);
// callback queue
QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.{Environment.MachineName}");
// delayed messages queue
// Only required in Version 3.1 and above when native delayed delivery is enabled
QueueCreationUtils.CreateDelayedQueue(connection, schema, $"{endpointName}.Delayed");
// timeout queue
// only required in Versions 3.0 and below or when native delayed delivery is disabled or timeout manager compatibility mode is enabled
QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Timeouts");
// timeout dispatcher queue
// only required in Versions 3.0 and below or when native delayed delivery is disabled or timeout manager compatibility mode is enabled
QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher");
// retries queue
// TODO: Only required in Versions 2 and below
QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Retries");
}
In PowerShell
Function CreateQueuesForEndpoint
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[string] $connection,
[ValidateNotNullOrEmpty()]
[string] $schema = "dbo",
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $endpointName,
[Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
[Switch] $includeRetries,
[Parameter(HelpMessage="Only required for SQL Server Version 3.1 and above if native delayed delivery is enabled")]
[Switch] $includeDelayed
)
$sqlConnection = New-Object System.Data.SqlClient.SqlConnection($connection)
$sqlConnection.Open()
try {
# main queue
CreateQueue -connection $sqlConnection -schema $schema -queuename $endpointName
# timeout queue
CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.timeouts"
# timeout dispatcher queue
CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.timeoutsdispatcher"
# retries queue
if ($includeRetries) {
CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.retries"
}
# retries queue
if ($includeDelayed) {
CreateDelayedQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.delayed"
}
}
finally {
$sqlConnection.Close()
$sqlConnection.Dispose()
}
}
Using the create endpoint queues
In C#
using (var connection = new SqlConnection(connectionString))
{
connection.Open();
CreateQueuesForEndpoint(
connection: connection,
schema: "dbo",
endpointName: "myendpoint");
}
In PowerShell
# For NServiceBus 6 Endpoints
CreateQueuesForEndpoint -endpointName "myendpoint" -connection "TheConnectionString"
# For NServiceBus 5 and below Endpoints
CreateQueuesForEndpoint -endpointName "myendpoint" -connection "TheConnectionString" -IncludeRetries
Add message body string column
The following snippet adds a computed column containing the message body in a human-readable format.
In C#
public static void AddMessageBodyStringColumn(SqlConnection connection, string schema, string queueName)
{
var sql = $@"if not exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U')) return
if NOT exists (select * from sys.columns where object_id = object_id(N'[{schema}].[{queueName}]') and name = 'BodyString')
alter table [{schema}].[{queueName}]
add BodyString as cast(Body as nvarchar(max));";
using (var command = new SqlCommand(sql, connection))
{
command.ExecuteNonQuery();
}
}
To create shared queues
In C#
In PowerShell
Delete queues
The delete helper queue methods
public static class QueueDeletionUtils
{
public static void DeleteQueue(SqlConnection connection, string schema, string queueName)
{
var deleteScript = $@"
if exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
drop table [{schema}].[{queueName}]";
using (var command = new SqlCommand(deleteScript, connection))
{
command.ExecuteNonQuery();
}
}
}
To delete all queues for a given endpoint
public static void DeleteQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
// main queue
QueueDeletionUtils.DeleteQueue(connection, schema, endpointName);
// callback queue
QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.{Environment.MachineName}");
// timeout queue
QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Timeouts");
// timeout dispatcher queue
QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher");
// retries queue
// TODO: Only required in Versions 2 and below
QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Retries");
}
using (var sqlConnection = new SqlConnection(connectionString))
{
sqlConnection.Open();
DeleteQueuesForEndpoint(
connection: sqlConnection,
schema: "dbo",
endpointName: "myendpoint");
}
To delete shared queues
Return message to source queue
The retry helper methods
A retry involves the following actions:
- Read a message from the error queue table.
- Forward that message to another queue table to be retried.
Since the connection information for the endpoint that failed is not contained in the error queue table, that information is explicitly passed in.
public static void ReturnMessageToSourceQueue(
string errorQueueConnection,
string errorQueueName,
string retryConnectionString,
string retryQueueName,
Guid messageId)
{
using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
var messageToRetry = ReadAndDelete(errorQueueConnection, errorQueueName, messageId);
RetryMessage(retryConnectionString, retryQueueName, messageToRetry);
scope.Complete();
}
}
class MessageToRetry
{
public Guid Id;
public string Headers;
public byte[] Body;
}
static void RetryMessage(string connectionString, string queueName, MessageToRetry messageToRetry)
{
var sql = $@"
insert into [{queueName}] (
Id,
Recoverable,
Headers,
Body)
values (
@Id,
@Recoverable,
@Headers,
@Body)";
using (var connection = new SqlConnection(connectionString))
{
connection.Open();
using (var command = new SqlCommand(sql, connection))
{
var parameters = command.Parameters;
parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = messageToRetry.Id;
parameters.Add("Headers", SqlDbType.NVarChar).Value = messageToRetry.Headers;
parameters.Add("Body", SqlDbType.VarBinary).Value = messageToRetry.Body;
parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
command.ExecuteNonQuery();
}
}
}
static MessageToRetry ReadAndDelete(string connectionString, string queueName, Guid messageId)
{
var sql = $@"
delete from [{queueName}]
output
deleted.Headers,
deleted.Body
where Id = @Id";
using (var connection = new SqlConnection(connectionString))
{
connection.Open();
using (var command = new SqlCommand(sql, connection))
{
command.Parameters.AddWithValue("Id", messageId);
using (var reader = command.ExecuteReader(CommandBehavior.SingleRow))
{
if (reader.Read())
{
return new MessageToRetry
{
Id = messageId,
Headers = reader.GetString(0),
Body = reader.GetSqlBinary(1).Value
};
}
var message = $"Could not find error entry with messageId '{messageId}'";
throw new Exception(message);
}
}
}
}
Using the retry helper methods
ReturnMessageToSourceQueue(
errorQueueConnection: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
errorQueueName: "errors",
retryConnectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
retryQueueName: "target",
messageId: Guid.Parse("1667B60E-2948-4EF0-8BB1-8C851A9407D2")
);
Archiving SqlTransport audit log to long-term storage
There are several ways to achieve archiving of the audit log, including using techniques like Table Partitioning and Snapshot Replication. In this example, BCP utility will be used.
Create helper "archive" table
Create an audit_archive
table with this SQL script.
create table [dbo].[audit_archive](
[Id] [uniqueidentifier] not null,
[CorrelationId] [varchar](255),
[ReplyToAddress] [varchar](255),
[Recoverable] [bit] not null,
[Expires] [datetime],
[Headers] [nvarchar](max) not null,
[Body] [varbinary](max),
[RowVersion] [bigint] not null
)
Move records to archive table
This script moves the contents of the audit table into audit_archive
table.
delete from [dbo].[audit]
output [deleted].*
into [dbo].[audit_archive]
This can be run with a scheduled job to clear the archive regularly.
Execute BCP
Once that query completes, the records can be archived to disk. In a command prompt, use the BCP to create an archive on disk.
bcp samples.dbo.audit_archive out archive.csv -c -q -T -S .\SQLExpress
Truncate the archive table
The audit records will still have to clear using the following script:
truncate table [dbo].[audit_archive];