Modules

async nats.connect(servers=['nats://localhost:4222'], **options) Client[source]
Parameters:
  • servers (Union[str, List[str]], default: ['nats://localhost:4222']) – List of servers to connect.

  • options – NATS connect options.

Return type:

Client

:: :rtype: Client

import asyncio import nats

async def main():

# Connect to NATS Server. nc = await nats.connect(‘demo.nats.io’) await nc.publish(‘foo’, b’Hello World!’) await nc.flush() await nc.close()

if __name__ == ‘__main__’:

asyncio.run(main())

Asyncio Client

Asyncio based client for NATS.

async nats.aio.client.Client.connect(self, servers=['nats://localhost:4222'], error_cb=None, disconnected_cb=None, closed_cb=None, discovered_server_cb=None, reconnected_cb=None, name=None, pedantic=False, verbose=False, allow_reconnect=True, connect_timeout=2, reconnect_time_wait=2, max_reconnect_attempts=60, ping_interval=120, max_outstanding_pings=2, dont_randomize=False, flusher_queue_size=1024, no_echo=False, tls=None, tls_hostname=None, tls_handshake_first=False, user=None, password=None, token=None, drain_timeout=30, signature_cb=None, user_jwt_cb=None, user_credentials=None, nkeys_seed=None, nkeys_seed_str=None, inbox_prefix=b'_INBOX', pending_size=2097152, flush_timeout=None) None

Establishes a connection to NATS.

Parameters:
  • servers (Union[str, List[str]], default: ['nats://localhost:4222']) – NATS Connection

  • name (Optional[str], default: None) – Label the connection with name (shown in NATS monitoring)

  • error_cb (Optional[Callable[[Exception], Awaitable[None]]], default: None) – Callback to report errors.

  • disconnected_cb (Optional[Callable[[], Awaitable[None]]], default: None) – Callback to report disconnection from NATS.

  • closed_cb (Optional[Callable[[], Awaitable[None]]], default: None) – Callback to report when client stops reconnection to NATS.

  • discovered_server_cb (Optional[Callable[[], Awaitable[None]]], default: None) – Callback to report when a new server joins the cluster.

  • pending_size (int, default: 2097152) – Max size of the pending buffer for publishing commands.

  • flush_timeout (Optional[float], default: None) – Max duration to wait for a forced flush to occur.

  • reconnected_cb (Callable[[], Awaitable[None]] | None)

  • pedantic (bool)

  • verbose (bool)

  • allow_reconnect (bool)

  • connect_timeout (int)

  • reconnect_time_wait (int)

  • max_reconnect_attempts (int)

  • ping_interval (int)

  • max_outstanding_pings (int)

  • dont_randomize (bool)

  • flusher_queue_size (int)

  • no_echo (bool)

  • tls (SSLContext | None)

  • tls_hostname (str | None)

  • tls_handshake_first (bool)

  • user (str | None)

  • password (str | None)

  • token (str | None)

  • drain_timeout (int)

  • signature_cb (Callable[[str], bytes] | None)

  • user_jwt_cb (Callable[[], bytearray | bytes] | None)

  • user_credentials (str | Tuple[str, str] | RawCredentials | Path | None)

  • nkeys_seed (str | None)

  • nkeys_seed_str (str | None)

  • inbox_prefix (str | bytes)

Return type:

None

Connecting setting all callbacks:

import asyncio
import nats

async def main():
    async def disconnected_cb():
        print('Got disconnected!')

    async def reconnected_cb():
        print(f'Got reconnected to {nc.connected_url.netloc}')

    async def error_cb(e):
        print(f'There was an error: {e}')

    async def closed_cb():
        print('Connection is closed')

    # Connect to NATS with logging callbacks.
    nc = await nats.connect('demo.nats.io',
                             error_cb=error_cb,
                             reconnected_cb=reconnected_cb,
                             disconnected_cb=disconnected_cb,
                             closed_cb=closed_cb,
                             )

    async def handler(msg):
        print(f'Received a message on {msg.subject} {msg.reply}: {msg.data}')
        await msg.respond(b'OK')

    sub = await nc.subscribe('help.please', cb=handler)

    resp = await nc.request('help.please', b'help')
    print('Response:', resp)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Using a context manager:

import asyncio
import nats

async def main():

    is_done = asyncio.Future()

    async def closed_cb():
        print('Connection to NATS is closed.')
        is_done.set_result(True)

    async with (await nats.connect('nats://demo.nats.io:4222', closed_cb=closed_cb)) as nc:
        print(f'Connected to NATS at {nc.connected_url.netloc}...')

        async def subscribe_handler(msg):
            subject = msg.subject
            reply = msg.reply
            data = msg.data.decode()
            print('Received a message on '{subject} {reply}': {data}'.format(
                subject=subject, reply=reply, data=data))

        await nc.subscribe('discover', cb=subscribe_handler)
        await nc.flush()

        for i in range(0, 10):
            await nc.publish('discover', b'hello world')
            await asyncio.sleep(0.1)

    await asyncio.wait_for(is_done, 60.0)

if __name__ == '__main__':
    asyncio.run(main())
async nats.aio.client.Client.publish(self, subject, payload=b'', reply='', headers=None) None

Publishes a NATS message.

Parameters:
  • subject (str) – Subject to which the message will be published.

  • payload (bytes, default: b'') – Message data.

  • reply (str, default: '') – Inbox to which a responder can respond.

  • headers (Optional[Dict[str, str]], default: None) – Optional message header.

Return type:

None

:: :rtype: None

import asyncio import nats

async def main():

nc = await nats.connect(‘demo.nats.io’)

# Publish as message with an inbox. inbox = nc.new_inbox() sub = await nc.subscribe(‘hello’)

# Simple publishing await nc.publish(‘hello’, b’Hello World!’)

