Publishing Messages to Streams

To persist messages, you can do a normal publish to the subject, but that delivery is unacknowledged. It is better to use the JetStream context publish calls, because the JetStream server replies with an acknowledgement confirming that the message was successfully stored.

The subject must be configured on a stream to be persisted:

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

await js.CreateStreamAsync(new StreamConfig(name: "orders", subjects: new[] { "orders.>" }));

Or, using the nats CLI:

$ nats stream create orders --subjects 'orders.>'

Then you can publish to subjects captured by the stream:

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var order = new Order(OrderId: 1);

// Use generated JSON serializer
var orderSerializer = new NatsJsonContextSerializer<Order>(OrderJsonSerializerContext.Default);

var ack = await js.PublishAsync("orders.new.1", order, serializer: orderSerializer);

ack.EnsureSuccess();

// Generate serializer context at compile time, ready for native AOT deployments
[JsonSerializable(typeof(Order))]
public partial class OrderJsonSerializerContext : JsonSerializerContext
{
}

public record Order(int OrderId)
{
    public int OrderId { get; set; } = OrderId;
}
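
As a rough illustration of what the acknowledgement carries, the sketch below publishes a plain string and prints where the server stored it. It assumes a NATS server with JetStream enabled on the default local address and the "orders" stream from above already created. The `Stream` and `Seq` properties on the returned acknowledgement and the namespaces in the `using` lines are assumptions based on the NATS .NET client and may differ between versions.

```csharp
using System;
using NATS.Client.Core;
using NATS.Client.JetStream;

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

// Publish a simple string payload to a subject captured by the "orders" stream
var ack = await js.PublishAsync("orders.new.1", "hello");

// Throws if the server reported an error instead of storing the message
ack.EnsureSuccess();

// Assumed properties: the stream that stored the message and its sequence number
Console.WriteLine($"Stored in stream '{ack.Stream}' at sequence {ack.Seq}");
```

Checking the acknowledgement before treating a message as persisted is the main difference from a normal, unacknowledged publish.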

Message Deduplication

JetStream supports idempotent message writes by ignoring duplicate messages, as identified by their message ID. The message ID is not part of the message payload; it is passed as metadata in the message headers (the Nats-Msg-Id header).

var ack = await js.PublishAsync(subject: "orders.new.1", data: order, opts: new NatsJSPubOpts { MsgId = "1" }, serializer: orderSerializer);
if (ack.Duplicate)
{
    // A message with the same ID was published before
}
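
To make the effect of the message ID more concrete, here is a minimal sketch that publishes the same order twice under one ID. It assumes a local NATS server with JetStream enabled and the "orders" stream already created. The ID value "order-1" is a hypothetical choice, and the namespaces in the `using` lines are assumptions that may vary by client version. The second publish is expected to be flagged as a duplicate only if both publishes fall within the stream's duplicate tracking window.

```csharp
using System;
using System.Text.Json.Serialization;
using NATS.Client.Core;
using NATS.Client.JetStream;

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

var order = new Order(OrderId: 1);
var orderSerializer = new NatsJsonContextSerializer<Order>(OrderJsonSerializerContext.Default);

// Publish the same order twice with the same message ID ("order-1" is a hypothetical ID)
for (var attempt = 1; attempt <= 2; attempt++)
{
    var ack = await js.PublishAsync(
        subject: "orders.new.1",
        data: order,
        opts: new NatsJSPubOpts { MsgId = "order-1" },
        serializer: orderSerializer);

    // A duplicate is not an error; the server acknowledges it without storing it again
    ack.EnsureSuccess();

    Console.WriteLine($"Attempt {attempt}: duplicate = {ack.Duplicate}");
}

// Serializer context generated at compile time
[JsonSerializable(typeof(Order))]
public partial class OrderJsonSerializerContext : JsonSerializerContext
{
}

public record Order(int OrderId);
```

Deriving the message ID from something stable in the payload, such as the order ID, lets a publisher retry after a timeout without risking a second copy in the stream.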

Note

See also the Serialization section for more information about the different serialization options.