
Consuming Messages from Streams

Messages can be consumed from a stream using one of three methods, depending on your application's needs: next, fetch, or consume. All three are available on the consumer object created from the JetStream context.

Install NATS.Net from NuGet. The following setup creates a stream, a consumer, and a serializer that the examples in this section reuse:

// Generate serializer context at compile time, ready for native AOT deployments
[JsonSerializable(typeof(Order))]
public partial class OrderJsonSerializerContext : JsonSerializerContext
{
}

public record Order(int OrderId)
{
    public int OrderId { get; set; } = OrderId;
}

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

await js.CreateStreamAsync(new StreamConfig(name: "orders", subjects: new[] { "orders.>" }));

var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", new ConsumerConfig("order_processor"));

// Use generated JSON serializer
var orderSerializer = new NatsJsonContextSerializer<Order>(OrderJsonSerializerContext.Default);

// Publish new order messages
await nats.PublishAsync(subject: "orders.new.1", data: new Order(OrderId: 1), serializer: orderSerializer);

Note

See also the Serialization section for more information about the available serialization options.

Next Method

The next method is the simplest way to retrieve messages from a stream. Each call returns a single message, or nothing if no message arrives before the request expires. Once you have processed a message, you can call next again for another.

var next = await consumer.NextAsync<Order>(serializer: orderSerializer);

if (next is { } msg)
{
    Console.WriteLine($"Processing {msg.Subject}: {msg.Data.OrderId}...");
    await msg.AckAsync();
}

Next is the simplest and most conservative way of consuming messages, since you request a single message from the JetStream server and acknowledge it before requesting more. However, it is also the least performant method because messages are not batched.
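
The expiry time mentioned above can also be set explicitly. The following is a minimal sketch, assuming that NatsJSNextOpts exposes an Expires property and that NextAsync accepts it through an opts parameter. The consumer, orderSerializer and cancellationToken objects are the ones used in the other examples in this section, and the five-second value is arbitrary:

```csharp
// Assumption: NatsJSNextOpts.Expires bounds how long the request waits for a message.
var nextOrNothing = await consumer.NextAsync<Order>(
    serializer: orderSerializer,
    opts: new NatsJSNextOpts { Expires = TimeSpan.FromSeconds(5) },
    cancellationToken: cancellationToken);

if (nextOrNothing is { } order)
{
    Console.WriteLine($"Processing {order.Subject}: {order.Data.OrderId}...");
    await order.AckAsync(cancellationToken: cancellationToken);
}
else
{
    // No message arrived before the request expired; call NextAsync again later.
    Console.WriteLine("No message available.");
}
```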

Fetch Method

The fetch method requests messages in batches to improve performance, while still letting the application control how fast it processes messages so that it is not overwhelmed.

await foreach (var msg in consumer.FetchAsync<Order>(new NatsJSFetchOpts { MaxMsgs = 1000 }, serializer: orderSerializer).WithCancellation(cancellationToken))
{
    // Process message
    await msg.AckAsync();

    // The loop ends when the pull request expires or when the requested number of messages (MaxMsgs) has been received
}

Consume Method

The consume method is the most performant way of consuming messages. Requests for messages (a.k.a. pull requests) are overlapped so that there is a constant flow of messages from the JetStream server. Flow is controlled by MaxMsgs or MaxBytes and their respective thresholds, so that the application is not overwhelmed and server resources are not wasted.

// Continuously consume messages in batches (1000 messages per batch by default)
await foreach (var msg in consumer.ConsumeAsync<Order>(serializer: orderSerializer).WithCancellation(cancellationToken))
{
    // Process message
    await msg.AckAsync();

    // The loop never ends unless there is a terminal error, a cancellation, or a break
}
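
As a sketch of the flow control described above, the batch size and its threshold could be set through consume options. This assumes that NatsJSConsumeOpts exposes MaxMsgs and ThresholdMsgs properties and that ConsumeAsync accepts them through an opts parameter; the values are illustrative only:

```csharp
// Assumption: the client requests more messages once the number still
// pending from the previous pull request drops to ThresholdMsgs.
var consumeOpts = new NatsJSConsumeOpts
{
    MaxMsgs = 100,      // request at most 100 messages at a time
    ThresholdMsgs = 50, // refill when 50 or fewer are still pending
};

await foreach (var msg in consumer.ConsumeAsync<Order>(serializer: orderSerializer, opts: consumeOpts).WithCancellation(cancellationToken))
{
    // Process message
    await msg.AckAsync(cancellationToken: cancellationToken);
}
```

Smaller values keep fewer unacknowledged messages in the application at the cost of more frequent pull requests.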

Handling Exceptions

While consuming messages (using the next, fetch, or consume methods), the client library might throw exceptions in several scenarios, for example:

  • Consumer is deleted by another application or operator
  • Connection to NATS server is interrupted (mainly affects the next and fetch methods; the consume method can recover)
  • Client pull request is invalid
  • Account permissions have changed
  • Cluster leader changed

A naive implementation might try to recover from errors by assuming they are temporary, e.g. that the stream or the consumer will be created eventually:

while (!cancellationToken.IsCancellationRequested)
{
    try
    {
        await consumer.RefreshAsync(cancellationToken); // or try to recreate consumer

        await foreach (var msg in consumer.ConsumeAsync<Order>(serializer: orderSerializer).WithCancellation(cancellationToken))
        {
            // Process message
            await msg.AckAsync(cancellationToken: cancellationToken);
        }
    }
    catch (NatsJSProtocolException e)
    {
        // log exception
    }
    catch (NatsJSException e)
    {
        // log exception
        await Task.Delay(1000, cancellationToken); // backoff
    }
}
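
The fixed one-second delay in the loop above could be replaced with a capped exponential backoff. The helper below is a hypothetical sketch and not part of NATS.Net: the Backoff class, its Delay method, and the parameter names are all assumptions made for illustration.

```csharp
using System;

// Hypothetical helper (not part of NATS.Net): computes a retry delay that
// doubles with each consecutive failure and never exceeds maxDelay.
public static class Backoff
{
    public static TimeSpan Delay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt), "attempt starts at 1");
        }

        // Floating-point arithmetic avoids integer overflow for large attempt
        // counts; Math.Min clamps the result (including infinity) to maxDelay.
        var milliseconds = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
        return TimeSpan.FromMilliseconds(Math.Min(milliseconds, maxDelay.TotalMilliseconds));
    }
}
```

In the retry loop, a counter of consecutive failures would be passed as attempt and reset to zero after a message is processed successfully, for example: await Task.Delay(Backoff.Delay(failures, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30)), cancellationToken);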

Depending on your application, you should configure streams and consumers with appropriate settings so that messages are stored and processed according to your requirements.

Note

This example uses a generated JSON serializer suitable for native AOT deployments. See also the Serialization section for more details.