# Publish with a reply await nc.publish(‘hello’, b’Hello World!’, reply=inbox)

# Publish with headers await nc.publish(‘hello’, b’With Headers’, headers={‘Foo’:’Bar’})

while True:
try:

msg = await sub.next_msg()

except:

break

print(’———————-‘) print(‘Subject:’, msg.subject) print(‘Reply :’, msg.reply) print(‘Data :’, msg.data) print(‘Headers:’, msg.header)

if __name__ == ‘__main__’:

asyncio.run(main())

async nats.aio.client.Client.subscribe(self, subject, queue='', cb=None, future=None, max_msgs=0, pending_msgs_limit=524288, pending_bytes_limit=134217728) Subscription

subscribe registers interest in a given subject.

If a callback is provided, messages will be processed asychronously.

If a callback isn’t provided, messages can be retrieved via an asynchronous iterator on the returned subscription object.

Return type:

Subscription

Parameters:
  • subject (str)

  • queue (str)

  • cb (Callable[[Msg], Awaitable[None]] | None)

  • future (Future | None)

  • max_msgs (int)

  • pending_msgs_limit (int)

  • pending_bytes_limit (int)

async nats.aio.client.Client.flush(self, timeout=10) None

Sends a ping to the server expecting a pong back ensuring what we have written so far has made it to the server and also enabling measuring of roundtrip time. In case a pong is not returned within the allowed timeout, then it will raise nats.errors.TimeoutError

Return type:

None

Parameters:

timeout (int)

async nats.aio.client.Client.close(self) None

Closes the socket to which we are connected and sets the client to be in the CLOSED state. No further reconnections occur once reaching this point.

Return type:

None

async nats.aio.client.Client.drain(self) None

drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the closed_cb option to know when the connection has moved from draining to closed.

Return type:

None

nats.aio.client.Client.new_inbox(self) str

new_inbox returns a unique inbox that can be used for NATS requests or subscriptions:

# Create unique subscription to receive direct messages.
inbox = nc.new_inbox()
sub = await nc.subscribe(inbox)
nc.publish('broadcast', b'', reply=inbox)
msg = sub.next_msg()
Return type:

str

Subscription

class nats.aio.subscription.Subscription(conn, id=0, subject='', queue='', cb=None, future=None, max_msgs=0, pending_msgs_limit=524288, pending_bytes_limit=134217728) None[source]

A Subscription represents interest in a particular subject.

A Subscription should not be constructed directly, rather connection.subscribe() should be used to get a subscription.

nc = await nats.connect()

# Async Subscription
async def cb(msg):
  print('Received', msg)
await nc.subscribe('foo', cb=cb)

# Sync Subscription
sub = nc.subscribe('foo')
msg = await sub.next_msg()
print('Received', msg)
Parameters:
  • id (int)

  • subject (str)

  • queue (str)

  • cb (Optional[Callable[[Msg], Awaitable[None]]])

  • future (Optional[asyncio.Future])

  • max_msgs (int)

  • pending_msgs_limit (int)

  • pending_bytes_limit (int)

property subject: str

Returns the subject of the Subscription.

property queue: str

Returns the queue name of the Subscription if part of a queue group.

property messages: AsyncIterator[Msg]

Retrieves an async iterator for the messages from the subscription.

This is only available if a callback isn’t provided when creating a subscription.

nc = await nats.connect()
sub = await nc.subscribe('foo')
# Use `async for` which implicitly awaits messages
async for msg in sub.messages:
    print('Received', msg)
property pending_msgs: int

Number of delivered messages by the NATS Server that are being buffered in the pending queue.

property pending_bytes: int

Size of data sent by the NATS Server that is being buffered in the pending queue.

property delivered: int

Number of delivered messages to this subscription so far.

async next_msg(timeout=1.0) Msg[source]
Params timeout:

Time in seconds to wait for next message before timing out.

Raises:

nats.errors.TimeoutError

Parameters:

timeout (float | None)

Return type:

Msg

next_msg can be used to retrieve the next message from a stream of messages using await syntax, this only works when not passing a callback on subscribe:

sub = await nc.subscribe('hello')
msg = await sub.next_msg(timeout=1)
Return type:

Msg

Parameters:

timeout (float | None)

async drain()[source]

Removes interest in a subject, but will process remaining messages.

async unsubscribe(limit=0)[source]
Parameters:

limit (int, default: 0) – Max number of messages to receive before unsubscribing.

Removes interest in a subject, remaining messages will be discarded.

If limit is greater than zero, interest is not immediately removed, rather, interest will be automatically removed after limit messages are received.

Msg

class nats.aio.msg.Msg(_client, subject='', reply='', data=b'', headers=None, _metadata=None, _ackd=False, _sid=None) None[source]

Msg represents a message delivered by NATS.

Parameters:
  • _client (NATS)

  • subject (str)

  • reply (str)

  • data (bytes)

  • headers (Optional[Dict[str, str]])

  • _metadata (Optional[Metadata])

  • _ackd (bool)

  • _sid (Optional[int])

property header: Dict[str, str] | None

header returns the headers from a message.

property sid: int

sid returns the subscription ID from a message.

async respond(data) None[source]

respond replies to the inbox of the message if there is one.

Return type:

None

Parameters:

data (bytes)

async ack() None[source]

ack acknowledges a message delivered by JetStream.

Return type:

None

async ack_sync(timeout=1.0) Msg[source]

ack_sync waits for the acknowledgement to be processed by the server.

Return type:

Msg

Parameters:

timeout (float)

async nak(delay=None) None[source]

nak negatively acknowledges a message delivered by JetStream triggering a redelivery. if delay is provided, redelivery is delayed for delay seconds

Return type:

None

Parameters:

delay (int | float | None)

async in_progress() None[source]

in_progress acknowledges a message delivered by JetStream is still being worked on. Unlike other types of acks, an in-progress ack (+WPI) can be done multiple times.

