Modules

async nats.connect(servers=['nats://localhost:4222'], **options)[source]
Parameters:
  • servers (Union[str, List[str]]) – List of servers to connect.

  • options – NATS connect options.

import asyncio
import nats

async def main():
    # Connect to NATS Server.
    nc = await nats.connect('demo.nats.io')
    await nc.publish('foo', b'Hello World!')
    await nc.flush()
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Asyncio Client

Asyncio based client for NATS.

async nats.aio.client.Client.connect(self, servers=['nats://localhost:4222'], error_cb=None, disconnected_cb=None, closed_cb=None, discovered_server_cb=None, reconnected_cb=None, name=None, pedantic=False, verbose=False, allow_reconnect=True, connect_timeout=2, reconnect_time_wait=2, max_reconnect_attempts=60, ping_interval=120, max_outstanding_pings=2, dont_randomize=False, flusher_queue_size=1024, no_echo=False, tls=None, tls_hostname=None, tls_handshake_first=False, user=None, password=None, token=None, drain_timeout=30, signature_cb=None, user_jwt_cb=None, user_credentials=None, nkeys_seed=None, nkeys_seed_str=None, inbox_prefix=b'_INBOX', pending_size=2097152, flush_timeout=None)

Establishes a connection to NATS.

Parameters:
  • servers (Union[str, List[str]]) – NATS Connection

  • name (Optional[str]) – Label the connection with name (shown in NATS monitoring)

  • error_cb (Optional[Callable[[Exception], Awaitable[None]]]) – Callback to report errors.

  • disconnected_cb (Optional[Callable[[], Awaitable[None]]]) – Callback to report disconnection from NATS.

  • closed_cb (Optional[Callable[[], Awaitable[None]]]) – Callback to report when client stops reconnection to NATS.

  • discovered_server_cb (Optional[Callable[[], Awaitable[None]]]) – Callback to report when a new server joins the cluster.

  • pending_size (int) – Max size of the pending buffer for publishing commands.

  • flush_timeout (Optional[float]) – Max duration to wait for a forced flush to occur.

Connecting setting all callbacks:

import asyncio
import nats

async def main():
    async def disconnected_cb():
        print('Got disconnected!')

    async def reconnected_cb():
        print(f'Got reconnected to {nc.connected_url.netloc}')

    async def error_cb(e):
        print(f'There was an error: {e}')

    async def closed_cb():
        print('Connection is closed')

    # Connect to NATS with logging callbacks.
    nc = await nats.connect('demo.nats.io',
                             error_cb=error_cb,
                             reconnected_cb=reconnected_cb,
                             disconnected_cb=disconnected_cb,
                             closed_cb=closed_cb,
                             )

    async def handler(msg):
        print(f'Received a message on {msg.subject} {msg.reply}: {msg.data}')
        await msg.respond(b'OK')

    sub = await nc.subscribe('help.please', cb=handler)

    resp = await nc.request('help.please', b'help')
    print('Response:', resp)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Using a context manager:

import asyncio
import nats

async def main():

    is_done = asyncio.Future()

    async def closed_cb():
        print('Connection to NATS is closed.')
        is_done.set_result(True)

    async with (await nats.connect('nats://demo.nats.io:4222', closed_cb=closed_cb)) as nc:
        print(f'Connected to NATS at {nc.connected_url.netloc}...')

        async def subscribe_handler(msg):
            subject = msg.subject
            reply = msg.reply
            data = msg.data.decode()
            print('Received a message on '{subject} {reply}': {data}'.format(
                subject=subject, reply=reply, data=data))

        await nc.subscribe('discover', cb=subscribe_handler)
        await nc.flush()

        for i in range(0, 10):
            await nc.publish('discover', b'hello world')
            await asyncio.sleep(0.1)

    await asyncio.wait_for(is_done, 60.0)

if __name__ == '__main__':
    asyncio.run(main())
