Managing JetStream and the JetStream Context

The JetStream context is a NATS JetStream client concept that is mainly responsible for managing streams. It serves as the entry point for creating, configuring, and controlling streams. The context also exposes methods to manage consumers directly, without first getting or creating a stream.

You can create a context using an existing NATS connection:

using NATS.Client.Core;
using NATS.Client.JetStream;

await using var nats = new NatsConnection();

var js = new NatsJSContext(nats);

Streams

Streams are message stores. Each stream defines how messages are stored and what the retention limits (duration, size, interest) are. Streams consume normal NATS subjects: any message published on those subjects is captured in the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, but it is better to use the JetStream publish calls, because the JetStream server replies with an acknowledgement that the message was successfully stored.

An example of creating a stream:

await js.CreateStreamAsync(new StreamConfig(name: "orders", subjects: new[] { "orders.>" }));

In practice, however, streams are usually managed separately from applications. For example, you can create a stream interactively with the NATS command line client:

$ nats stream create my_events --subjects 'events.*'
? Storage  [Use arrows to move, type to filter, ? for more help]
> file
  memory
# you can safely choose defaults for testing and development

Refer to the NATS JetStream documentation for stream concepts and more information.
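As a minimal sketch of the acknowledged publish recommended earlier in this section, the snippet below publishes one message through the JetStream context and checks the server's reply. It assumes a NATS server with JetStream enabled is reachable at the default URL and that the "orders" stream from the earlier example already exists. The subject "orders.new" and the string payload are illustrative values, not part of the original text.

```csharp
using System;
using NATS.Client.Core;
using NATS.Client.JetStream;

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

// Assumption: the "orders" stream (subjects "orders.>") already exists,
// so "orders.new" is captured by it. Subject and payload are illustrative.
var ack = await js.PublishAsync(subject: "orders.new", data: "order 1");

// Throws if the server replied with an error instead of storing the message.
ack.EnsureSuccess();

Console.WriteLine($"Stored in stream {ack.Stream} at sequence {ack.Seq}");
```

A plain publish on the NATS connection to the same subject would also be captured by the stream, but the application would get no confirmation that the message was stored.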

Consumers

A consumer is a stateful view of a stream. It acts as an interface for clients to consume a subset of the messages stored in a stream, and it keeps track of which messages were delivered to and acknowledged by clients.

Unlike streams, consumers are accessed by NATS client libraries as part of message consumption:

// Create a consumer, or update it if it already exists
var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", new ConsumerConfig("order_processor"));
// Get an existing consumer
consumer = await js.GetConsumerAsync(stream: "orders", consumer: "order_processor");

Ephemeral and Durable Consumers

A consumer can be ephemeral or durable. It is considered durable when an explicit name is set on the DurableName field when creating the consumer; otherwise it is considered ephemeral. Durable and ephemeral consumers behave exactly the same, except that an ephemeral consumer is automatically cleaned up (deleted) after a period of inactivity, specifically when there are no subscriptions bound to the consumer.

// Create a durable consumer
var durableConfig = new ConsumerConfig("durable_processor");

// Same as above
durableConfig = new ConsumerConfig
{
    Name = "durable_processor",
    DurableName = "durable_processor",
    AckPolicy = ConsumerConfigAckPolicy.Explicit,
};

var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", durableConfig);

Console.WriteLine($"Consumer Name: {consumer.Info.Name}"); // durable_processor
Console.WriteLine($"Consumer DurableName: {consumer.Info.Config.DurableName}"); // durable_processor

// Create an ephemeral consumer by not setting durable name
var ephemeralConfig = new ConsumerConfig();

var ephemeralConsumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", ephemeralConfig);

Console.WriteLine($"Consumer Name: {ephemeralConsumer.Info.Name}"); // e.g. Z8YlwrP9 (server assigned random name)
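The inactivity period after which an ephemeral consumer is removed can usually be tuned through the consumer configuration. The following is a sketch only: it assumes a NATS.Net 2.x release in which ConsumerConfig exposes InactiveThreshold as a TimeSpan, a reachable JetStream-enabled server, and an existing "orders" stream. The 30-second value is an arbitrary illustration.

```csharp
using System;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

// No Name or DurableName is set, so the server treats this consumer as ephemeral.
// Assumption: InactiveThreshold is a TimeSpan in the installed NATS.Net version.
var config = new ConsumerConfig
{
    AckPolicy = ConsumerConfigAckPolicy.Explicit,
    InactiveThreshold = TimeSpan.FromSeconds(30), // illustrative value
};

var ephemeral = await js.CreateOrUpdateConsumerAsync(stream: "orders", config);

// The server assigns a random name to the ephemeral consumer.
Console.WriteLine($"Ephemeral consumer name: {ephemeral.Info.Name}");
```

If the property has a different type in your version of the library, adjust the assignment accordingly; the idea of setting an inactivity threshold on the consumer configuration stays the same.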