Return type:

None

async term() None[source]

term terminates a message delivered by JetStream and disables redeliveries.

Return type:

None

property metadata: Metadata

metadata returns the Metadata of a JetStream message.

class Metadata(sequence, num_pending, num_delivered, timestamp, stream, consumer, domain=None) None[source]

Metadata is the metadata from a JetStream message.

  • num_pending is the number of available messages in the Stream that have not been consumed yet.

  • num_delivered is the number of times that this message has been delivered. For example, num_delivered higher than one means that there have been redeliveries.

  • timestamp is the time at which the message was delivered.

  • stream is the name of the stream.

  • consumer is the name of the consumer.

Parameters:
  • sequence (SequencePair)

  • num_pending (int)

  • num_delivered (int)

  • timestamp (datetime.datetime)

  • stream (str)

  • consumer (str)

  • domain (Optional[str])

class SequencePair(consumer, stream) None[source]

SequencePair represents a pair of consumer and stream sequence.

Parameters:
  • consumer (int)

  • stream (int)

Connection Properties

property Client.servers: List[ParseResult]
property Client.discovered_servers: List[ParseResult]
property Client.max_payload: int

Returns the max payload which we received from the servers INFO

property Client.connected_url: ParseResult | None
property Client.client_id: int | None

Returns the client id which we received from the servers INFO

property Client.pending_data_size: int

Connection State

property Client.is_connected: bool
property Client.is_closed: bool
property Client.is_connecting: bool
property Client.is_reconnecting: bool
property Client.is_draining: bool
property Client.is_draining_pubs: bool
property Client.last_error: Exception | None

Returns the last error which may have occurred.

JetStream

nats.aio.client.Client.jetstream(self, **opts) JetStreamContext

jetstream returns a context that can be used to produce and consume messages from NATS JetStream.

Parameters:
  • prefix – Default JetStream API Prefix.

  • domain – Optional domain used by the JetStream API.

  • timeout – Timeout for all JS API actions.

Return type:

JetStreamContext

:: :rtype: JetStreamContext

import asyncio import nats

async def main():

nc = await nats.connect() js = nc.jetstream()

await js.add_stream(name=’hello’, subjects=[‘hello’]) ack = await js.publish(‘hello’, b’Hello JS!’) print(f’Ack: stream={ack.stream}, sequence={ack.seq}’) # Ack: stream=hello, sequence=1 await nc.close()

if __name__ == ‘__main__’:

asyncio.run(main())

async nats.js.client.JetStreamContext.publish(self, subject, payload=b'', timeout=None, stream=None, headers=None) PubAck

publish emits a new message to JetStream and waits for acknowledgement.

Return type:

PubAck

Parameters:
  • subject (str)

  • payload (bytes)

  • timeout (float | None)

  • stream (str | None)

  • headers (Dict[str, Any] | None)

async nats.js.client.JetStreamContext.subscribe(self, subject, queue=None, cb=None, durable=None, stream=None, config=None, manual_ack=False, ordered_consumer=False, idle_heartbeat=None, flow_control=False, pending_msgs_limit=524288, pending_bytes_limit=268435456, deliver_policy=None, headers_only=None, inactive_threshold=None) PushSubscription

Create consumer if needed and push-subscribe to it.

  1. Check if consumer exists.

  2. Creates consumer if needed.

  3. Calls subscribe_bind.

Parameters:
  • subject (str) – Subject from a stream from JetStream.

  • queue (Optional[str], default: None) – Deliver group name from a set a of queue subscribers.

  • durable (Optional[str], default: None) – Name of the durable consumer to which the the subscription should be bound.

  • stream (Optional[str], default: None) – Name of the stream to which the subscription should be bound. If not set, then the client will automatically look it up based on the subject.

  • manual_ack (bool, default: False) – Disables auto acking for async subscriptions.

  • ordered_consumer (bool, default: False) – Enable ordered consumer mode.

  • idle_heartbeat (Optional[float], default: None) – Enable Heartbeats for a consumer to detect failures.

  • flow_control (bool, default: False) – Enable Flow Control for a consumer.

  • cb (Optional[Callback])

  • config (Optional[api.ConsumerConfig])

  • pending_msgs_limit (int)

  • pending_bytes_limit (int)

  • deliver_policy (Optional[api.DeliverPolicy])

  • headers_only (Optional[bool])

  • inactive_threshold (Optional[float])

Return type:

PushSubscription

:: :rtype: PushSubscription

import asyncio import nats

async def main():

nc = await nats.connect() js = nc.jetstream()

await js.add_stream(name=’hello’, subjects=[‘hello’]) await js.publish(‘hello’, b’Hello JS!’)

async def cb(msg):

print(‘Received:’, msg)

# Ephemeral Async Subscribe await js.subscribe(‘hello’, cb=cb)

# Durable Async Subscribe # NOTE: Only one subscription can be bound to a durable name. It also auto acks by default. await js.subscribe(‘hello’, cb=cb, durable=’foo’)

# Durable Sync Subscribe # NOTE: Sync subscribers do not auto ack. await js.subscribe(‘hello’, durable=’bar’)

# Queue Async Subscribe # NOTE: Here ‘workers’ becomes deliver_group, durable name and queue name. await js.subscribe(‘hello’, ‘workers’, cb=cb)

if __name__ == ‘__main__’:

asyncio.run(main())

async nats.js.client.JetStreamContext.pull_subscribe(self, subject, durable=None, stream=None, config=None, pending_msgs_limit=524288, pending_bytes_limit=268435456, inbox_prefix=b'_INBOX.') PullSubscription