async nats.aio.client.Client.publish(self, subject, payload=b'', reply='', headers=None)

Publishes a NATS message.

Parameters:
  • subject (str) – Subject to which the message will be published.

  • payload (bytes) – Message data.

  • reply (str) – Inbox to which a responder can respond.

  • headers (Optional[Dict[str, str]]) – Optional message header.

import asyncio
import nats

async def main():
    nc = await nats.connect('demo.nats.io')

    # Publish as message with an inbox.
    inbox = nc.new_inbox()
    sub = await nc.subscribe('hello')

    # Simple publishing
    await nc.publish('hello', b'Hello World!')

    # Publish with a reply
    await nc.publish('hello', b'Hello World!', reply=inbox)

    # Publish with headers
    await nc.publish('hello', b'With Headers', headers={'Foo':'Bar'})

    while True:
        try:
            msg = await sub.next_msg()
        except:
            break
        print('----------------------')
        print('Subject:', msg.subject)
        print('Reply  :', msg.reply)
        print('Data   :', msg.data)
        print('Headers:', msg.header)

if __name__ == '__main__':
    asyncio.run(main())
async nats.aio.client.Client.subscribe(self, subject, queue='', cb=None, future=None, max_msgs=0, pending_msgs_limit=524288, pending_bytes_limit=134217728)

subscribe registers interest in a given subject.

If a callback is provided, messages will be processed asychronously.

If a callback isn’t provided, messages can be retrieved via an asynchronous iterator on the returned subscription object.

async nats.aio.client.Client.flush(self, timeout=10)

Sends a ping to the server expecting a pong back ensuring what we have written so far has made it to the server and also enabling measuring of roundtrip time. In case a pong is not returned within the allowed timeout, then it will raise nats.errors.TimeoutError

async nats.aio.client.Client.close(self)

Closes the socket to which we are connected and sets the client to be in the CLOSED state. No further reconnections occur once reaching this point.

async nats.aio.client.Client.drain(self)

drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the closed_cb option to know when the connection has moved from draining to closed.

nats.aio.client.Client.new_inbox(self)

new_inbox returns a unique inbox that can be used for NATS requests or subscriptions:

# Create unique subscription to receive direct messages.
inbox = nc.new_inbox()
sub = await nc.subscribe(inbox)
nc.publish('broadcast', b'', reply=inbox)
msg = sub.next_msg()

Subscription

class nats.aio.subscription.Subscription(conn, id=0, subject='', queue='', cb=None, future=None, max_msgs=0, pending_msgs_limit=524288, pending_bytes_limit=134217728)[source]

A Subscription represents interest in a particular subject.

A Subscription should not be constructed directly, rather connection.subscribe() should be used to get a subscription.

nc = await nats.connect()

# Async Subscription
async def cb(msg):
  print('Received', msg)
await nc.subscribe('foo', cb=cb)

# Sync Subscription
sub = nc.subscribe('foo')
msg = await sub.next_msg()
print('Received', msg)
property subject: str

Returns the subject of the Subscription.

property queue: str

Returns the queue name of the Subscription if part of a queue group.

property messages: AsyncIterator[Msg]

Retrieves an async iterator for the messages from the subscription.

This is only available if a callback isn’t provided when creating a subscription.

nc = await nats.connect()
sub = await nc.subscribe('foo')
# Use `async for` which implicitly awaits messages
async for msg in sub.messages:
    print('Received', msg)
property pending_msgs: int

Number of delivered messages by the NATS Server that are being buffered in the pending queue.

property pending_bytes: int

Size of data sent by the NATS Server that is being buffered in the pending queue.

property delivered: int

Number of delivered messages to this subscription so far.

async next_msg(timeout=1.0)[source]
Params timeout:

Time in seconds to wait for next message before timing out.

Raises:

nats.errors.TimeoutError

next_msg can be used to retrieve the next message from a stream of messages using await syntax, this only works when not passing a callback on subscribe:

