Table of Contents

Serialization

NATS.Net supports serialization of messages using a simple interface INatsSerializer<T>.

By default, the client uses the NatsDefaultSerializer<T> which can handle binary data, UTF8 strings and numbers. You can provide your own serializer by implementing the INatsSerializer<T> interface or using the NatsJsonContextSerializer<T> for generated JSON serialization. Serializers can also be chained together to provide multiple serialization formats typically depending on the types being used.

Default Serializer Registry

Default serializer is used when no serializer is provided to the connection options. It can handle binary data, UTF8 strings and numbers. It uses the following rules to determine the type of the data:

  • If the data is a byte array, Memory<byte>, IMemoryOwner<byte> or similar it is treated as binary data.
  • If the data is a string or similar it is treated as UTF8 string.
  • If the data is a primitive (for example DateTime, int or double. See also NatsUtf8PrimitivesSerializer<T>) it is treated as the primitive encoded as a UTF8 string.
  • For any other type, the serializer will throw an exception.

Serialising custom data formats can be done by implementing the serializer registry interface INatsSerializerRegistry that can be used to provide a custom serializer instances for specific types.

You would be using the default serializer by not specifying a serializer registry in connection options or by setting it to the default explicitly:

// Same as not specifying a serializer.
var natsOpts = NatsOpts.Default with { SerializerRegistry = NatsDefaultSerializerRegistry.Default };

await using var nats = new NatsConnection(natsOpts);

