Table of Contents

Serialization

NATS.Net supports serialization of messages using a simple interface INatsSerializer<T>.

By default, the client uses the NatsClientDefaultSerializer<T> which can handle binary data, UTF8 strings, numbers, and ad hoc JSON serialization. You can provide your own serializer by implementing the INatsSerializer<T> interface or using the NatsJsonContextSerializer<T> for generated JSON serialization. Serializers can also be chained together to provide multiple serialization formats typically depending on the types being used.

Serializer Registries

There are two default serializer registries that can be used to provide serializers for specific types. For any other serializers, you can implement your own serializer registry.

NatsClientDefaultSerializer

This is the default serializer for NatsClient that is used when no serializer registry is provided as an option.

  • Can serialize what NatsDefaultSerializerRegistry can (see below).
  • Additionally, it can serialize data classes using ad hoc JSON serialization.
  • Uses reflection to generate serialization code at runtime so it's not AOT friendly.

The default client serializer is designed to be used by developers who want to have an out-of-the-box experience for basic use cases like sending and receiving UTF8 strings, or JSON messages.

NatsDefaultSerializerRegistry

This is the default serializer for NatsConnection that is used when no serializer registry is provided as an option. See also the differences between NatsClient vs NatsConnection

  • AOT friendly
  • If the data is a byte array, Memory<byte>, IMemoryOwner<byte> or similar it is treated as binary data.
  • If the data is a string or similar it is treated as UTF8 string.
  • If the data is a primitive (for example DateTime, int or double. See also NatsUtf8PrimitivesSerializer<T>) it is treated as the primitive encoded as a UTF8 string.
  • For any other type, the serializer will throw an exception.

The default connection serializer is designed to be AOT friendly and mostly suitable for binary data.

Using Custom Serializer Registries

Serialising custom data formats can be done by implementing the serializer registry interface INatsSerializerRegistry that can be used to provide a custom serializer instances for specific types.

// Set your custom serializer registry as the default for the connection.
NatsOpts opts = NatsOpts.Default with { SerializerRegistry = new MyProtoBufSerializerRegistry() };

await using NatsClient nc = new NatsClient(opts);

Using JSON Serializer Context

The NatsJsonContextSerializer<T> uses the System.Text.Json serializer to serialize and deserialize messages. It relies on the System.Text.Json source generator to generate the serialization code at compile time. This is the recommended JSON serializer for most use cases and it's required for Native AOT deployments.

First you need to define your JSON classes and a context to generate the serialization code:

public record MyData
{
    [JsonPropertyName("id")]
    public int Id { get; set; }

    [JsonPropertyName("name")]
    public string? Name { get; set; }
}

[JsonSerializable(typeof(MyData))]
internal partial class MyJsonContext : JsonSerializerContext
{
}

Then you can use the NatsJsonContextSerializer<T> to serialize and deserialize messages by providing the registry (NatsJsonContextSerializerRegistry) with the connection options:

// Set the custom serializer registry as the default for the connection.
NatsJsonContextSerializerRegistry myRegistry = new NatsJsonContextSerializerRegistry(MyJsonContext.Default, OtherJsonContext.Default);

NatsOpts opts = new NatsOpts { SerializerRegistry = myRegistry };

await using NatsClient nc = new NatsClient(opts);