Create consumer and pull subscription. :rtype: PullSubscription

  1. Find stream name by subject if stream is not passed.

  2. Create consumer with the given config if not created.

  3. Call pull_subscribe_bind.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    sub = await js.pull_subscribe('foo', stream='mystream')

    msgs = await sub.fetch()
    msg = msgs[0]
    await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Parameters:
  • subject (str)

  • durable (str | None)

  • stream (str | None)

  • config (ConsumerConfig | None)

  • pending_msgs_limit (int)

  • pending_bytes_limit (int)

  • inbox_prefix (bytes)

Return type:

PullSubscription

async nats.js.client.JetStreamContext.pull_subscribe_bind(self, consumer=None, stream=None, inbox_prefix=b'_INBOX.', pending_msgs_limit=524288, pending_bytes_limit=268435456, name=None, durable=None) PullSubscription

pull_subscribe returns a PullSubscription that can be delivered messages from a JetStream pull based consumer by calling sub.fetch.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    msgs = await sub.fetch()
    msg = msgs[0]
    await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Return type:

PullSubscription

Parameters:
  • consumer (str | None)

  • stream (str | None)

  • inbox_prefix (bytes)

  • pending_msgs_limit (int)

  • pending_bytes_limit (int)

  • name (str | None)

  • durable (str | None)

class nats.js.client.JetStreamContext.PullSubscription(js, sub, stream, consumer, deliver) None

PullSubscription is a subscription that can fetch messages.

Parameters:
  • js (JetStreamContext)

  • sub (Subscription)

  • stream (str)

  • consumer (str)

  • deliver (bytes)

async consumer_info() ConsumerInfo

consumer_info gets the current info of the consumer from this subscription.

Return type:

ConsumerInfo

property delivered: int

Number of delivered messages to this subscription so far.

async fetch(batch=1, timeout=5, heartbeat=None) List[Msg]

fetch makes a request to JetStream to be delivered a set of messages.

Parameters:
  • batch (int, default: 1) – Number of messages to fetch from server.

  • timeout (Optional[float], default: 5) – Max duration of the fetch request before it expires.

  • heartbeat (Optional[float], default: None) – Idle Heartbeat interval in seconds for the fetch request.

Return type:

List[Msg]

:: :rtype: List[Msg]

import asyncio import nats

async def main():

nc = await nats.connect() js = nc.jetstream()

await js.add_stream(name=’mystream’, subjects=[‘foo’]) await js.publish(‘foo’, b’Hello World!’)

msgs = await sub.fetch(5) for msg in msgs:

await msg.ack()

await nc.close()

if __name__ == ‘__main__’:

asyncio.run(main())

property pending_bytes: int

Size of data sent by the NATS Server that is being buffered in the pending queue.

property pending_msgs: int

Number of delivered messages by the NATS Server that are being buffered in the pending queue.

async unsubscribe() None

unsubscribe destroys the inboxes of the pull subscription making it unable to continue to receive messages.

Return type:

None

Manager

class nats.js.manager.JetStreamManager(conn, prefix='$JS.API', timeout=5) None[source]

JetStreamManager exposes management APIs for JetStream.

Parameters:
  • conn (NATS)

  • prefix (str)

  • timeout (float)

async find_stream_name_by_subject(subject) str[source]

Find the stream to which a subject belongs in an account.

Return type:

str

Parameters:

subject (str)

async stream_info(name, subjects_filter=None) StreamInfo[source]

Get the latest StreamInfo by stream name.

Return type:

StreamInfo

Parameters:
  • name (str)

  • subjects_filter (str | None)

async add_stream(config=None, **params) StreamInfo[source]

add_stream creates a stream.

Return type:

StreamInfo

Parameters:

config (StreamConfig | None)

async update_stream(config=None, **params) StreamInfo[source]

update_stream updates a stream.

Return type:

StreamInfo

Parameters:

config (StreamConfig | None)

async delete_stream(name) bool[source]

Delete a stream by name.

Return type:

bool

Parameters:

name (str)

async purge_stream(name, seq=None, subject=None, keep=None) bool[source]

Purge a stream by name.

Return type:

bool

Parameters:
  • name (str)

  • seq (int | None)

  • subject (str | None)

  • keep (int | None)

async streams_info(offset=0) List[StreamInfo][source]

streams_info retrieves a list of streams with an optional offset.

Return type:

List[StreamInfo]

async streams_info_iterator(offset=0) Iterable[StreamInfo][source]

streams_info retrieves a list of streams Iterator.

Return type:

Iterable[StreamInfo]

async consumers_info(stream, offset=None) List[ConsumerInfo][source]

consumers_info retrieves a list of consumers. Consumers list limit is 256 for more consider to use offset :type stream: str :param stream: stream to get consumers :type offset: Optional[int], default: None :param offset: consumers list offset

Return type:

List[ConsumerInfo]

Parameters:
  • stream (str)

  • offset (int | None)

async get_msg(stream_name, seq=None, subject=None, direct=False, next=False) RawStreamMsg[source]

get_msg retrieves a message from a stream.

Return type:

RawStreamMsg

Parameters:
  • stream_name (str)

  • seq (int | None)

  • subject (str | None)

  • direct (bool | None)

  • next (bool | None)

async delete_msg(stream_name, seq) bool[source]

delete_msg retrieves a message from a stream based on the sequence ID.

Return type:

bool

Parameters:
  • stream_name (str)

  • seq (int)

async get_last_msg(stream_name, subject, direct=False) RawStreamMsg[source]

get_last_msg retrieves the last message from a stream.

Return type:

RawStreamMsg

Parameters:
  • stream_name (str)

  • subject (str)

  • direct (bool | None)

KeyValue

class nats.js.kv.KeyValue(name, stream, pre, js, direct) None[source]

KeyValue uses the JetStream KeyValue functionality.

Note

This functionality is EXPERIMENTAL and may be changed in later releases.