var subscriber = Task.Run(async () =>
{
    // Default serializer knows how to deal with UTF8 strings, numbers and binary data.
    await foreach (var msg in nats.SubscribeAsync<string>("foo"))
    {
        // Check for the end of messages.
        if (msg.Data == null)
            break;

        // Outputs 'Hello World'
        Console.WriteLine(msg.Data);
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

// Default serializer knows how to deal with UTF8 strings, numbers and binary data.
await nats.PublishAsync<string>(subject: "foo", data: "Hello World");

// Signal the end of messages by sending an empty payload.
await nats.PublishAsync(subject: "foo");

await subscriber;

The default serializer is designed to be used by developers who want to only work with binary data, and provide an out of the box experience for basic use cases like sending and receiving UTF8 strings.

Using JSON Serializer Context

The NatsJsonContextSerializer<T> uses the System.Text.Json serializer to serialize and deserialize messages. It relies on the System.Text.Json source generator to generate the serialization code at compile time. This is the recommended JSON serializer for most use cases and it's required for Native AOT deployments.

First you need to define your JSON classes and a context to generate the serialization code:

public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}

[JsonSerializable(typeof(MyData))]
internal partial class MyJsonContext : JsonSerializerContext
{
}

Then you can use the NatsJsonContextSerializer<T> to serialize and deserialize messages by providing the registry (NatsJsonContextSerializerRegistry) with the connection options:

// Set the custom serializer registry as the default for the connection.
var myRegistry = new NatsJsonContextSerializerRegistry(MyJsonContext.Default, OtherJsonContext.Default);

var natsOpts = NatsOpts.Default with { SerializerRegistry = myRegistry };

await using var nats = new NatsConnection(natsOpts);

var subscriber = Task.Run(async () =>
{
    await foreach (var msg in nats.SubscribeAsync<MyData>("foo"))
    {
        // Outputs 'MyData { Id = 1, Name = bar }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

await nats.PublishAsync<MyData>(subject: "foo", data: new MyData { Id = 1, Name = "bar" });

await subscriber;

You can also set the serializer for a specific subscription or publish call:

await using var nats = new NatsConnection();

var serializer = new NatsJsonContextSerializer<MyData>(MyJsonContext.Default);

var subscriber = Task.Run(async () =>
{
    await foreach (var msg in nats.SubscribeAsync<MyData>("foo", serializer: serializer))
    {
        // Outputs 'MyData { Id = 1, Name = bar }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

await nats.PublishAsync<MyData>(subject: "foo", data: new MyData { Id = 1, Name = "bar" }, serializer: serializer);

await subscriber;

Using Custom Serializer

You can also provide your own serializer by implementing the INatsSerializer<T> interface. This is useful if you need to support a custom serialization format or if you need to support multiple serialization formats.

Here is an example of a custom serializer that uses the Google ProtoBuf serializer to serialize and deserialize:

public class MyProtoBufSerializer<T> : INatsSerializer<T>
{
    public static readonly INatsSerializer<T> Default = new MyProtoBufSerializer<T>();

    public void Serialize(IBufferWriter<byte> bufferWriter, T value)
    {
        if (value is IMessage message)
        {
            message.WriteTo(bufferWriter);
        }
        else
        {
            throw new NatsException($"Can't serialize {typeof(T)}");
        }
    }

    public T? Deserialize(in ReadOnlySequence<byte> buffer)
    {
        if (typeof(T) == typeof(Greeting))
        {
            return (T)(object)Greeting.Parser.ParseFrom(buffer);
        }

        throw new NatsException($"Can't deserialize {typeof(T)}");
    }
}

public class MyProtoBufSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => MyProtoBufSerializer<T>.Default;

    public INatsDeserialize<T> GetDeserializer<T>() => MyProtoBufSerializer<T>.Default;
}

You can then use the custom serializer as the default for the connection:

var natsOpts = NatsOpts.Default with { SerializerRegistry = new MyProtoBufSerializerRegistry() };

await using var nats = new NatsConnection(natsOpts);

var subscriber = Task.Run(async () =>
{
    await foreach (var msg in nats.SubscribeAsync<Greeting>("foo"))
    {
        // Outputs '{ "id": 42, "name": "Marvin" }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

await nats.PublishAsync(subject: "foo", data: new Greeting { Id = 42, Name = "Marvin" });

await subscriber;

Using Multiple Serializers (chaining)

You can also chain multiple serializers together to support multiple serialization formats. The first serializer in the chain that can handle the data will be used. This is useful if you need to support multiple serialization formats and reuse them.

Note that chaining serializers is implemented by convention and not enforced by the INatsSerializer<T> interface since the next serializer would not be exposed to external users of the interface.

Here is an example of a serializer that uses the Google ProtoBuf serializer and the NatsJsonContextSerializer<T> to serialize and deserialize messages based on the type:

public class MixedSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => new NatsJsonContextSerializer<T>(MyJsonContext.Default, next: MyProtoBufSerializer<T>.Default);

    public INatsDeserialize<T> GetDeserializer<T>() => new NatsJsonContextSerializer<T>(MyJsonContext.Default, next: MyProtoBufSerializer<T>.Default);
}
var natsOpts = NatsOpts.Default with { SerializerRegistry = new MixedSerializerRegistry() };

await using var nats = new NatsConnection(natsOpts);

var subscriber1 = Task.Run(async () =>
{
    await foreach (var msg in nats.SubscribeAsync<Greeting>("greet"))
    {
        // Outputs '{ "id": 42, "name": "Marvin" }'
        Console.WriteLine(msg.Data);
        break;
    }
});

var subscriber2 = Task.Run(async () =>
{
    await foreach (var msg in nats.SubscribeAsync<MyData>("data"))
    {
        // Outputs 'MyData { Id = 1, Name = bar }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscribers a chance to connect.
await Task.Delay(1000);

await nats.PublishAsync(subject: "greet", data: new Greeting { Id = 42, Name = "Marvin" });
await nats.PublishAsync(subject: "data", data: new MyData { Id = 1, Name = "Bob" });

await Task.WhenAll(subscriber1, subscriber2);

Dealing with Binary Data and Buffers

The default serializer can handle binary data and buffers. This is typically archived by using IMemoryOwner<byte> implementations. NATS .NET Client provides a NatsMemoryOwner<T> implementation that can be used to allocate buffers. The NatsMemoryOwner<T> and NatsBufferWriter<T> (adapted from .NET Community Toolkit) are IMemoryOwner<byte> and IBufferWriter<T> implementations that use the ArrayPool to allocate buffers. They can be used with the default serializer.

// Same as not specifying a serializer.
var natsOpts = NatsOpts.Default with { SerializerRegistry = NatsDefaultSerializerRegistry.Default };

await using var nats = new NatsConnection(natsOpts);

var subscriber = Task.Run(async () =>
{
    // Default serializer knows how to deal with binary data types like NatsMemoryOwner<byte>.
    await foreach (var msg in nats.SubscribeAsync<NatsMemoryOwner<byte>>("foo"))
    {
        // Check for the end of messages.
        if (msg.Data.Length == 0)
            break;

        // Dispose the memory owner after using it so it can be returned to the pool.
        using var memoryOwner = msg.Data;

        // Outputs 'Hi'
        Console.WriteLine(Encoding.ASCII.GetString(memoryOwner.Memory.Span));
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

// Don't reuse NatsBufferWriter, it's disposed and returned to the pool
// by the publisher after being written to network.
var bw = new NatsBufferWriter<byte>();
var memory = bw.GetMemory(2);
memory.Span[0] = (byte)'H';
memory.Span[1] = (byte)'i';
bw.Advance(2);

// Default serializer knows how to deal with binary data types like NatsBufferWriter<byte>.
await nats.PublishAsync(subject: "foo", data: bw);

// Signal the end of messages by sending an empty payload.
await nats.PublishAsync(subject: "foo");

await subscriber;

Advantage of using NatsMemoryOwner<T> and NatsBufferWriter<T> is that they can be used with the default serializer and they can be used to allocate buffers from the ArrayPool<T> which can be reused. This is useful if you need to allocate buffers for binary data and you want to avoid allocating buffers on for every operation (e.g. new byte[]) reducing garbage collection pressure. They may also be useful for example, if your subscription may receive messages with different formats and the only way to determine the format is by reading the message.

Using JSON Serialization with Reflection

If you're not using Native AOT deployments you can use the NatsJsonSerializer<T> to serialize and deserialize messages. NatsJsonSerializer<T> uses System.Text.Json APIs that can work with types that are not registered to generate serialization code.

Using this serializer is most useful for use cases where you want to send and receive JSON messages and you don't want to worry about registering types. It's also useful for prototyping and testing. To use the serializer you need to install the NATS.Client.Serializers.Json Nuget package.

$ dotnet add package NATS.Client.Serializers.Json

Then set the serializer as the default for the connection:

using NATS.Client.Serializers.Json;
var natsOpts = NatsOpts.Default with { SerializerRegistry = NatsJsonSerializerRegistry.Default };

await using var nats = new NatsConnection(natsOpts);

Now you can use any type without worrying about registering it. System.Text.Json serializer will use reflection to serialize and deserialize the messages.