Task subscriber = Task.Run(async () =>
{
    await foreach (NatsMsg<MyData> msg in nc.SubscribeAsync<MyData>("foo"))
    {
        // Outputs 'MyData { Id = 1, Name = bar }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

await nc.PublishAsync<MyData>(subject: "foo", data: new MyData { Id = 1, Name = "bar" });

await subscriber;

You can also set the serializer for a specific subscription or publish call:

await using NatsClient nc = new NatsClient();

NatsJsonContextSerializer<MyData> serializer = new NatsJsonContextSerializer<MyData>(MyJsonContext.Default);

Task subscriber = Task.Run(async () =>
{
    await foreach (NatsMsg<MyData> msg in nc.SubscribeAsync<MyData>("foo", serializer: serializer))
    {
        // Outputs 'MyData { Id = 1, Name = bar }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

await nc.PublishAsync<MyData>(subject: "foo", data: new MyData { Id = 1, Name = "bar" }, serializer: serializer);

await subscriber;

Using Custom Serializer

You can also provide your own serializer by implementing the INatsSerializer<T> interface. This is useful if you need to support a custom serialization format or if you need to support multiple serialization formats.

Here is an example of a custom serializer that uses the Google ProtoBuf serializer to serialize and deserialize:

public class MyProtoBufSerializer<T> : INatsSerializer<T>
{
    public static readonly INatsSerializer<T> Default = new MyProtoBufSerializer<T>();

    public void Serialize(IBufferWriter<byte> bufferWriter, T value)
    {
        if (value is IMessage message)
        {
            message.WriteTo(bufferWriter);
        }
        else
        {
            throw new NatsException($"Can't serialize {typeof(T)}");
        }
    }

    public T? Deserialize(in ReadOnlySequence<byte> buffer)
    {
        if (typeof(T) == typeof(Greeting))
        {
            return (T)(object)Greeting.Parser.ParseFrom(buffer);
        }

        throw new NatsException($"Can't deserialize {typeof(T)}");
    }

    public INatsSerializer<T> CombineWith(INatsSerializer<T> next) => throw new NotImplementedException();
}

public class MyProtoBufSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => MyProtoBufSerializer<T>.Default;

    public INatsDeserialize<T> GetDeserializer<T>() => MyProtoBufSerializer<T>.Default;
}

You can then use the custom serializer as the default for the connection:

NatsOpts opts = new NatsOpts { SerializerRegistry = new MyProtoBufSerializerRegistry() };

await using NatsClient nc = new NatsClient(opts);

Task subscriber = Task.Run(async () =>
{
    await foreach (NatsMsg<Greeting> msg in nc.SubscribeAsync<Greeting>("foo"))
    {
        // Outputs '{ "id": 42, "name": "Marvin" }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

await nc.PublishAsync(subject: "foo", data: new Greeting { Id = 42, Name = "Marvin" });

await subscriber;

Using Multiple Serializers (chaining)

You can also chain multiple serializers together to support multiple serialization formats. The first serializer in the chain that can handle the data will be used. This is useful if you need to support multiple serialization formats and reuse them.

Note that chaining serializers is implemented by convention and not enforced by the INatsSerializer<T> interface since the next serializer would not be exposed to external users of the interface.

Here is an example of a serializer that uses the Google ProtoBuf serializer and the NatsJsonContextSerializer<T> to serialize and deserialize messages based on the type:

public class MixedSerializerRegistry : INatsSerializerRegistry
{
    public INatsSerialize<T> GetSerializer<T>() => new NatsJsonContextSerializer<T>(MyJsonContext.Default, next: MyProtoBufSerializer<T>.Default);

    public INatsDeserialize<T> GetDeserializer<T>() => new NatsJsonContextSerializer<T>(MyJsonContext.Default, next: MyProtoBufSerializer<T>.Default);
}
NatsOpts opts = new NatsOpts { SerializerRegistry = new MixedSerializerRegistry() };

await using NatsClient nc = new NatsClient(opts);

Task subscriber1 = Task.Run(async () =>
{
    await foreach (NatsMsg<Greeting> msg in nc.SubscribeAsync<Greeting>("greet"))
    {
        // Outputs '{ "id": 42, "name": "Marvin" }'
        Console.WriteLine(msg.Data);
        break;
    }
});

Task subscriber2 = Task.Run(async () =>
{
    await foreach (NatsMsg<MyData> msg in nc.SubscribeAsync<MyData>("data"))
    {
        // Outputs 'MyData { Id = 1, Name = bar }'
        Console.WriteLine(msg.Data);
        break;
    }
});

// Give subscribers a chance to connect.
await Task.Delay(1000);

await nc.PublishAsync(subject: "greet", data: new Greeting { Id = 42, Name = "Marvin" });
await nc.PublishAsync(subject: "data", data: new MyData { Id = 1, Name = "Bob" });

await Task.WhenAll(subscriber1, subscriber2);

Dealing with Binary Data and Buffers

The default serializer can handle binary data and buffers. This is typically archived by using IMemoryOwner<byte> implementations. NATS .NET Client provides a NatsMemoryOwner<T> implementation that can be used to allocate buffers. The NatsMemoryOwner<T> and NatsBufferWriter<T> (adapted from .NET Community Toolkit) are IMemoryOwner<byte> and IBufferWriter<T> implementations that use the ArrayPool to allocate buffers. They can be used with the default serializer.

// The default serializer knows how to deal with binary data types like NatsMemoryOwner<byte>.
// So, you can use it without specifying a serializer.
await using NatsClient nc = new NatsClient();

Task subscriber = Task.Run(async () =>
{
    // The default serializer knows how to deal with binary data types like NatsMemoryOwner<byte>.
    await foreach (NatsMsg<NatsMemoryOwner<byte>> msg in nc.SubscribeAsync<NatsMemoryOwner<byte>>("foo"))
    {
        // Check for the end of messages.
        if (msg.Data.Length == 0)
            break;

        // Dispose the memory owner after using it so it can be returned to the pool.
        using NatsMemoryOwner<byte> memoryOwner = msg.Data;

        // Outputs 'Hi'
        Console.WriteLine(Encoding.ASCII.GetString(memoryOwner.Memory.Span));
    }
});

// Give subscriber a chance to connect.
await Task.Delay(1000);

// Don't reuse NatsBufferWriter, it's disposed and returned to the pool
// by the publisher after being written to the network.
NatsBufferWriter<byte> bw = new NatsBufferWriter<byte>();
Memory<byte> memory = bw.GetMemory(2);
memory.Span[0] = (byte)'H';
memory.Span[1] = (byte)'i';
bw.Advance(2);

// Default serializer knows how to deal with binary data types like NatsBufferWriter<byte>.
await nc.PublishAsync(subject: "foo", data: bw);

// Signal the end of messages by sending an empty payload.
await nc.PublishAsync(subject: "foo");

await subscriber;

Advantage of using NatsMemoryOwner<T> and NatsBufferWriter<T> is that they can be used with the default serializer and they can be used to allocate buffers from the ArrayPool<T> which can be reused. This is useful if you need to allocate buffers for binary data and you want to avoid allocating buffers on for every operation (e.g. new byte[]) reducing garbage collection pressure. They may also be useful for example, if your subscription may receive messages with different formats and the only way to determine the format is by reading the message.