import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Create a KV
    kv = await js.create_key_value(bucket='MY_KV')

    # Set and retrieve a value
    await kv.put('hello', b'world')
    entry = await kv.get('hello')
    print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
    # KeyValue.Entry: key=hello, value=world

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Parameters:
  • name (str)

  • stream (str)

  • pre (str)

  • js (JetStreamContext)

  • direct (bool)

class Entry(bucket, key, value, revision, delta, created, operation) None[source]

An entry from a KeyValue store in JetStream.

Parameters:
  • bucket (str)

  • key (str)

  • value (bytes | None)

  • revision (int | None)

  • delta (int | None)

  • created (int | None)

  • operation (str | None)

class BucketStatus(stream_info, bucket) None[source]

BucketStatus is the status of a KeyValue bucket.

Parameters:
property values: int

values returns the number of stored messages in the stream.

property history: int

history returns the max msgs per subject.

property ttl: float | None

ttl returns the max age in seconds.

async get(key, revision=None) Entry[source]

get returns the latest value for the key.

Return type:

Entry

Parameters:
  • key (str)

  • revision (Optional[int])

async put(key, value) int[source]

put will place the new value for the key into the store and return the revision number.

Return type:

int

Parameters:
  • key (str)

  • value (bytes)

async create(key, value) int[source]

create will add the key/value pair iff it does not exist.

Return type:

int

Parameters:
  • key (str)

  • value (bytes)

async update(key, value, last=None) int[source]

update will update the value iff the latest revision matches.

Return type:

int

Parameters:
  • key (str)

  • value (bytes)

  • last (int | None)

async delete(key, last=None) bool[source]

delete will place a delete marker and remove all previous revisions.

Return type:

bool

Parameters:
  • key (str)

  • last (int | None)

async purge(key) bool[source]

purge will remove the key and all revisions.

Return type:

bool

Parameters:

key (str)

async purge_deletes(olderthan=1800) bool[source]

purge will remove all current delete markers older. :type olderthan: int, default: 1800 :param olderthan: time in seconds

Return type:

bool

Parameters:

olderthan (int)

async status() BucketStatus[source]

status retrieves the status and configuration of a bucket.

Return type:

BucketStatus

async watchall(**kwargs) KeyWatcher[source]

watchall returns a KeyValue watcher that matches all the keys.

Return type:

KeyWatcher

async keys(filters=None, **kwargs) List[str][source]

Returns a list of the keys from a KeyValue store. Optionally filters the keys based on the provided filter list.

Return type:

List[str]

Parameters:

filters (List[str])

async history(key) List[Entry][source]

history retrieves a list of the entries so far.

Return type:

List[Entry]

Parameters:

key (str)

async watch(keys, headers_only=False, include_history=False, ignore_deletes=False, meta_only=False, inactive_threshold=None) KeyWatcher[source]

watch will fire a callback when a key that matches the keys pattern is updated. The first update after starting the watch is None in case there are no pending updates.

Return type:

KeyWatcher

API

