Table of Contents

Publish-Subscribe Pattern

NATS implements a publish-subscribe message distribution model for one-to-many communication. A publisher sends a message on a subject and any active subscriber listening on that subject receives the message.

await using var nats = new NatsConnection();

var subscription = Task.Run(async () =>
{
    await foreach (var msg in nats.SubscribeAsync<int>("foo"))
    {
        Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");

        if (msg.Data == -1)
            break;
    }
});

// Give subscription time to start
await Task.Delay(1000);

for (var i = 0; i < 10; i++)
{
    Console.WriteLine($" Publishing {i}...");
    await nats.PublishAsync<int>("foo", i);
}

// Signal subscription to stop
await nats.PublishAsync<int>("foo", -1);

// Make sure subscription completes cleanly
await subscription;

Subscriptions with Lower Level Control

The SubscribeAsync() method is a convenient way to subscribe to a subject and receive messages without much effort. If you need more control over how subscription is handled, you can use the SubscribeCoreAsync() method instead.

await using var nats = new NatsConnection();

// Connections are lazy, so we need to connect explicitly
// to avoid any races between subscription and publishing.
await nats.ConnectAsync();

await using var sub = await nats.SubscribeCoreAsync<int>("foo");

for (var i = 0; i < 10; i++)
{
    Console.WriteLine($" Publishing {i}...");
    await nats.PublishAsync<int>("foo", i);
}

// Signal subscription to stop
await nats.PublishAsync<int>("foo", -1);

// Messages have been collected in the subscription internal channel
// now we can drain them
await foreach (var msg in sub.Msgs.ReadAllAsync())
{
    Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
    if (msg.Data == -1)
        break;
}

// We can unsubscribe from the subscription explicitly
// (otherwise dispose will do it for us)
await sub.UnsubscribeAsync();
Note

NatsConnection establishes the first server connection when the first call to subscribe or publish is made. This is why we call the ConnectAsync() method explicitly before subscribe or publishing any messages in the example above, making sure the subscription request is received by the server before any publish requests, avoiding potential race conditions of subscribe and publish method establishing the first connection.

Note

PingAsync() is somewhat a special method in all NATS clients. It is used to send a ping to the server and receive a pong back while measuring the round trip time. Since it waits for the server to respond, as a side effect it also flushes the outgoing buffers.

Remember that every NatsConnection instance is a single TCP connection and all the calls sent to the server are essentially serialized back to back after they're picked up from internal queues and buffers.