Modules¶
- async nats.connect(servers=['nats://localhost:4222'], **options) Client [source]¶
- Parameters:
servers (Union[str, List[str]], default: ['nats://localhost:4222']) – List of servers to connect.
options – NATS connect options.
- Return type:
Client
import asyncio
import nats

async def main():
    # Connect to NATS Server.
    nc = await nats.connect('demo.nats.io')
    await nc.publish('foo', b'Hello World!')
    await nc.flush()
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Asyncio Client¶
Asyncio based client for NATS.
- async nats.aio.client.Client.connect(self, servers=['nats://localhost:4222'], error_cb=None, disconnected_cb=None, closed_cb=None, discovered_server_cb=None, reconnected_cb=None, name=None, pedantic=False, verbose=False, allow_reconnect=True, connect_timeout=2, reconnect_time_wait=2, max_reconnect_attempts=60, ping_interval=120, max_outstanding_pings=2, dont_randomize=False, flusher_queue_size=1024, no_echo=False, tls=None, tls_hostname=None, tls_handshake_first=False, user=None, password=None, token=None, drain_timeout=30, signature_cb=None, user_jwt_cb=None, user_credentials=None, nkeys_seed=None, nkeys_seed_str=None, inbox_prefix=b'_INBOX', pending_size=2097152, flush_timeout=None) None ¶
Establishes a connection to NATS.
- Parameters:
servers (Union[str, List[str]], default: ['nats://localhost:4222']) – NATS Connection
name (Optional[str], default: None) – Label the connection with name (shown in NATS monitoring)
error_cb (Optional[Callable[[Exception], Awaitable[None]]], default: None) – Callback to report errors.
disconnected_cb (Optional[Callable[[], Awaitable[None]]], default: None) – Callback to report disconnection from NATS.
closed_cb (Optional[Callable[[], Awaitable[None]]], default: None) – Callback to report when client stops reconnection to NATS.
discovered_server_cb (Optional[Callable[[], Awaitable[None]]], default: None) – Callback to report when a new server joins the cluster.
pending_size (int, default: 2097152) – Max size of the pending buffer for publishing commands.
flush_timeout (Optional[float], default: None) – Max duration to wait for a forced flush to occur.
reconnected_cb (Callable[[], Awaitable[None]] | None)
pedantic (bool)
verbose (bool)
allow_reconnect (bool)
connect_timeout (int)
reconnect_time_wait (int)
max_reconnect_attempts (int)
ping_interval (int)
max_outstanding_pings (int)
dont_randomize (bool)
flusher_queue_size (int)
no_echo (bool)
tls (SSLContext | None)
tls_hostname (str | None)
tls_handshake_first (bool)
user (str | None)
password (str | None)
token (str | None)
drain_timeout (int)
signature_cb (Callable[[str], bytes] | None)
user_jwt_cb (Callable[[], bytearray | bytes] | None)
user_credentials (str | Tuple[str, str] | RawCredentials | Path | None)
nkeys_seed (str | None)
nkeys_seed_str (str | None)
inbox_prefix (str | bytes)
- Return type:
None
Connecting with all callbacks set:
import asyncio
import nats

async def main():
    async def disconnected_cb():
        print('Got disconnected!')

    async def reconnected_cb():
        print(f'Got reconnected to {nc.connected_url.netloc}')

    async def error_cb(e):
        print(f'There was an error: {e}')

    async def closed_cb():
        print('Connection is closed')

    # Connect to NATS with logging callbacks.
    nc = await nats.connect('demo.nats.io',
                            error_cb=error_cb,
                            reconnected_cb=reconnected_cb,
                            disconnected_cb=disconnected_cb,
                            closed_cb=closed_cb,
                            )

    async def handler(msg):
        print(f'Received a message on {msg.subject} {msg.reply}: {msg.data}')
        await msg.respond(b'OK')

    sub = await nc.subscribe('help.please', cb=handler)

    resp = await nc.request('help.please', b'help')
    print('Response:', resp)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Using a context manager:
import asyncio
import nats

async def main():
    is_done = asyncio.Future()

    async def closed_cb():
        print('Connection to NATS is closed.')
        is_done.set_result(True)

    async with (await nats.connect('nats://demo.nats.io:4222', closed_cb=closed_cb)) as nc:
        print(f'Connected to NATS at {nc.connected_url.netloc}...')

        async def subscribe_handler(msg):
            subject = msg.subject
            reply = msg.reply
            data = msg.data.decode()
            # Double quotes outside so the single quotes can appear in the output.
            print("Received a message on '{subject} {reply}': {data}".format(
                subject=subject, reply=reply, data=data))

        await nc.subscribe('discover', cb=subscribe_handler)
        await nc.flush()

        for i in range(0, 10):
            await nc.publish('discover', b'hello world')
            await asyncio.sleep(0.1)

    await asyncio.wait_for(is_done, 60.0)

if __name__ == '__main__':
    asyncio.run(main())
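Because connect accepts its settings as keyword arguments, the options can be assembled separately from the call. The following is a minimal sketch of that idea, not part of the library: the helper name connect_options_from_env and the environment variable names are hypothetical, and only the servers, name and max_reconnect_attempts parameters from the signature above are used.

```python
from typing import Any, Dict, Mapping

DEFAULT_SERVER = "nats://localhost:4222"  # default shown in the connect signature


def connect_options_from_env(env: Mapping[str, str]) -> Dict[str, Any]:
    """Build keyword arguments for nats.connect() from a mapping such as os.environ.

    The variable names (NATS_SERVERS, NATS_CLIENT_NAME,
    NATS_MAX_RECONNECT_ATTEMPTS) are illustrative assumptions.
    """
    raw = env.get("NATS_SERVERS", DEFAULT_SERVER)
    # Accept a comma-separated list of server URLs and ignore empty items.
    servers = [item.strip() for item in raw.split(",") if item.strip()]
    if not servers:
        servers = [DEFAULT_SERVER]

    options: Dict[str, Any] = {"servers": servers}
    if "NATS_CLIENT_NAME" in env:
        options["name"] = env["NATS_CLIENT_NAME"]
    if "NATS_MAX_RECONNECT_ATTEMPTS" in env:
        options["max_reconnect_attempts"] = int(env["NATS_MAX_RECONNECT_ATTEMPTS"])
    return options
```

Inside a coroutine the result would be passed on as `nc = await nats.connect(**connect_options_from_env(os.environ))`.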
- async nats.aio.client.Client.publish(self, subject, payload=b'', reply='', headers=None) None ¶
Publishes a NATS message.
- Parameters:
subject (str) – Subject to which the message will be published.
payload (bytes, default: b'') – Message data.
reply (str, default: '') – Inbox to which a responder can respond.
headers (Optional[Dict[str, str]], default: None) – Optional message header.
- Return type:
None
import asyncio
import nats

async def main():
    nc = await nats.connect('demo.nats.io')

    # Create an inbox for replies and a subscription on the subject.
    inbox = nc.new_inbox()
    sub = await nc.subscribe('hello')

    # Simple publishing
    await nc.publish('hello', b'Hello World!')

    # Publish with a reply
    await nc.publish('hello', b'Hello World!', reply=inbox)

    # Publish with headers
    await nc.publish('hello', b'With Headers', headers={'Foo':'Bar'})

    while True:
        try:
            msg = await sub.next_msg()
        except nats.errors.TimeoutError:
            # No more messages arrived within the default timeout.
            break
        print('----------------------')
        print('Subject:', msg.subject)
        print('Reply  :', msg.reply)
        print('Data   :', msg.data)
        print('Headers:', msg.header)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- async nats.aio.client.Client.subscribe(self, subject, queue='', cb=None, future=None, max_msgs=0, pending_msgs_limit=524288, pending_bytes_limit=134217728) Subscription ¶
subscribe registers interest in a given subject.
If a callback is provided, messages will be processed asynchronously.
If a callback isn’t provided, messages can be retrieved via an asynchronous iterator on the returned subscription object.
- Return type:
Subscription
- Parameters:
subject (str)
queue (str)
cb (Callable[[Msg], Awaitable[None]] | None)
future (Future | None)
max_msgs (int)
pending_msgs_limit (int)
pending_bytes_limit (int)
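When no callback is passed, the returned subscription is consumed through its asynchronous iterator. The sketch below shows one way to take a bounded number of messages from it. The helper name take is hypothetical, and sub is assumed to be the object returned by `await nc.subscribe(subject)`.

```python
from typing import Any, List


async def take(sub: Any, count: int) -> List[Any]:
    """Collect up to `count` messages from a subscription's async iterator.

    `sub` must have been created without a callback so that
    `sub.messages` is available.
    """
    received: List[Any] = []
    if count <= 0:
        return received
    async for msg in sub.messages:
        received.append(msg)
        if len(received) >= count:
            # Stop iterating once enough messages have been collected.
            break
    return received
```

The iterator waits for new messages, so a call such as `await take(sub, 10)` only returns once ten messages have arrived or the subscription ends.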
- async nats.aio.client.Client.flush(self, timeout=10) None ¶
Sends a ping to the server and waits for the pong. The round trip confirms that everything written so far has reached the server, and it can also be used to measure round-trip time. If the pong is not returned within the allowed timeout, nats.errors.TimeoutError is raised.
- Return type:
None
- Parameters:
timeout (int)
- async nats.aio.client.Client.close(self) None ¶
Closes the socket to which we are connected and sets the client to be in the CLOSED state. No further reconnections occur once reaching this point.
- Return type:
None
- async nats.aio.client.Client.drain(self) None ¶
drain puts a connection into a drain state. All subscriptions are immediately put into a drain state. Once they have drained, the publishers are drained and cannot publish any additional messages. After the publishers have drained, the connection is closed. Use the closed_cb option to know when the connection has moved from draining to closed.
- Return type:
None
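The difference between close and drain matters most at shutdown. The following sketch picks between them using only the state properties and methods documented on this page. The helper name shutdown and its return labels are hypothetical.

```python
from typing import Any


async def shutdown(nc: Any) -> str:
    """Shut a client down gracefully, preferring drain() over close().

    Returns a short label describing the action taken, for logging.
    """
    if nc.is_closed:
        return "already-closed"
    if nc.is_draining:
        return "already-draining"
    if nc.is_connected:
        # drain() lets subscriptions and publishers finish, then closes.
        await nc.drain()
        return "drained"
    # Not connected (for example while reconnecting): nothing to drain.
    await nc.close()
    return "closed"
```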
- nats.aio.client.Client.new_inbox(self) str ¶
new_inbox returns a unique inbox that can be used for NATS requests or subscriptions:
# Create unique subscription to receive direct messages.
inbox = nc.new_inbox()
sub = await nc.subscribe(inbox)
await nc.publish('broadcast', b'', reply=inbox)
msg = await sub.next_msg()
- Return type:
str
Subscription¶
- class nats.aio.subscription.Subscription(conn, id=0, subject='', queue='', cb=None, future=None, max_msgs=0, pending_msgs_limit=524288, pending_bytes_limit=134217728) None [source]¶
A Subscription represents interest in a particular subject.
A Subscription should not be constructed directly; use connection.subscribe() to get a subscription instead.
nc = await nats.connect()

# Async Subscription
async def cb(msg):
    print('Received', msg)
await nc.subscribe('foo', cb=cb)

# Sync Subscription
sub = await nc.subscribe('foo')
msg = await sub.next_msg()
print('Received', msg)
- Parameters:
id (int)
subject (str)
queue (str)
cb (Optional[Callable[[Msg], Awaitable[None]]])
future (Optional[asyncio.Future])
max_msgs (int)
pending_msgs_limit (int)
pending_bytes_limit (int)
- property subject: str¶
Returns the subject of the Subscription.
- property queue: str¶
Returns the queue name of the Subscription if part of a queue group.
- property messages: AsyncIterator[Msg]¶
Retrieves an async iterator for the messages from the subscription.
This is only available if a callback isn’t provided when creating a subscription.
nc = await nats.connect()
sub = await nc.subscribe('foo')
# Use `async for` which implicitly awaits messages
async for msg in sub.messages:
    print('Received', msg)
- property pending_msgs: int¶
Number of messages delivered by the NATS Server that are being buffered in the pending queue.
- property pending_bytes: int¶
Size of data sent by the NATS Server that is being buffered in the pending queue.
- property delivered: int¶
Number of delivered messages to this subscription so far.
- async next_msg(timeout=1.0) Msg [source]¶
next_msg can be used to retrieve the next message from a stream of messages using await syntax. This only works when no callback was passed to subscribe:
sub = await nc.subscribe('hello')
msg = await sub.next_msg(timeout=1)
- Parameters:
timeout (float | None) – Time in seconds to wait for next message before timing out.
- Return type:
Msg
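A common pattern with next_msg is to keep reading until a call times out. The sketch below illustrates that loop. The helper name read_available is hypothetical, and it assumes that the timeout error raised by the client derives from asyncio.TimeoutError, which this page does not state.

```python
import asyncio
from typing import Any, List


async def read_available(sub: Any, timeout: float = 1.0, limit: int = 100) -> List[Any]:
    """Read messages with next_msg() until a call times out or `limit` is reached."""
    messages: List[Any] = []
    while len(messages) < limit:
        try:
            messages.append(await sub.next_msg(timeout=timeout))
        except asyncio.TimeoutError:
            # Assumption: the client's timeout error is a subclass of
            # asyncio.TimeoutError, so it is caught here.
            break
    return messages
```

The limit argument keeps the loop bounded on a busy subject where next_msg would never time out.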
- async unsubscribe(limit=0)[source]¶
Removes interest in a subject; remaining messages will be discarded.
If limit is greater than zero, interest is not removed immediately. Instead, it is removed automatically after limit messages are received.
- Parameters:
limit (int, default: 0) – Max number of messages to receive before unsubscribing.
Msg¶
- class nats.aio.msg.Msg(_client, subject='', reply='', data=b'', headers=None, _metadata=None, _ackd=False, _sid=None) None [source]¶
Msg represents a message delivered by NATS.
- Parameters:
_client (NATS)
subject (str)
reply (str)
data (bytes)
headers (Optional[Dict[str, str]])
_metadata (Optional[Metadata])
_ackd (bool)
_sid (Optional[int])
- property header: Dict[str, str] | None¶
header returns the headers from a message.
- property sid: int¶
sid returns the subscription ID from a message.
- async respond(data) None [source]¶
respond replies to the inbox of the message if there is one.
- Return type:
None
- Parameters:
data (bytes)
- async ack_sync(timeout=1.0) Msg [source]¶
ack_sync waits for the acknowledgement to be processed by the server.
- Return type:
Msg
- Parameters:
timeout (float)
- async nak(delay=None) None [source]¶
nak negatively acknowledges a message delivered by JetStream, triggering a redelivery. If delay is provided, redelivery is delayed by delay seconds.
- Return type:
None
- Parameters:
delay (int | float | None)
- async in_progress() None [source]¶
in_progress acknowledges a message delivered by JetStream is still being worked on. Unlike other types of acks, an in-progress ack (+WPI) can be done multiple times.
- Return type:
None
- async term() None [source]¶
term terminates a message delivered by JetStream and disables redeliveries.
- Return type:
None
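The acknowledgement methods map naturally onto the outcome of processing a JetStream message. The sketch below shows one possible mapping: ack on success, nak with a delay on a retryable failure, and term on a failure that a retry cannot fix. The names handle, process and PermanentError are hypothetical, and msg.ack() is used as in the pull subscription examples on this page.

```python
from typing import Any, Awaitable, Callable


class PermanentError(Exception):
    """Hypothetical marker for failures that a redelivery cannot fix."""


async def handle(
    msg: Any,
    process: Callable[[bytes], Awaitable[None]],
    retry_delay: float = 5.0,
) -> str:
    """Process a JetStream message and acknowledge it according to the outcome."""
    try:
        await process(msg.data)
    except PermanentError:
        # Stop redeliveries: the message can never be processed.
        await msg.term()
        return "term"
    except Exception:
        # Ask for a redelivery after `retry_delay` seconds.
        await msg.nak(delay=retry_delay)
        return "nak"
    await msg.ack()
    return "ack"
```

For long-running work, in_progress() could be called periodically inside process to tell the server the message is still being worked on.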
- class Metadata(sequence, num_pending, num_delivered, timestamp, stream, consumer, domain=None) None [source]¶
Metadata is the metadata from a JetStream message.
num_pending is the number of available messages in the Stream that have not been consumed yet.
num_delivered is the number of times that this message has been delivered. For example, num_delivered higher than one means that there have been redeliveries.
timestamp is the time at which the message was delivered.
stream is the name of the stream.
consumer is the name of the consumer.
- Parameters:
sequence (SequencePair)
num_pending (int)
num_delivered (int)
timestamp (datetime.datetime)
stream (str)
consumer (str)
domain (Optional[str])
Connection Properties¶
- property Client.servers: List[ParseResult]¶
- property Client.discovered_servers: List[ParseResult]¶
- property Client.max_payload: int¶
Returns the max payload size received in the server's INFO.
- property Client.connected_url: ParseResult | None¶
- property Client.client_id: int | None¶
Returns the client ID received in the server's INFO.
- property Client.pending_data_size: int¶
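The max_payload property can be checked before publishing so that oversized messages fail early with a descriptive error. This is a minimal sketch with a hypothetical helper name, publish_checked. The client may well enforce the limit on its own; the check here only makes the failure explicit at the call site.

```python
from typing import Any


async def publish_checked(nc: Any, subject: str, payload: bytes) -> None:
    """Publish `payload`, refusing messages larger than the server's limit."""
    limit = nc.max_payload
    if len(payload) > limit:
        raise ValueError(
            f"payload of {len(payload)} bytes exceeds max_payload of {limit} bytes"
        )
    await nc.publish(subject, payload)
```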
Connection State¶
- property Client.is_connected: bool¶
- property Client.is_closed: bool¶
- property Client.is_connecting: bool¶
- property Client.is_reconnecting: bool¶
- property Client.is_draining: bool¶
- property Client.is_draining_pubs: bool¶
- property Client.last_error: Exception | None¶
Returns the last error which may have occurred.
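The state properties are plain booleans, so they can be folded into a single label for logs or health checks. The sketch below shows one way to do that. The function name describe_state and the order in which the flags are checked are assumptions, not library behavior.

```python
from typing import Any


def describe_state(nc: Any) -> str:
    """Summarize a client's connection state as a single word."""
    if nc.is_closed:
        return "closed"
    if nc.is_draining or nc.is_draining_pubs:
        return "draining"
    if nc.is_reconnecting:
        return "reconnecting"
    if nc.is_connecting:
        return "connecting"
    if nc.is_connected:
        return "connected"
    return "disconnected"
```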
JetStream¶
- nats.aio.client.Client.jetstream(self, **opts) JetStreamContext ¶
jetstream returns a context that can be used to produce and consume messages from NATS JetStream.
- Parameters:
prefix – Default JetStream API Prefix.
domain – Optional domain used by the JetStream API.
timeout – Timeout for all JS API actions.
- Return type:
JetStreamContext
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='hello', subjects=['hello'])
    ack = await js.publish('hello', b'Hello JS!')
    print(f'Ack: stream={ack.stream}, sequence={ack.seq}')
    # Ack: stream=hello, sequence=1
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- async nats.js.client.JetStreamContext.publish(self, subject, payload=b'', timeout=None, stream=None, headers=None) PubAck ¶
publish emits a new message to JetStream and waits for acknowledgement.
- Return type:
PubAck
- Parameters:
subject (str)
payload (bytes)
timeout (float | None)
stream (str | None)
headers (Dict[str, Any] | None)
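The headers parameter and the duplicate field of PubAck can be combined to publish a message at most once. The sketch below assumes the JetStream message ID header, Nats-Msg-Id, which is not described on this page, and uses a hypothetical helper name, publish_once.

```python
from typing import Any

MSG_ID_HEADER = "Nats-Msg-Id"  # assumed JetStream deduplication header


async def publish_once(js: Any, subject: str, payload: bytes, msg_id: str) -> bool:
    """Publish with a message ID.

    Returns True if the message was stored and False if the server
    reported it as a duplicate.
    """
    ack = await js.publish(subject, payload, headers={MSG_ID_HEADER: msg_id})
    # PubAck.duplicate is None or False unless the server saw this ID before.
    return not ack.duplicate
```

Duplicates would only be detected within the stream's duplicate_window, so the IDs need to stay stable for retries made within that period.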
- async nats.js.client.JetStreamContext.subscribe(self, subject, queue=None, cb=None, durable=None, stream=None, config=None, manual_ack=False, ordered_consumer=False, idle_heartbeat=None, flow_control=False, pending_msgs_limit=524288, pending_bytes_limit=268435456, deliver_policy=None, headers_only=None, inactive_threshold=None) PushSubscription ¶
Create consumer if needed and push-subscribe to it.
Check if consumer exists.
Creates consumer if needed.
Calls subscribe_bind.
- Parameters:
subject (str) – Subject from a stream from JetStream.
queue (Optional[str], default: None) – Deliver group name from a set of queue subscribers.
durable (Optional[str], default: None) – Name of the durable consumer to which the subscription should be bound.
stream (Optional[str], default: None) – Name of the stream to which the subscription should be bound. If not set, then the client will automatically look it up based on the subject.
manual_ack (bool, default: False) – Disables auto acking for async subscriptions.
ordered_consumer (bool, default: False) – Enable ordered consumer mode.
idle_heartbeat (Optional[float], default: None) – Enable Heartbeats for a consumer to detect failures.
flow_control (bool, default: False) – Enable Flow Control for a consumer.
cb (Optional[Callback])
config (Optional[api.ConsumerConfig])
pending_msgs_limit (int)
pending_bytes_limit (int)
deliver_policy (Optional[api.DeliverPolicy])
headers_only (Optional[bool])
inactive_threshold (Optional[float])
- Return type:
PushSubscription
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='hello', subjects=['hello'])
    await js.publish('hello', b'Hello JS!')

    async def cb(msg):
        print('Received:', msg)

    # Ephemeral Async Subscribe
    await js.subscribe('hello', cb=cb)

    # Durable Async Subscribe
    # NOTE: Only one subscription can be bound to a durable name. It also auto acks by default.
    await js.subscribe('hello', cb=cb, durable='foo')

    # Durable Sync Subscribe
    # NOTE: Sync subscribers do not auto ack.
    await js.subscribe('hello', durable='bar')

    # Queue Async Subscribe
    # NOTE: Here 'workers' becomes deliver_group, durable name and queue name.
    await js.subscribe('hello', 'workers', cb=cb)

if __name__ == '__main__':
    asyncio.run(main())
- async nats.js.client.JetStreamContext.pull_subscribe(self, subject, durable=None, stream=None, config=None, pending_msgs_limit=524288, pending_bytes_limit=268435456, inbox_prefix=b'_INBOX.') PullSubscription ¶
Create consumer and pull subscription.
Find stream name by subject if stream is not passed.
Create consumer with the given config if not created.
Call pull_subscribe_bind.
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    sub = await js.pull_subscribe('foo', stream='mystream')

    msgs = await sub.fetch()
    msg = msgs[0]
    await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- Parameters:
subject (str)
durable (str | None)
stream (str | None)
config (ConsumerConfig | None)
pending_msgs_limit (int)
pending_bytes_limit (int)
inbox_prefix (bytes)
- Return type:
PullSubscription
- async nats.js.client.JetStreamContext.pull_subscribe_bind(self, consumer=None, stream=None, inbox_prefix=b'_INBOX.', pending_msgs_limit=524288, pending_bytes_limit=268435456, name=None, durable=None) PullSubscription ¶
pull_subscribe_bind returns a PullSubscription that receives messages from a JetStream pull-based consumer when sub.fetch is called.
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    # Bind to an existing pull consumer. 'myconsumer' is a placeholder name;
    # the consumer must already exist on 'mystream'.
    sub = await js.pull_subscribe_bind(consumer='myconsumer', stream='mystream')

    msgs = await sub.fetch()
    msg = msgs[0]
    await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- Return type:
PullSubscription
- Parameters:
consumer (str | None)
stream (str | None)
inbox_prefix (bytes)
pending_msgs_limit (int)
pending_bytes_limit (int)
name (str | None)
durable (str | None)
- class nats.js.client.JetStreamContext.PullSubscription(js, sub, stream, consumer, deliver) None ¶
PullSubscription is a subscription that can fetch messages.
- Parameters:
js (JetStreamContext)
sub (Subscription)
stream (str)
consumer (str)
deliver (bytes)
- async consumer_info() ConsumerInfo ¶
consumer_info gets the current info of the consumer from this subscription.
- Return type:
ConsumerInfo
- property delivered: int¶
Number of delivered messages to this subscription so far.
- async fetch(batch=1, timeout=5, heartbeat=None) List[Msg] ¶
fetch makes a request to JetStream to be delivered a set of messages.
- Parameters:
batch (int, default: 1) – Number of messages to fetch from server.
timeout (Optional[float], default: 5) – Max duration of the fetch request before it expires.
heartbeat (Optional[float], default: None) – Idle Heartbeat interval in seconds for the fetch request.
- Return type:
List[Msg]
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='mystream', subjects=['foo'])
    await js.publish('foo', b'Hello World!')

    # Create the pull subscription to fetch from.
    sub = await js.pull_subscribe('foo', stream='mystream')

    msgs = await sub.fetch(5)
    for msg in msgs:
        await msg.ack()

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- property pending_bytes: int¶
Size of data sent by the NATS Server that is being buffered in the pending queue.
- property pending_msgs: int¶
Number of messages delivered by the NATS Server that are being buffered in the pending queue.
- async unsubscribe() None ¶
unsubscribe destroys the inboxes of the pull subscription making it unable to continue to receive messages.
- Return type:
None
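A pull subscription is usually driven by a loop that fetches a batch, handles each message and acknowledges it. The sketch below illustrates that loop with a hypothetical helper, consume_batches. It assumes that fetch signals an expired request with an error derived from asyncio.TimeoutError, which this page does not state.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def consume_batches(
    sub: Any,
    handler: Callable[[Any], Awaitable[None]],
    batch: int = 10,
    timeout: float = 5.0,
    max_batches: Optional[int] = None,
) -> int:
    """Fetch and acknowledge messages in batches until a fetch times out.

    Returns the number of messages that were handled and acknowledged.
    """
    processed = 0
    batches = 0
    while max_batches is None or batches < max_batches:
        try:
            msgs = await sub.fetch(batch, timeout=timeout)
        except asyncio.TimeoutError:
            # Assumption: an expired fetch raises a timeout error derived
            # from asyncio.TimeoutError; treat it as "no more messages".
            break
        batches += 1
        for msg in msgs:
            await handler(msg)
            await msg.ack()
            processed += 1
    return processed
```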
Manager¶
- class nats.js.manager.JetStreamManager(conn, prefix='$JS.API', timeout=5) None [source]¶
JetStreamManager exposes management APIs for JetStream.
- Parameters:
conn (NATS)
prefix (str)
timeout (float)
- async find_stream_name_by_subject(subject) str [source]¶
Find the stream to which a subject belongs in an account.
- Return type:
str
- Parameters:
subject (str)
- async stream_info(name, subjects_filter=None) StreamInfo [source]¶
Get the latest StreamInfo by stream name.
- Return type:
StreamInfo
- Parameters:
name (str)
subjects_filter (str | None)
- async add_stream(config=None, **params) StreamInfo [source]¶
add_stream creates a stream.
- Return type:
StreamInfo
- Parameters:
config (StreamConfig | None)
- async update_stream(config=None, **params) StreamInfo [source]¶
update_stream updates a stream.
- Return type:
StreamInfo
- Parameters:
config (StreamConfig | None)
- async delete_stream(name) bool [source]¶
Delete a stream by name.
- Return type:
bool
- Parameters:
name (str)
- async purge_stream(name, seq=None, subject=None, keep=None) bool [source]¶
Purge a stream by name.
- Return type:
bool
- Parameters:
name (str)
seq (int | None)
subject (str | None)
keep (int | None)
- async streams_info(offset=0) List[StreamInfo] [source]¶
streams_info retrieves a list of streams with an optional offset.
- Return type:
List[StreamInfo]
- async streams_info_iterator(offset=0) Iterable[StreamInfo] [source]¶
streams_info_iterator retrieves an iterator over the list of streams.
- Return type:
Iterable[StreamInfo]
- async consumers_info(stream, offset=None) List[ConsumerInfo] [source]¶
consumers_info retrieves a list of consumers. The list is limited to 256 consumers per call; to retrieve more, use offset.
- Parameters:
stream (str) – Stream to get consumers from.
offset (Optional[int], default: None) – Consumers list offset.
- Return type:
List[ConsumerInfo]
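Because consumers_info returns at most 256 entries per call, listing every consumer of a large stream means paging with offset. The sketch below shows that paging loop. The helper name list_all_consumers is hypothetical, and jsm stands for any object exposing consumers_info as documented above.

```python
from typing import Any, List

PAGE_LIMIT = 256  # list limit stated in the consumers_info description


async def list_all_consumers(jsm: Any, stream: str) -> List[Any]:
    """Collect every consumer of `stream` by paging through consumers_info."""
    consumers: List[Any] = []
    offset = 0
    while True:
        page = await jsm.consumers_info(stream, offset=offset)
        consumers.extend(page)
        if len(page) < PAGE_LIMIT:
            # A short (or empty) page means the listing is complete.
            return consumers
        offset += len(page)
```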
- async get_msg(stream_name, seq=None, subject=None, direct=False, next=False) RawStreamMsg [source]¶
get_msg retrieves a message from a stream.
- Return type:
RawStreamMsg
- Parameters:
stream_name (str)
seq (int | None)
subject (str | None)
direct (bool | None)
next (bool | None)
- async delete_msg(stream_name, seq) bool [source]¶
delete_msg deletes a message from a stream based on the sequence ID.
- Return type:
bool
- Parameters:
stream_name (str)
seq (int)
- async get_last_msg(stream_name, subject, direct=False) RawStreamMsg [source]¶
get_last_msg retrieves the last message from a stream.
- Return type:
RawStreamMsg
- Parameters:
stream_name (str)
subject (str)
direct (bool | None)
KeyValue¶
- class nats.js.kv.KeyValue(name, stream, pre, js, direct) None [source]¶
KeyValue uses the JetStream KeyValue functionality.
Note
This functionality is EXPERIMENTAL and may be changed in later releases.
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Create a KV
    kv = await js.create_key_value(bucket='MY_KV')

    # Set and retrieve a value
    await kv.put('hello', b'world')
    entry = await kv.get('hello')
    print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
    # KeyValue.Entry: key=hello, value=world

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- Parameters:
name (str)
stream (str)
pre (str)
js (JetStreamContext)
direct (bool)
- class Entry(bucket, key, value, revision, delta, created, operation) None [source]¶
An entry from a KeyValue store in JetStream.
- Parameters:
bucket (str)
key (str)
value (bytes | None)
revision (int | None)
delta (int | None)
created (int | None)
operation (str | None)
- class BucketStatus(stream_info, bucket) None [source]¶
BucketStatus is the status of a KeyValue bucket.
- Parameters:
stream_info (StreamInfo)
bucket (str)
- property values: int¶
values returns the number of stored messages in the stream.
- property history: int¶
history returns the max msgs per subject.
- property ttl: float | None¶
ttl returns the max age in seconds.
- async get(key, revision=None) Entry [source]¶
get returns the latest value for the key.
- Return type:
Entry
- Parameters:
key (str)
revision (Optional[int])
- async put(key, value) int [source]¶
put will place the new value for the key into the store and return the revision number.
- Return type:
int
- Parameters:
key (str)
value (bytes)
- async create(key, value) int [source]¶
create will add the key/value pair iff it does not exist.
- Return type:
int
- Parameters:
key (str)
value (bytes)
- async update(key, value, last=None) int [source]¶
update will update the value iff the latest revision matches.
- Return type:
int
- Parameters:
key (str)
value (bytes)
last (int | None)
- async delete(key, last=None) bool [source]¶
delete will place a delete marker and remove all previous revisions.
- Return type:
bool
- Parameters:
key (str)
last (int | None)
- async purge(key) bool [source]¶
purge will remove the key and all revisions.
- Return type:
bool
- Parameters:
key (str)
- async purge_deletes(olderthan=1800) bool [source]¶
purge_deletes will remove all current delete markers older than olderthan seconds.
- Parameters:
olderthan (int, default: 1800) – Time in seconds.
- Return type:
bool
- async status() BucketStatus [source]¶
status retrieves the status and configuration of a bucket.
- Return type:
BucketStatus
- async watchall(**kwargs) KeyWatcher [source]¶
watchall returns a KeyValue watcher that matches all the keys.
- Return type:
KeyWatcher
- async keys(filters=None, **kwargs) List[str] [source]¶
Returns a list of the keys from a KeyValue store. Optionally filters the keys based on the provided filter list.
- Return type:
List[str]
- Parameters:
filters (List[str])
- async history(key) List[Entry] [source]¶
history retrieves a list of the entries so far.
- Return type:
List[Entry]
- Parameters:
key (str)
- async watch(keys, headers_only=False, include_history=False, ignore_deletes=False, meta_only=False, inactive_threshold=None) KeyWatcher [source]¶
watch will fire a callback when a key that matches the keys pattern is updated. The first update after starting the watch is None in case there are no pending updates.
- Return type:
KeyWatcher
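The revision returned by get and the last argument of update together allow a read-modify-write that fails instead of overwriting a concurrent change. The sketch below shows the pattern with a hypothetical helper, append_suffix. The type of the error raised on a revision mismatch is not stated on this page, so the helper simply lets it propagate to the caller.

```python
from typing import Any


async def append_suffix(kv: Any, key: str, suffix: bytes) -> int:
    """Append `suffix` to the value of `key` using a revision check.

    Returns the new revision. If another writer changed the key after
    the read, update() fails and the error propagates to the caller.
    """
    entry = await kv.get(key)
    current = entry.value or b""
    # `last` makes the write conditional on the revision that was read.
    return await kv.update(key, current + suffix, last=entry.revision)
```

A caller that expects contention could wrap the call in a retry loop that reads the key again after each failure.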
API¶
- class nats.js.api.Header(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
- class nats.js.api.StatusCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
- class nats.js.api.Base None [source]¶
Helper dataclass to filter unknown fields from the API.
- classmethod from_response(resp) _B [source]¶
Read the class instance from a server response.
Unknown fields are ignored (“open-world assumption”).
- Return type:
TypeVar(_B, bound=Base)
- Parameters:
resp (Dict[str, Any])
- class nats.js.api.PubAck(stream, seq, domain=None, duplicate=None) None [source]¶
PubAck is the response of publishing a message to JetStream.
- Parameters:
stream (str)
seq (int)
domain (str | None)
duplicate (bool | None)
- class nats.js.api.Placement(cluster=None, tags=None) None [source]¶
Placement directives to consider when placing replicas of this stream
- Parameters:
cluster (str | None)
tags (List[str] | None)
- class nats.js.api.ExternalStream(api, deliver=None) None [source]¶
- Parameters:
api (str)
deliver (str | None)
- class nats.js.api.StreamSource(name, opt_start_seq=None, filter_subject=None, external=None, subject_transforms=None) None [source]¶
- Parameters:
name (str)
opt_start_seq (int | None)
filter_subject (str | None)
external (ExternalStream | None)
subject_transforms (List[SubjectTransform] | None)
- class nats.js.api.StreamSourceInfo(name, lag=None, active=None, error=None) None [source]¶
- Parameters:
name (str)
lag (int | None)
active (int | None)
error (Dict[str, Any] | None)
- class nats.js.api.LostStreamData(msgs=None, bytes=None) None [source]¶
- Parameters:
msgs (List[int] | None)
bytes (int | None)
- class nats.js.api.StreamState(messages, bytes, first_seq, last_seq, consumer_count, deleted=None, num_deleted=None, lost=None, subjects=None) None [source]¶
- Parameters:
messages (int)
bytes (int)
first_seq (int)
last_seq (int)
consumer_count (int)
deleted (List[int] | None)
num_deleted (int | None)
lost (LostStreamData | None)
subjects (Dict[str, int] | None)
- class nats.js.api.RetentionPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
How message retention is considered
- class nats.js.api.StorageType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
The type of storage backend
- class nats.js.api.DiscardPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Discard policy when a stream reaches its limits
- class nats.js.api.StoreCompression(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
If stream is file-based and a compression algorithm is specified, the stream data will be compressed on disk.
Valid options are none or s2 for Snappy compression. Introduced in nats-server 2.10.0.
- class nats.js.api.RePublish(src=None, dest=None, headers_only=None) None [source]¶
RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern.
- Parameters:
src (str | None)
dest (str | None)
headers_only (bool | None)
- class nats.js.api.SubjectTransform(src, dest) None [source]¶
Subject transform to apply to matching messages.
- Parameters:
src (str)
dest (str)
- class nats.js.api.StreamConfig(name=None, description=None, subjects=None, retention=None, max_consumers=None, max_msgs=None, max_bytes=None, discard=DiscardPolicy.OLD, discard_new_per_subject=False, max_age=None, max_msgs_per_subject=-1, max_msg_size=-1, storage=None, num_replicas=None, no_ack=False, template_owner=None, duplicate_window=0, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False, republish=None, subject_transform=None, allow_direct=None, mirror_direct=None, compression=None, metadata=None) None [source]¶
StreamConfig represents the configuration of a stream.
- Parameters:
name (str | None)
description (str | None)
subjects (List[str] | None)
retention (RetentionPolicy | None)
max_consumers (int | None)
max_msgs (int | None)
max_bytes (int | None)
discard (DiscardPolicy | None)
discard_new_per_subject (bool)
max_age (float | None)
max_msgs_per_subject (int)
max_msg_size (int | None)
storage (StorageType | None)
num_replicas (int | None)
no_ack (bool)
template_owner (str | None)
duplicate_window (float)
placement (Placement | None)
mirror (StreamSource | None)
sources (List[StreamSource] | None)
sealed (bool)
deny_delete (bool)
deny_purge (bool)
allow_rollup_hdrs (bool)
republish (RePublish | None)
subject_transform (SubjectTransform | None)
allow_direct (bool | None)
mirror_direct (bool | None)
compression (StoreCompression | None)
metadata (Dict[str, str] | None)
- class nats.js.api.PeerInfo(name=None, current=None, offline=None, active=None, lag=None) None [source]¶
- Parameters:
name (str | None)
current (bool | None)
offline (bool | None)
active (int | None)
lag (int | None)
- class nats.js.api.ClusterInfo(leader=None, name=None, replicas=None) None [source]¶
- Parameters:
leader (str | None)
name (str | None)
replicas (List[PeerInfo] | None)
- class nats.js.api.StreamInfo(config, state, mirror=None, sources=None, cluster=None, did_create=None) None [source]¶
StreamInfo is the latest information about a stream from JetStream.
- Parameters:
config (StreamConfig)
state (StreamState)
mirror (StreamSourceInfo | None)
sources (List[StreamSourceInfo] | None)
cluster (ClusterInfo | None)
did_create (bool | None)
- class nats.js.api.StreamsListIterator(offset, total, streams) None [source]¶
StreamsListIterator is an iterator for streams list responses from JetStream.
- Parameters:
offset (int)
total (int)
streams (List[Dict[str, any]])
- class nats.js.api.AckPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Policies defining how messages should be acknowledged.
If an ack is required but is not received within the AckWait window, the message will be redelivered.
- class nats.js.api.DeliverPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
When a consumer is first created, it can specify where in the stream it wants to start receiving messages.
This is the DeliverPolicy, and this enumeration defines allowed values.
- class nats.js.api.ReplayPolicy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
The replay policy applies when the DeliverPolicy is one of the following:
all: Delivers all messages from the beginning of the stream.
by_start_sequence: Delivers messages starting from a specific sequence.
by_start_time: Delivers messages starting from a specific timestamp.
These deliver policies begin reading the stream at a position other than the end.
- class nats.js.api.ConsumerConfig(name=None, durable_name=None, description=None, deliver_policy=DeliverPolicy.ALL, opt_start_seq=None, opt_start_time=None, ack_policy=AckPolicy.EXPLICIT, ack_wait=None, max_deliver=None, backoff=None, filter_subject=None, filter_subjects=None, replay_policy=ReplayPolicy.INSTANT, rate_limit_bps=None, sample_freq=None, max_waiting=None, max_ack_pending=None, flow_control=None, idle_heartbeat=None, headers_only=None, deliver_subject=None, deliver_group=None, inactive_threshold=None, num_replicas=None, mem_storage=None, metadata=None) None [source]¶
Consumer configuration.
- Parameters:
name (str | None)
durable_name (str | None)
description (str | None)
deliver_policy (DeliverPolicy | None)
opt_start_seq (int | None)
opt_start_time (int | None)
ack_policy (AckPolicy | None)
ack_wait (float | None)
max_deliver (int | None)
backoff (List[float] | None)
filter_subject (str | None)
filter_subjects (List[str] | None)
replay_policy (ReplayPolicy | None)
rate_limit_bps (int | None)
sample_freq (str | None)
max_waiting (int | None)
max_ack_pending (int | None)
flow_control (bool | None)
idle_heartbeat (float | None)
headers_only (bool | None)
deliver_subject (str | None)
deliver_group (str | None)
inactive_threshold (float | None)
num_replicas (int | None)
mem_storage (bool | None)
metadata (Dict[str, str] | None)
- class nats.js.api.SequenceInfo(consumer_seq, stream_seq) None [source]¶
- Parameters:
consumer_seq (int)
stream_seq (int)
- class nats.js.api.ConsumerInfo(name, stream_name, config, delivered=None, ack_floor=None, num_ack_pending=None, num_redelivered=None, num_waiting=None, num_pending=None, cluster=None, push_bound=None) None [source]¶
ConsumerInfo represents the info about the consumer.
- Parameters:
name (str)
stream_name (str)
config (ConsumerConfig)
delivered (SequenceInfo | None)
ack_floor (SequenceInfo | None)
num_ack_pending (int | None)
num_redelivered (int | None)
num_waiting (int | None)
num_pending (int | None)
cluster (ClusterInfo | None)
push_bound (bool | None)
- class nats.js.api.AccountLimits(max_memory, max_storage, max_streams, max_consumers, max_ack_pending, memory_max_stream_bytes, storage_max_stream_bytes, max_bytes_required) None [source]¶
Account limits
- Parameters:
max_memory (int)
max_storage (int)
max_streams (int)
max_consumers (int)
max_ack_pending (int)
memory_max_stream_bytes (int)
storage_max_stream_bytes (int)
max_bytes_required (bool)
- class nats.js.api.Tier(memory, storage, streams, consumers, limits) None [source]¶
- Parameters:
memory (int)
storage (int)
streams (int)
consumers (int)
limits (AccountLimits)
- class nats.js.api.APIStats(total, errors) None [source]¶
API stats
- Parameters:
total (int)
errors (int)
- class nats.js.api.AccountInfo(memory, storage, streams, consumers, limits, api, domain=None, tiers=None) None [source]¶
Account information
- Parameters:
memory (int)
storage (int)
streams (int)
consumers (int)
limits (AccountLimits)
api (APIStats)
domain (str | None)
tiers (Dict[str, Tier] | None)
- class nats.js.api.RawStreamMsg(subject=None, seq=None, data=None, hdrs=None, headers=None, stream=None) None [source]¶
- Parameters:
subject (str | None)
seq (int | None)
data (bytes | None)
hdrs (bytes | None)
headers (Dict | None)
stream (str | None)
- property header: Dict | None¶
header returns the headers from a message.
- class nats.js.api.KeyValueConfig(bucket, description=None, max_value_size=None, history=1, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None, republish=None, direct=None) None [source]¶
KeyValueConfig is the configuration of a KeyValue store.
- Parameters:
bucket (str)
description (str | None)
max_value_size (int | None)
history (int)
ttl (float | None)
max_bytes (int | None)
storage (StorageType | None)
replicas (int)
placement (Placement | None)
republish (RePublish | None)
direct (bool | None)
- class nats.js.api.StreamPurgeRequest(seq=None, filter=None, keep=None) None [source]¶
StreamPurgeRequest is optional request information to the purge API.
- Parameters:
seq (int | None)
filter (str | None)
keep (int | None)
- class nats.js.api.ObjectStoreConfig(bucket=None, description=None, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None) None [source]¶
ObjectStoreConfig is the configuration of an ObjectStore.
- Parameters:
bucket (str | None)
description (str | None)
ttl (float | None)
max_bytes (int | None)
storage (StorageType | None)
replicas (int)
placement (Placement | None)
- class nats.js.api.ObjectLink(bucket, name=None) None [source]¶
ObjectLink is used to embed links to other buckets and objects.
- Parameters:
bucket (str)
name (str | None)
- class nats.js.api.ObjectMetaOptions(link=None, max_chunk_size=None) None [source]¶
- Parameters:
link (ObjectLink | None)
max_chunk_size (int | None)
- class nats.js.api.ObjectMeta(name=None, description=None, headers=None, options=None) None [source]¶
ObjectMeta is high level information about an object.
- Parameters:
name (str | None)
description (str | None)
headers (dict | None)
options (ObjectMetaOptions | None)
- class nats.js.api.ObjectInfo(name, bucket, nuid, size=None, mtime=None, chunks=None, digest=None, deleted=False, description=None, headers=None, options=None) None [source]¶
ObjectInfo is meta plus instance information.
- Parameters:
name (str)
bucket (str)
nuid (str)
size (int | None)
mtime (str | None)
chunks (int | None)
digest (str | None)
deleted (bool | None)
description (str | None)
headers (dict | None)
options (ObjectMetaOptions | None)
NUID¶
Errors¶
- exception nats.errors.SlowConsumerError(subject, reply, sid, sub) None [source]¶
- Parameters:
subject (str)
reply (str)
sid (int)
sub (Subscription)
- Return type:
None
- exception nats.errors.NotJSMessageError[source]¶
Raised when an API meant for JetStream is used on a message that does not belong to a stream.
JetStream Errors¶
- exception nats.js.errors.Error(description=None) None [source]¶
An Error raised by the NATS client when using JetStream.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.APIError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
An Error that is the result of interacting with NATS JetStream.
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
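- exception nats.js.errors.ServiceUnavailableError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
A 503 error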
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.ServerError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
A 500 error
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.NotFoundError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
A 404 error
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.BadRequestError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
A 400 error.
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.NoStreamResponseError(description=None) None [source]¶
Raised if the client gets a 503 when publishing a message.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.TooManyStalledMsgsError(description=None) None [source]¶
Raised when too many outstanding async published messages are waiting for ack.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.FetchTimeoutError[source]¶
Raised if the consumer timed out waiting for messages.
- exception nats.js.errors.ConsumerSequenceMismatchError(stream_resume_sequence=None, consumer_sequence=None, last_consumer_sequence=None) None [source]¶
Async error raised by the client with idle_heartbeat mode enabled when one of the message sequences is not the expected one.
- Return type:
None
- exception nats.js.errors.BucketNotFoundError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
Raised when attempting to bind to a JetStream KeyValue bucket that does not exist.
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.BadBucketError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.KeyValueError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
Raised when there is an issue interacting with the KeyValue store.
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.KeyDeletedError(entry=None, op=None) None [source]¶
Raised when trying to get a key that was deleted from a JetStream KeyValue store.
- Return type:
None
- exception nats.js.errors.KeyNotFoundError(entry=None, op=None, message=None) None [source]¶
Raised when trying to get a key that does not exist in a JetStream KeyValue store.
- Return type:
None
- exception nats.js.errors.KeyWrongLastSequenceError(description=None) None [source]¶
Raised when trying to update a key with the wrong last sequence.
- Parameters:
description (str | None)
- Return type:
None
- exception nats.js.errors.NoKeysError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.KeyHistoryTooLargeError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.InvalidBucketNameError(description=None) None [source]¶
Raised when trying to create a KV or OBJ bucket with an invalid name.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.InvalidObjectNameError(description=None) None [source]¶
Raised when trying to put an object in Object Store with an invalid key.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.BadObjectMetaError(description=None) None [source]¶
Raised when trying to read corrupted metadata from Object Store.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.LinkIsABucketError(description=None) None [source]¶
Raised when trying to get an object from Object Store that is a bucket.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.DigestMismatchError(description=None) None [source]¶
Raised when getting an object from Object Store that has a different digest than expected.
- Parameters:
description (Optional[str])
- Return type:
None
- exception nats.js.errors.ObjectNotFoundError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
Raised when attempting to look up an object that does not exist.
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
- exception nats.js.errors.ObjectDeletedError(code=None, description=None, err_code=None, stream=None, seq=None) None [source]¶
Raised when attempting an operation on an object that does not exist.
- Parameters:
code (int | None)
description (str | None)
err_code (int | None)
stream (str | None)
seq (int | None)
- Return type:
None
Deprecated Errors¶
Support for catching the following errors will eventually be removed. Use the recommended alternative error instead.
- exception nats.aio.errors.NatsError[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.Error instead.
- exception nats.aio.errors.ErrConnectionClosed[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.ConnectionClosedError instead.
- exception nats.aio.errors.ErrDrainTimeout[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.DrainTimeoutError instead.
- exception nats.aio.errors.ErrInvalidUserCredentials[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.InvalidUserCredentialsError instead.
- exception nats.aio.errors.ErrInvalidCallbackType[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.InvalidCallbackTypeError instead.
- exception nats.aio.errors.ErrConnectionReconnecting[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.ConnectionReconnectingError instead.
- exception nats.aio.errors.ErrConnectionDraining[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.ConnectionDrainingError instead.
- exception nats.aio.errors.ErrMaxPayload[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.MaxPayloadError instead.
- exception nats.aio.errors.ErrStaleConnection[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.StaleConnectionError instead.
- exception nats.aio.errors.ErrJsonParse[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.JsonParseError instead.
- exception nats.aio.errors.ErrSecureConnRequired[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.SecureConnRequiredError instead.
- exception nats.aio.errors.ErrSecureConnWanted[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.SecureConnWantedError instead.
- exception nats.aio.errors.ErrSecureConnFailed[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.SecureConnFailedError instead.
- exception nats.aio.errors.ErrBadSubscription[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.BadSubscriptionError instead.
- exception nats.aio.errors.ErrBadSubject[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.BadSubjectError instead.
- exception nats.aio.errors.ErrSlowConsumer(subject, reply, sid, sub) None [source]¶
Deprecated since version v2.0.0.
Please use nats.errors.SlowConsumerError instead.
- Parameters:
subject (str)
reply (str)
sid (int)
sub (Subscription)
- Return type:
None
- exception nats.aio.errors.ErrTimeout[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.TimeoutError instead.
- exception nats.aio.errors.ErrBadTimeout[source]¶
Deprecated since version v2.0.0.
Please use nats.errors.BadTimeoutError instead.