sub = await nc.subscribe('hello')
msg = await sub.next_msg(timeout=1)
async drain()[source]

Removes interest in a subject, but will process remaining messages.

async unsubscribe(limit=0)[source]
Parameters:

limit (int) – Max number of messages to receive before unsubscribing.

Removes interest in a subject, remaining messages will be discarded.

If limit is greater than zero, interest is not immediately removed, rather, interest will be automatically removed after limit messages are received.

Msg

class nats.aio.msg.Msg(_client, subject='', reply='', data=b'', headers=None, _metadata=None, _ackd=False, _sid=None)[source]

Msg represents a message delivered by NATS.

property header: Dict[str, str] | None

header returns the headers from a message.

property sid: int

sid returns the subscription ID from a message.

async respond(data)[source]

respond replies to the inbox of the message if there is one.

async ack()[source]

ack acknowledges a message delivered by JetStream.

async ack_sync(timeout=1.0)[source]

ack_sync waits for the acknowledgement to be processed by the server.

async nak(delay=None)[source]

nak negatively acknowledges a message delivered by JetStream triggering a redelivery. if delay is provided, redelivery is delayed for delay seconds

async in_progress()[source]

in_progress acknowledges a message delivered by JetStream is still being worked on. Unlike other types of acks, an in-progress ack (+WPI) can be done multiple times.

async term()[source]

term terminates a message delivered by JetStream and disables redeliveries.

property metadata: Metadata

metadata returns the Metadata of a JetStream message.

class Metadata(sequence, num_pending, num_delivered, timestamp, stream, consumer, domain=None)[source]

Metadata is the metadata from a JetStream message.

  • num_pending is the number of available messages in the Stream that have not been consumed yet.

  • num_delivered is the number of times that this message has been delivered. For example, num_delivered higher than one means that there have been redeliveries.

  • timestamp is the time at which the message was delivered.

  • stream is the name of the stream.

  • consumer is the name of the consumer.

class SequencePair(consumer, stream)[source]

SequencePair represents a pair of consumer and stream sequence.

Connection Properties

property Client.servers: List[ParseResult]
property Client.discovered_servers: List[ParseResult]
property Client.max_payload: int

Returns the max payload which we received from the servers INFO

property Client.connected_url: ParseResult | None
property Client.client_id: int | None

Returns the client id which we received from the servers INFO

property Client.pending_data_size: int

Connection State

property Client.is_connected: bool
property Client.is_closed: bool
property Client.is_connecting: bool
property Client.is_reconnecting: bool
property Client.is_draining: bool
property Client.is_draining_pubs: bool
property Client.last_error: Exception | None

Returns the last error which may have occurred.

JetStream

nats.aio.client.Client.jetstream(self, **opts)

jetstream returns a context that can be used to produce and consume messages from NATS JetStream.

Parameters:
  • prefix – Default JetStream API Prefix.

  • domain – Optional domain used by the JetStream API.

  • timeout – Timeout for all JS API actions.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='hello', subjects=['hello'])
    ack = await js.publish('hello', b'Hello JS!')
    print(f'Ack: stream={ack.stream}, sequence={ack.seq}')
    # Ack: stream=hello, sequence=1
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
async nats.js.client.JetStreamContext.publish(self, subject, payload=b'', timeout=None, stream=None, headers=None)

publish emits a new message to JetStream.

async nats.js.client.JetStreamContext.subscribe(self, subject, queue=None, cb=None, durable=None, stream=None, config=None, manual_ack=False, ordered_consumer=False, idle_heartbeat=None, flow_control=False, pending_msgs_limit=524288, pending_bytes_limit=268435456, deliver_policy=None, headers_only=None, inactive_threshold=None)

Create consumer if needed and push-subscribe to it.

  1. Check if consumer exists.

  2. Creates consumer if needed.

  3. Calls subscribe_bind.