class nats.js.api.Header(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
class nats.js.api.StatusCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
class nats.js.api.Base None[source]

Helper dataclass to filter unknown fields from the API.

classmethod from_response(resp) _B[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Return type:

TypeVar(_B, bound= Base)

Parameters:

resp (Dict[str, Any])

evolve(**params) _B[source]

Return a copy of the instance with the passed values replaced.

Return type:

TypeVar(_B, bound= Base)

Parameters:

self (_B)

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.PubAck(stream, seq, domain=None, duplicate=None) None[source]

PubAck is the response of publishing a message to JetStream.

Parameters:
  • stream (str)

  • seq (int)

  • domain (str | None)

  • duplicate (bool | None)

class nats.js.api.Placement(cluster=None, tags=None) None[source]

Placement directives to consider when placing replicas of this stream

Parameters:
  • cluster (str | None)

  • tags (List[str] | None)

class nats.js.api.ExternalStream(api, deliver=None) None[source]
Parameters:
  • api (str)

  • deliver (str | None)

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.StreamSource(name, opt_start_seq=None, filter_subject=None, external=None, subject_transforms=None) None[source]
Parameters:
  • name (str)

  • opt_start_seq (int | None)

  • filter_subject (str | None)

  • external (ExternalStream | None)

  • subject_transforms (List[SubjectTransform] | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.StreamSourceInfo(name, lag=None, active=None, error=None) None[source]
Parameters:
  • name (str)

  • lag (int | None)

  • active (int | None)

  • error (Dict[str, Any] | None)

class nats.js.api.LostStreamData(msgs=None, bytes=None) None[source]
Parameters:
  • msgs (List[int] | None)

  • bytes (int | None)

class nats.js.api.StreamState(messages, bytes, first_seq, last_seq, consumer_count, deleted=None, num_deleted=None, lost=None, subjects=None) None[source]
Parameters:
  • messages (int)

  • bytes (int)

  • first_seq (int)

  • last_seq (int)

  • consumer_count (int)

  • deleted (List[int] | None)

  • num_deleted (int | None)

  • lost (LostStreamData | None)

  • subjects (Dict[str, int] | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.RetentionPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

How message retention is considered

class nats.js.api.StorageType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

The type of storage backend

class nats.js.api.DiscardPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Discard policy when a stream reaches its limits

class nats.js.api.StoreCompression(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

If stream is file-based and a compression algorithm is specified, the stream data will be compressed on disk.

Valid options are none or s2 for Snappy compression. Introduced in nats-server 2.10.0.

class nats.js.api.RePublish(src=None, dest=None, headers_only=None) None[source]

RePublish is for republishing messages once committed to a stream. The original subject cis remapped from the subject pattern to the destination pattern.

Parameters:
  • src (str | None)

  • dest (str | None)

  • headers_only (bool | None)

class nats.js.api.SubjectTransform(src, dest) None[source]

Subject transform to apply to matching messages.

Parameters:
  • src (str)

  • dest (str)

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.StreamConfig(name=None, description=None, subjects=None, retention=None, max_consumers=None, max_msgs=None, max_bytes=None, discard=DiscardPolicy.OLD, discard_new_per_subject=False, max_age=None, max_msgs_per_subject=-1, max_msg_size=-1, storage=None, num_replicas=None, no_ack=False, template_owner=None, duplicate_window=0, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False, republish=None, subject_transform=None, allow_direct=None, mirror_direct=None, compression=None, metadata=None) None[source]

StreamConfig represents the configuration of a stream.

Parameters:
  • name (str | None)

  • description (str | None)

  • subjects (List[str] | None)

  • retention (RetentionPolicy | None)

  • max_consumers (int | None)

  • max_msgs (int | None)

  • max_bytes (int | None)

  • discard (DiscardPolicy | None)

  • discard_new_per_subject (bool)

  • max_age (float | None)

  • max_msgs_per_subject (int)

  • max_msg_size (int | None)

  • storage (StorageType | None)

  • num_replicas (int | None)

  • no_ack (bool)

  • template_owner (str | None)

  • duplicate_window (float)

  • placement (Placement | None)

  • mirror (StreamSource | None)

  • sources (List[StreamSource] | None)

  • sealed (bool)

  • deny_delete (bool)

  • deny_purge (bool)

  • allow_rollup_hdrs (bool)

  • republish (RePublish | None)

  • subject_transform (SubjectTransform | None)

  • allow_direct (bool | None)

  • mirror_direct (bool | None)

  • compression (StoreCompression | None)

  • metadata (Dict[str, str] | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.PeerInfo(name=None, current=None, offline=None, active=None, lag=None) None[source]
Parameters:
  • name (str | None)

  • current (bool | None)

  • offline (bool | None)

  • active (int | None)

  • lag (int | None)

class nats.js.api.ClusterInfo(leader=None, name=None, replicas=None) None[source]
Parameters:
  • leader (str | None)

  • name (str | None)

  • replicas (List[PeerInfo] | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.StreamInfo(config, state, mirror=None, sources=None, cluster=None, did_create=None) None[source]

StreamInfo is the latest information about a stream from JetStream.

Parameters:
classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.StreamsListIterator(offset, total, streams) None[source]

StreamsListIterator is an iterator for streams list responses from JetStream.

Parameters:
  • offset (int)

  • total (int)

  • streams (List[Dict[str, any]])

class nats.js.api.AckPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Policies defining how messages should be acknowledged.

If an ack is required but is not received within the AckWait window, the message will be redelivered.

References

class nats.js.api.DeliverPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

When a consumer is first created, it can specify where in the stream it wants to start receiving messages.

This is the DeliverPolicy, and this enumeration defines allowed values.

References

class nats.js.api.ReplayPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

The replay policy applies when the DeliverPolicy is one of the following:

  • all: Delivers all messages from the beginning of the stream.

  • by_start_sequence: Delivers messages starting from a specific sequence.

  • by_start_time: Delivers messages starting from a specific timestamp.

These deliver policies begin reading the stream at a position other than the end.

References

class nats.js.api.ConsumerConfig(name=None, durable_name=None, description=None, deliver_policy=DeliverPolicy.ALL, opt_start_seq=None, opt_start_time=None, ack_policy=AckPolicy.EXPLICIT, ack_wait=None, max_deliver=None, backoff=None, filter_subject=None, filter_subjects=None, replay_policy=ReplayPolicy.INSTANT, rate_limit_bps=None, sample_freq=None, max_waiting=None, max_ack_pending=None, flow_control=None, idle_heartbeat=None, headers_only=None, deliver_subject=None, deliver_group=None, inactive_threshold=None, num_replicas=None, mem_storage=None, metadata=None) None[source]

Consumer configuration.

References

Parameters:
  • name (str | None)

  • durable_name (str | None)

  • description (str | None)

  • deliver_policy (DeliverPolicy | None)

  • opt_start_seq (int | None)

  • opt_start_time (int | None)

  • ack_policy (AckPolicy | None)

  • ack_wait (float | None)

  • max_deliver (int | None)

  • backoff (List[float] | None)

  • filter_subject (str | None)

  • filter_subjects (List[str] | None)

  • replay_policy (ReplayPolicy | None)

  • rate_limit_bps (int | None)

  • sample_freq (str | None)

  • max_waiting (int | None)

  • max_ack_pending (int | None)

  • flow_control (bool | None)

  • idle_heartbeat (float | None)

  • headers_only (bool | None)

  • deliver_subject (str | None)

  • deliver_group (str | None)

  • inactive_threshold (float | None)

  • num_replicas (int | None)

  • mem_storage (bool | None)

  • metadata (Dict[str, str] | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.SequenceInfo(consumer_seq, stream_seq) None[source]
Parameters:
  • consumer_seq (int)

  • stream_seq (int)

class nats.js.api.ConsumerInfo(name, stream_name, config, delivered=None, ack_floor=None, num_ack_pending=None, num_redelivered=None, num_waiting=None, num_pending=None, cluster=None, push_bound=None) None[source]

ConsumerInfo represents the info about the consumer.

Parameters:
  • name (str)

  • stream_name (str)

  • config (ConsumerConfig)

  • delivered (SequenceInfo | None)

  • ack_floor (SequenceInfo | None)

  • num_ack_pending (int | None)

  • num_redelivered (int | None)

  • num_waiting (int | None)

  • num_pending (int | None)

  • cluster (ClusterInfo | None)

  • push_bound (bool | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.AccountLimits(max_memory, max_storage, max_streams, max_consumers, max_ack_pending, memory_max_stream_bytes, storage_max_stream_bytes, max_bytes_required) None[source]

Account limits

References

Parameters:
  • max_memory (int)

  • max_storage (int)

  • max_streams (int)

  • max_consumers (int)

  • max_ack_pending (int)

  • memory_max_stream_bytes (int)

  • storage_max_stream_bytes (int)

  • max_bytes_required (bool)

class nats.js.api.Tier(memory, storage, streams, consumers, limits) None[source]
Parameters:
  • memory (int)

  • storage (int)

  • streams (int)

  • consumers (int)

  • limits (AccountLimits)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.APIStats(total, errors) None[source]

API stats

Parameters:
  • total (int)

  • errors (int)

class nats.js.api.AccountInfo(memory, storage, streams, consumers, limits, api, domain=None, tiers=None) None[source]

Account information

References

Parameters:
  • memory (int)

  • storage (int)

  • streams (int)

  • consumers (int)

  • limits (AccountLimits)

  • api (APIStats)

  • domain (str | None)

  • tiers (Dict[str, Tier] | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.RawStreamMsg(subject=None, seq=None, data=None, hdrs=None, headers=None, stream=None) None[source]
Parameters:
  • subject (str | None)

  • seq (int | None)

  • data (bytes | None)

  • hdrs (bytes | None)

  • headers (Dict | None)

  • stream (str | None)

property header: Dict | None

header returns the headers from a message.

class nats.js.api.KeyValueConfig(bucket, description=None, max_value_size=None, history=1, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None, republish=None, direct=None) None[source]

KeyValueConfig is the configuration of a KeyValue store.

Parameters:
  • bucket (str)

  • description (str | None)

  • max_value_size (int | None)

  • history (int)

  • ttl (float | None)

  • max_bytes (int | None)

  • storage (StorageType | None)

  • replicas (int)

  • placement (Placement | None)

  • republish (RePublish | None)

  • direct (bool | None)

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

class nats.js.api.StreamPurgeRequest(seq=None, filter=None, keep=None) None[source]

StreamPurgeRequest is optional request information to the purge API.

Parameters:
  • seq (int | None)

  • filter (str | None)

  • keep (int | None)

class nats.js.api.ObjectStoreConfig(bucket=None, description=None, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None) None[source]

ObjectStoreConfig is the configurigation of an ObjectStore.

Parameters:
  • bucket (str | None)

  • description (str | None)

  • ttl (float | None)

  • max_bytes (int | None)

  • storage (StorageType | None)

  • replicas (int)

  • placement (Placement | None)

as_dict() Dict[str, object][source]

Return the object converted into an API-friendly dict.

Return type:

Dict[str, object]

ObjectLink is used to embed links to other buckets and objects.

Parameters:
  • bucket (str)

  • name (str | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.ObjectMetaOptions(link=None, max_chunk_size=None) None[source]
Parameters:
  • link (ObjectLink | None)

  • max_chunk_size (int | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.ObjectMeta(name=None, description=None, headers=None, options=None) None[source]

ObjectMeta is high level information about an object.

Parameters:
  • name (str | None)

  • description (str | None)

  • headers (dict | None)

  • options (ObjectMetaOptions | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

class nats.js.api.ObjectInfo(name, bucket, nuid, size=None, mtime=None, chunks=None, digest=None, deleted=False, description=None, headers=None, options=None) None[source]

ObjectInfo is meta plus instance information.

Parameters:
  • name (str)

  • bucket (str)

  • nuid (str)

  • size (int | None)

  • mtime (str | None)

  • chunks (int | None)

  • digest (str | None)

  • deleted (bool | None)

  • description (str | None)

  • headers (dict | None)

  • options (ObjectMetaOptions | None)

classmethod from_response(resp)[source]

Read the class instance from a server response.

Unknown fields are ignored (“open-world assumption”).

Parameters:

resp (Dict[str, Any])

NUID

class nats.nuid.NUID None[source]

NUID is an implementation of the approach for fast generation of unique identifiers used for inboxes in NATS.

next() bytearray[source]

next returns the next unique identifier.

Return type:

bytearray

Errors

exception nats.errors.Error[source]
exception nats.errors.TimeoutError[source]
exception nats.errors.NoRespondersError[source]
exception nats.errors.StaleConnectionError[source]
exception nats.errors.OutboundBufferLimitError[source]
exception nats.errors.UnexpectedEOF[source]
exception nats.errors.FlushTimeoutError[source]
exception nats.errors.ConnectionClosedError[source]
exception nats.errors.SecureConnRequiredError[source]
exception nats.errors.SecureConnWantedError[source]
exception nats.errors.SecureConnFailedError[source]
exception nats.errors.BadSubscriptionError[source]
exception nats.errors.BadSubjectError[source]
exception nats.errors.SlowConsumerError(subject, reply, sid, sub) None[source]
Parameters:
Return type:

None

exception nats.errors.BadTimeoutError[source]
exception nats.errors.AuthorizationError[source]
exception nats.errors.NoServersError[source]
exception nats.errors.JsonParseError[source]
exception nats.errors.MaxPayloadError[source]
exception nats.errors.DrainTimeoutError[source]
exception nats.errors.ConnectionDrainingError[source]
exception nats.errors.ConnectionReconnectingError[source]
exception nats.errors.InvalidUserCredentialsError[source]
exception nats.errors.InvalidCallbackTypeError[source]
exception nats.errors.ProtocolError[source]
exception nats.errors.NotJSMessageError[source]

When it is attempted to use an API meant for JetStream on a message that does not belong to a stream.

exception nats.errors.MsgAlreadyAckdError(msg=None) None[source]
Return type:

None

JetStream Errors

exception nats.js.errors.Error(description=None) None[source]

An Error raised by the NATS client when using JetStream.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.APIError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

An Error that is the result of interacting with NATS JetStream.

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.ServiceUnavailableError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

A 503 error

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.ServerError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

A 500 error

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.NotFoundError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

A 404 error

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.BadRequestError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

A 400 error.

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.NoStreamResponseError(description=None) None[source]

Raised if the client gets a 503 when publishing a message.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.TooManyStalledMsgsError(description=None) None[source]

Raised when too many outstanding async published messages are waiting for ack.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.FetchTimeoutError[source]

Raised if the consumer timed out waiting for messages.

exception nats.js.errors.ConsumerSequenceMismatchError(stream_resume_sequence=None, consumer_sequence=None, last_consumer_sequence=None) None[source]

Async error raised by the client with idle_heartbeat mode enabled when one of the message sequences is not the expected one.

Return type:

None

exception nats.js.errors.BucketNotFoundError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

When attempted to bind to a JetStream KeyValue that does not exist.

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.BadBucketError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]
Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.KeyValueError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

Raised when there is an issue interacting with the KeyValue store.

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.KeyDeletedError(entry=None, op=None) None[source]

Raised when trying to get a key that was deleted from a JetStream KeyValue store.

Return type:

None

exception nats.js.errors.KeyNotFoundError(entry=None, op=None, message=None) None[source]

Raised when trying to get a key that does not exists from a JetStream KeyValue store.

Return type:

None

exception nats.js.errors.KeyWrongLastSequenceError(description=None) None[source]

Raised when trying to update a key with the wrong last sequence.

Parameters:

description (str | None)

Return type:

None

exception nats.js.errors.NoKeysError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]
Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.KeyHistoryTooLargeError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]
Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.InvalidBucketNameError(description=None) None[source]

Raised when trying to create a KV or OBJ bucket with invalid name.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.InvalidObjectNameError(description=None) None[source]

Raised when trying to put an object in Object Store with invalid key.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.BadObjectMetaError(description=None) None[source]

Raised when trying to read corrupted metadata from Object Store.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.LinkIsABucketError(description=None) None[source]

Raised when trying to get object from Object Store that is a bucket.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.DigestMismatchError(description=None) None[source]

Raised when getting an object from Object Store that has a different digest than expected.

Parameters:

description (Optional[str])

Return type:

None

exception nats.js.errors.ObjectNotFoundError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

When attempted to lookup an Object that does not exist.

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.ObjectDeletedError(code=None, description=None, err_code=None, stream=None, seq=None) None[source]

When attempted to do an operation to an Object that does not exist.

Parameters:
  • code (int | None)

  • description (str | None)

  • err_code (int | None)

  • stream (str | None)

  • seq (int | None)

Return type:

None

exception nats.js.errors.ObjectAlreadyExists(description=None) None[source]

When attempted to do an operation to an Object that already exist.

Parameters:

description (Optional[str])

Return type:

None

Deprecated Errors

Catching the following errors will be removed eventually, please use the recommended alternative error instead.

exception nats.aio.errors.NatsError[source]

Deprecated since version v2.0.0.

Please use nats.errors.Error instead.

exception nats.aio.errors.ErrConnectionClosed[source]

Deprecated since version v2.0.0.

Please use nats.errors.ConnectionClosedError instead.

exception nats.aio.errors.ErrDrainTimeout[source]

Deprecated since version v2.0.0.

Please use nats.errors.DrainTimeoutError instead.

exception nats.aio.errors.ErrInvalidUserCredentials[source]

Deprecated since version v2.0.0.

Please use nats.errors.InvalidUserCredentialsError instead.

exception nats.aio.errors.ErrInvalidCallbackType[source]

Deprecated since version v2.0.0.

Please use nats.errors.InvalidCallbackTypeError instead.

exception nats.aio.errors.ErrConnectionReconnecting[source]

Deprecated since version v2.0.0.

Please use nats.errors.ConnectionReconnectingError instead.

exception nats.aio.errors.ErrConnectionDraining[source]

Deprecated since version v2.0.0.

Please use nats.errors.ConnectionDrainingError instead.

exception nats.aio.errors.ErrMaxPayload[source]

Deprecated since version v2.0.0.

Please use nats.errors.MaxPayloadError instead.

exception nats.aio.errors.ErrStaleConnection[source]

Deprecated since version v2.0.0.

Please use nats.errors.StaleConnectionError instead.

exception nats.aio.errors.ErrJsonParse[source]

Deprecated since version v2.0.0.

Please use nats.errors.JsonParseError instead.

exception nats.aio.errors.ErrSecureConnRequired[source]

Deprecated since version v2.0.0.

Please use nats.errors.SecureConnRequiredError instead.

exception nats.aio.errors.ErrSecureConnWanted[source]

Deprecated since version v2.0.0.

Please use nats.errors.SecureConnWantedError instead.

exception nats.aio.errors.ErrSecureConnFailed[source]

Deprecated since version v2.0.0.

Please use nats.errors.SecureConnFailedError instead.

exception nats.aio.errors.ErrBadSubscription[source]

Deprecated since version v2.0.0.

Please use nats.errors.BadSubscriptionError instead.

exception nats.aio.errors.ErrBadSubject[source]

Deprecated since version v2.0.0.

Please use nats.errors.BadSubjectError instead.

exception nats.aio.errors.ErrSlowConsumer(subject, reply, sid, sub) None[source]

Deprecated since version v2.0.0.

Please use nats.errors.SlowConsumerError instead.

Parameters:
Return type:

None

exception nats.aio.errors.ErrTimeout[source]

Deprecated since version v2.0.0.

Please use nats.errors.TimeoutError instead.

exception nats.aio.errors.ErrBadTimeout[source]

Deprecated since version v2.0.0.

Please use nats.errors.BadTimeoutError instead.

exception nats.aio.errors.ErrAuthorization[source]

Deprecated since version v2.0.0.

Please use nats.errors.AuthorizationError instead.

exception nats.aio.errors.ErrNoServers[source]

Deprecated since version v2.0.0.

Please use nats.errors.NoServersError instead.