Parameters:
  • subject (str) – Subject from a stream from JetStream.

  • queue (Optional[str]) – Deliver group name from a set a of queue subscribers.

  • durable (Optional[str]) – Name of the durable consumer to which the the subscription should be bound.

  • stream (Optional[str]) – Name of the stream to which the subscription should be bound. If not set, then the client will automatically look it up based on the subject.

  • manual_ack (bool) – Disables auto acking for async subscriptions.

  • ordered_consumer (bool) – Enable ordered consumer mode.

  • idle_heartbeat (Optional[float]) – Enable Heartbeats for a consumer to detect failures.

  • flow_control (bool) – Enable Flow Control for a consumer.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='hello', subjects=['hello'])
    await js.publish('hello', b'Hello JS!')

    async def cb(msg):
      print('Received:', msg)

    # Ephemeral Async Subscribe
    await js.subscribe('hello', cb=cb)

    # Durable Async Subscribe
    # NOTE: Only one subscription can be bound to a durable name. It also auto acks by default.
    await js.subscribe('hello', cb=cb, durable='foo')

    # Durable Sync Subscribe
    # NOTE: Sync subscribers do not auto ack.
    await js.subscribe('hello', durable='bar')

    # Queue Async Subscribe
    # NOTE: Here 'workers' becomes deliver_group, durable name and queue name.
    await js.subscribe('hello', 'workers', cb=cb)

if __name__ == '__main__':
    asyncio.run(main())
async nats.js.client.JetStreamContext.pull_subscribe(self, subject, durable=None, stream=None, config=None, pending_msgs_limit=524288, pending_bytes_limit=268435456, inbox_prefix=b'_INBOX.')

Create consumer and pull subscription.

  1. Find stream name by subject if stream is not passed.

  2. Create consumer with the given config if not created.

  3. Call pull_subscribe_bind.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    sub = await js.pull_subscribe('foo', stream='mystream')

    msgs = await sub.fetch()
    msg = msgs[0]
    await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
class nats.js.client.JetStreamContext.PullSubscription(js, sub, stream, consumer, deliver)

PullSubscription is a subscription that can fetch messages.

async consumer_info()

consumer_info gets the current info of the consumer from this subscription.

property delivered: int

Number of delivered messages to this subscription so far.

async fetch(batch=1, timeout=5)

fetch makes a request to JetStream to be delivered a set of messages.

Parameters:
  • batch (int) – Number of messages to fetch from server.

  • timeout (Optional[float]) – Max duration of the fetch request before it expires.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    msgs = await sub.fetch(5)
    for msg in msgs:
      await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
property pending_bytes: int

Size of data sent by the NATS Server that is being buffered in the pending queue.

property pending_msgs: int

Number of delivered messages by the NATS Server that are being buffered in the pending queue.

async unsubscribe()

unsubscribe destroys the inboxes of the pull subscription making it unable to continue to receive messages.

Manager

class nats.js.manager.JetStreamManager(conn, prefix='$JS.API', timeout=5)[source]

JetStreamManager exposes management APIs for JetStream.

async find_stream_name_by_subject(subject)[source]

Find the stream to which a subject belongs in an account.

async stream_info(name, subjects_filter=None)[source]

Get the latest StreamInfo by stream name.

async add_stream(config=None, **params)[source]

add_stream creates a stream.

async update_stream(config=None, **params)[source]

update_stream updates a stream.

async delete_stream(name)[source]

Delete a stream by name.

async purge_stream(name, seq=None, subject=None, keep=None)[source]

Purge a stream by name.

async streams_info()[source]

streams_info retrieves a list of streams.

async consumers_info(stream, offset=None)[source]

consumers_info retrieves a list of consumers. Consumers list limit is 256 for more consider to use offset :type stream: str :param stream: stream to get consumers :type offset: Optional[int] :param offset: consumers list offset

async get_msg(stream_name, seq=None, subject=None, direct=False, next=False)[source]

get_msg retrieves a message from a stream.

async delete_msg(stream_name, seq)[source]

delete_msg retrieves a message from a stream based on the sequence ID.

async get_last_msg(stream_name, subject, direct=False)[source]

get_last_msg retrieves the last message from a stream.

KeyValue

class nats.js.kv.KeyValue(name, stream, pre, js, direct)[source]

KeyValue uses the JetStream KeyValue functionality.

Note

This functionality is EXPERIMENTAL and may be changed in later releases.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Create a KV
    kv = await js.create_key_value(bucket='MY_KV')

    # Set and retrieve a value
    await kv.put('hello', b'world')
    entry = await kv.get('hello')
    print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
    # KeyValue.Entry: key=hello, value=world

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
class Entry(bucket, key, value, revision, delta, created, operation)[source]

An entry from a KeyValue store in JetStream.

class BucketStatus(stream_info, bucket)[source]

BucketStatus is the status of a KeyValue bucket.

property values: int

values returns the number of stored messages in the stream.

property history: int

history returns the max msgs per subject.

property ttl: float | None

ttl returns the max age in seconds.

async get(key, revision=None)[source]

get returns the latest value for the key.

async put(key, value)[source]

put will place the new value for the key into the store and return the revision number.

async create(key, value)[source]

create will add the key/value pair iff it does not exist.

async update(key, value, last=None)[source]

update will update the value iff the latest revision matches.

async delete(key, last=None)[source]

delete will place a delete marker and remove all previous revisions.

async purge(key)[source]

purge will remove the key and all revisions.

async purge_deletes(olderthan=1800)[source]

purge will remove all current delete markers older. :type olderthan: int :param olderthan: time in seconds

async status()[source]

status retrieves the status and configuration of a bucket.

async watchall(**kwargs)[source]

watchall returns a KeyValue watcher that matches all the keys.

async keys(**kwargs)[source]

keys will return a list of the keys from a KeyValue store.

async history(key)[source]

history retrieves a list of the entries so far.

async watch(keys, headers_only=False, include_history=False, ignore_deletes=False, meta_only=False, inactive_threshold=None)[source]

watch will fire a callback when a key that matches the keys pattern is updated. The first update after starting the watch is None in case there are no pending updates.

API

class nats.js.api.Header(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
class nats.js.api.StatusCode(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
class nats.js.api.Base[source]

Helper dataclass to filter unknown fields from the API.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

evolve(**params)[source]

Return a copy of the instance with the passed values replaced.

as_dict()[source]

Return the object converted into an API-friendly dict.

class nats.js.api.PubAck(stream, seq, domain=None, duplicate=None)[source]

PubAck is the response of publishing a message to JetStream.

class nats.js.api.Placement(cluster=None, tags=None)[source]

Placement directives to consider when placing replicas of this stream

class nats.js.api.ExternalStream(api, deliver=None)[source]
class nats.js.api.StreamSource(name, opt_start_seq=None, filter_subject=None, external=None)[source]
classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.StreamSourceInfo(name, lag=None, active=None, error=None)[source]
class nats.js.api.LostStreamData(msgs=None, bytes=None)[source]
class nats.js.api.StreamState(messages, bytes, first_seq, last_seq, consumer_count, deleted=None, num_deleted=None, lost=None, subjects=None)[source]
classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.RetentionPolicy(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

How message retention is considered

class nats.js.api.StorageType(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

The type of storage backend

class nats.js.api.DiscardPolicy(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Discard policy when a stream reaches its limits

class nats.js.api.RePublish(src=None, dest=None, headers_only=None)[source]

RePublish is for republishing messages once committed to a stream. The original subject cis remapped from the subject pattern to the destination pattern.

class nats.js.api.StreamConfig(name=None, description=None, subjects=None, retention=None, max_consumers=None, max_msgs=None, max_bytes=None, discard=DiscardPolicy.OLD, max_age=None, max_msgs_per_subject=-1, max_msg_size=-1, storage=None, num_replicas=None, no_ack=False, template_owner=None, duplicate_window=0, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False, republish=None, allow_direct=None, mirror_direct=None)[source]

StreamConfig represents the configuration of a stream.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

as_dict()[source]

Return the object converted into an API-friendly dict.

class nats.js.api.PeerInfo(name=None, current=None, offline=None, active=None, lag=None)[source]
class nats.js.api.ClusterInfo(leader=None, name=None, replicas=None)[source]
classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.StreamInfo(config, state, mirror=None, sources=None, cluster=None, did_create=None)[source]

StreamInfo is the latest information about a stream from JetStream.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.AckPolicy(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Policies defining how messages should be acknowledged.

If an ack is required but is not received within the AckWait window, the message will be redelivered.

References

class nats.js.api.DeliverPolicy(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

When a consumer is first created, it can specify where in the stream it wants to start receiving messages.

This is the DeliverPolicy, and this enumeration defines allowed values.

References

class nats.js.api.ReplayPolicy(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
The replay policy applies when the DeliverPolicy is one of:
  • all

  • by_start_sequence

  • by_start_time

since those deliver policies begin reading the stream at a position other than the end.

References

class nats.js.api.ConsumerConfig(name=None, durable_name=None, description=None, deliver_policy=DeliverPolicy.ALL, opt_start_seq=None, opt_start_time=None, ack_policy=AckPolicy.EXPLICIT, ack_wait=None, max_deliver=None, filter_subject=None, filter_subjects=None, replay_policy=ReplayPolicy.INSTANT, rate_limit_bps=None, sample_freq=None, max_waiting=None, max_ack_pending=None, flow_control=None, idle_heartbeat=None, headers_only=None, deliver_subject=None, deliver_group=None, inactive_threshold=None, num_replicas=None, mem_storage=None)[source]

Consumer configuration.

References

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

as_dict()[source]

Return the object converted into an API-friendly dict.

class nats.js.api.SequenceInfo(consumer_seq, stream_seq)[source]
class nats.js.api.ConsumerInfo(name, stream_name, config, delivered=None, ack_floor=None, num_ack_pending=None, num_redelivered=None, num_waiting=None, num_pending=None, cluster=None, push_bound=None)[source]

ConsumerInfo represents the info about the consumer.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.AccountLimits(max_memory, max_storage, max_streams, max_consumers, max_ack_pending, memory_max_stream_bytes, storage_max_stream_bytes, max_bytes_required)[source]

Account limits

References

class nats.js.api.Tier(memory, storage, streams, consumers, limits)[source]
classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.APIStats(total, errors)[source]

API stats

class nats.js.api.AccountInfo(memory, storage, streams, consumers, limits, api, domain=None, tiers=None)[source]

Account information

References

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.RawStreamMsg(subject=None, seq=None, data=None, hdrs=None, headers=None, stream=None)[source]
property header: Dict | None

header returns the headers from a message.

class nats.js.api.KeyValueConfig(bucket, description=None, max_value_size=None, history=1, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None, republish=None, direct=None)[source]

KeyValueConfig is the configuration of a KeyValue store.

as_dict()[source]

Return the object converted into an API-friendly dict.

class nats.js.api.StreamPurgeRequest(seq=None, filter=None, keep=None)[source]

StreamPurgeRequest is optional request information to the purge API.

class nats.js.api.ObjectStoreConfig(bucket=None, description=None, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None)[source]

ObjectStoreConfig is the configurigation of an ObjectStore.

as_dict()[source]

Return the object converted into an API-friendly dict.

ObjectLink is used to embed links to other buckets and objects.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.ObjectMetaOptions(link=None, max_chunk_size=None)[source]
classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.ObjectMeta(name=None, description=None, headers=None, options=None)[source]

ObjectMeta is high level information about an object.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

class nats.js.api.ObjectInfo(name, bucket, nuid, size=None, mtime=None, chunks=None, digest=None, deleted=False, description=None, headers=None, options=None)[source]

ObjectInfo is meta plus instance information.

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

NUID

class nats.nuid.NUID[source]

NUID is an implementation of the approach for fast generation of unique identifiers used for inboxes in NATS.

next()[source]

next returns the next unique identifier.

Errors

exception nats.errors.Error[source]
exception nats.errors.TimeoutError[source]
exception nats.errors.NoRespondersError[source]
exception nats.errors.StaleConnectionError[source]
exception nats.errors.OutboundBufferLimitError[source]
exception nats.errors.UnexpectedEOF[source]
exception nats.errors.FlushTimeoutError[source]
exception nats.errors.ConnectionClosedError[source]
exception nats.errors.SecureConnRequiredError[source]
exception nats.errors.SecureConnWantedError[source]
exception nats.errors.SecureConnFailedError[source]
exception nats.errors.BadSubscriptionError[source]
exception nats.errors.BadSubjectError[source]
exception nats.errors.SlowConsumerError(subject, reply, sid, sub)[source]
exception nats.errors.BadTimeoutError[source]
exception nats.errors.AuthorizationError[source]
exception nats.errors.NoServersError[source]
exception nats.errors.JsonParseError[source]
exception nats.errors.MaxPayloadError[source]
exception nats.errors.DrainTimeoutError[source]
exception nats.errors.ConnectionDrainingError[source]
exception nats.errors.ConnectionReconnectingError[source]
exception nats.errors.InvalidUserCredentialsError[source]
exception nats.errors.InvalidCallbackTypeError[source]
exception nats.errors.ProtocolError[source]
exception nats.errors.NotJSMessageError[source]

When it is attempted to use an API meant for JetStream on a message that does not belong to a stream.

exception nats.errors.MsgAlreadyAckdError(msg=None)[source]

JetStream Errors

exception nats.js.errors.Error(description=None)[source]

An Error raised by the NATS client when using JetStream.

exception nats.js.errors.APIError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

An Error that is the result of interacting with NATS JetStream.

exception nats.js.errors.ServiceUnavailableError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

A 503 error

exception nats.js.errors.ServerError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

A 500 error

exception nats.js.errors.NotFoundError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

A 404 error

exception nats.js.errors.BadRequestError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

A 400 error.

exception nats.js.errors.NoStreamResponseError(description=None)[source]

Raised if the client gets a 503 when publishing a message.

exception nats.js.errors.ConsumerSequenceMismatchError(stream_resume_sequence=None, consumer_sequence=None, last_consumer_sequence=None)[source]

Async error raised by the client with idle_heartbeat mode enabled when one of the message sequences is not the expected one.

exception nats.js.errors.BucketNotFoundError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

When attempted to bind to a JetStream KeyValue that does not exist.

exception nats.js.errors.BadBucketError(code=None, description=None, err_code=None, stream=None, seq=None)[source]
exception nats.js.errors.KeyValueError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

Raised when there is an issue interacting with the KeyValue store.

exception nats.js.errors.KeyDeletedError(entry=None, op=None)[source]

Raised when trying to get a key that was deleted from a JetStream KeyValue store.

exception nats.js.errors.KeyNotFoundError(entry=None, op=None, message=None)[source]

Raised when trying to get a key that does not exists from a JetStream KeyValue store.

exception nats.js.errors.KeyWrongLastSequenceError(description=None)[source]

Raised when trying to update a key with the wrong last sequence.

exception nats.js.errors.NoKeysError(code=None, description=None, err_code=None, stream=None, seq=None)[source]
exception nats.js.errors.KeyHistoryTooLargeError(code=None, description=None, err_code=None, stream=None, seq=None)[source]
exception nats.js.errors.InvalidBucketNameError(description=None)[source]

Raised when trying to create a KV or OBJ bucket with invalid name.

exception nats.js.errors.InvalidObjectNameError(description=None)[source]

Raised when trying to put an object in Object Store with invalid key.

exception nats.js.errors.BadObjectMetaError(description=None)[source]

Raised when trying to read corrupted metadata from Object Store.

exception nats.js.errors.LinkIsABucketError(description=None)[source]

Raised when trying to get object from Object Store that is a bucket.

exception nats.js.errors.DigestMismatchError(description=None)[source]

Raised when getting an object from Object Store that has a different digest than expected.

exception nats.js.errors.ObjectNotFoundError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

When attempted to lookup an Object that does not exist.

exception nats.js.errors.ObjectDeletedError(code=None, description=None, err_code=None, stream=None, seq=None)[source]

When attempted to do an operation to an Object that does not exist.

exception nats.js.errors.ObjectAlreadyExists(description=None)[source]

When attempted to do an operation to an Object that already exist.

Deprecated Errors

Catching the following errors will be removed eventually, please use the recommended alternative error instead.

exception nats.aio.errors.NatsError[source]

Deprecated since version v2.0.0.

Please use nats.errors.Error instead.

exception nats.aio.errors.ErrConnectionClosed[source]

Deprecated since version v2.0.0.

Please use nats.errors.ConnectionClosedError instead.

exception nats.aio.errors.ErrDrainTimeout[source]

Deprecated since version v2.0.0.

Please use nats.errors.DrainTimeoutError instead.

exception nats.aio.errors.ErrInvalidUserCredentials[source]

Deprecated since version v2.0.0.

Please use nats.errors.InvalidUserCredentialsError instead.

exception nats.aio.errors.ErrInvalidCallbackType[source]

Deprecated since version v2.0.0.

Please use nats.errors.InvalidCallbackTypeError instead.

exception nats.aio.errors.ErrConnectionReconnecting[source]

Deprecated since version v2.0.0.

Please use nats.errors.ConnectionReconnectingError instead.

exception nats.aio.errors.ErrConnectionDraining[source]

Deprecated since version v2.0.0.

Please use nats.errors.ConnectionDrainingError instead.

exception nats.aio.errors.ErrMaxPayload[source]

Deprecated since version v2.0.0.

Please use nats.errors.MaxPayloadError instead.

exception nats.aio.errors.ErrStaleConnection[source]

Deprecated since version v2.0.0.

Please use nats.errors.StaleConnectionError instead.

exception nats.aio.errors.ErrJsonParse[source]

Deprecated since version v2.0.0.

Please use nats.errors.JsonParseError instead.

exception nats.aio.errors.ErrSecureConnRequired[source]

Deprecated since version v2.0.0.

Please use nats.errors.SecureConnRequiredError instead.

exception nats.aio.errors.ErrSecureConnWanted[source]

Deprecated since version v2.0.0.

Please use nats.errors.SecureConnWantedError instead.

exception nats.aio.errors.ErrSecureConnFailed[source]

Deprecated since version v2.0.0.

Please use nats.errors.SecureConnFailedError instead.

exception nats.aio.errors.ErrBadSubscription[source]

Deprecated since version v2.0.0.

Please use nats.errors.BadSubscriptionError instead.

exception nats.aio.errors.ErrBadSubject[source]

Deprecated since version v2.0.0.

Please use nats.errors.BadSubjectError instead.

exception nats.aio.errors.ErrSlowConsumer(subject, reply, sid, sub)[source]

Deprecated since version v2.0.0.

Please use nats.errors.SlowConsumerError instead.

exception nats.aio.errors.ErrTimeout[source]

Deprecated since version v2.0.0.

Please use nats.errors.TimeoutError instead.

exception nats.aio.errors.ErrBadTimeout[source]

Deprecated since version v2.0.0.

Please use nats.errors.BadTimeoutError instead.

exception nats.aio.errors.ErrAuthorization[source]

Deprecated since version v2.0.0.

Please use nats.errors.AuthorizationError instead.

exception nats.aio.errors.ErrNoServers[source]

Deprecated since version v2.0.0.

Please use nats.errors.NoServersError instead.