nats.py v2.0.0
Major upgrade to the APIs of the Python3 client! For this release the client has also been renamed from asyncio-nats-client to nats-py. It can now be installed with:
pip install nats-py
# With NKEYS / JWT support
pip install nats-py[nkeys]
This version is not fully compatible with previous versions of the client, and it is designed to be used with Python 3.7.
Overall, the client API should now more closely resemble that of the Go client:
import asyncio
import nats

async def main():
    nc = await nats.connect("demo.nats.io")
    # subscribe returns an object that messages can be read from.
    sub = await nc.subscribe("hello")
    await nc.publish("hello")
    msg = await sub.next_msg()
    print(f"Received [{msg.subject}]: {msg.data}")
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
There is support for NATS Headers ⚡
import asyncio
import nats
from nats.errors import TimeoutError

async def main():
    nc = await nats.connect("demo.nats.io")

    async def help_request(msg):
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        print("Headers", msg.header)
        await msg.respond(b'OK')

    # Subscribe on the same subject the request below is sent to,
    # as part of the 'workers' queue group.
    sub = await nc.subscribe("help", "workers", help_request)

    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
It also now includes JetStream support:
import asyncio
import nats

async def main():
    nc = await nats.connect("demo.nats.io")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer.
    for i in range(0, 10):
        msgs = await psub.fetch()
        for msg in msgs:
            await msg.ack()
            print(msg)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
As well as JetStream KV support:
import asyncio
import nats

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    # Create a KV
    kv = await js.create_key_value(bucket='MY_KV')

    # Set and retrieve a value
    await kv.put('hello', b'world')
    entry = await kv.get('hello')
    # entry.value is bytes, so decode it before printing.
    print(f'KeyValue.Entry: key={entry.key}, value={entry.value.decode()}')
    # KeyValue.Entry: key=hello, value=world

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Breaking changes
Changed the return type of subscribe: it now returns a Subscription object instead of a sid.
Changed the suffix of most errors to follow PEP-8 style, so they now use the Error suffix. For example, ErrSlowConsumer is now SlowConsumerError. Old-style errors are subclasses of the new ones, so exceptions handled in try...except blocks are still caught.
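The subclass relationship between old-style and new-style errors can be sketched with stand-in classes. Only the names ErrSlowConsumer and SlowConsumerError come from the notes above. The class bodies, the Error base class, and the handle helper are illustrative assumptions, not the definitions shipped in nats-py.

```python
# Stand-in classes for illustration only; these are NOT the nats-py definitions.
class Error(Exception):
    """Hypothetical common base class."""


class SlowConsumerError(Error):
    """New-style name, using the PEP-8 'Error' suffix."""


class ErrSlowConsumer(SlowConsumerError):
    """Old-style name, modeled as a subclass of the new one."""


def handle(exc):
    """Raise exc and report which class the new-style handler caught."""
    try:
        raise exc
    except SlowConsumerError as e:
        return f"caught {type(e).__name__}"


# A handler written against the new name also catches the old-style error,
# because an ErrSlowConsumer instance is a SlowConsumerError.
print(handle(ErrSlowConsumer()))
print(handle(SlowConsumerError()))
```

Under this assumption, code updated to catch the new names keeps working even where an old-style error is raised.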
Deprecated
Several parts of the client were deprecated in this release:
Deprecated the is_async parameter for subscribe
Deprecated Client.timed_request
Deprecated passing the loop parameter to most functions
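For code that still calls timed_request, the migration is likely to be mechanical. The sketch below uses a small stand-in class (FakeClient, hypothetical and not part of nats-py) to show the shape of the change. It models timed_request as a thin wrapper that emits a DeprecationWarning and forwards to request. Whether the real client emits such a warning is an assumption here, so check the client source before relying on it.

```python
import asyncio
import warnings


class FakeClient:
    """Hypothetical stand-in for the client; not part of nats-py."""

    async def request(self, subject, payload=b"", timeout=0.5):
        # Pretend a responder echoed the payload back.
        return payload

    async def timed_request(self, subject, payload=b"", timeout=0.5):
        # Assumed behavior: warn, then forward to request().
        warnings.warn(
            "timed_request is deprecated, use request",
            DeprecationWarning,
            stacklevel=2,
        )
        return await self.request(subject, payload, timeout=timeout)


async def main():
    nc = FakeClient()
    # Record warnings so the deprecated call site can be inspected.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        old = await nc.timed_request("help", b"help me", timeout=0.5)
    # Preferred form: call request() directly with a timeout.
    new = await nc.request("help", b"help me", timeout=0.5)
    return old, new, [w.category.__name__ for w in caught]


result = asyncio.run(main())
print(result)
```

Running a test suite with warnings enabled, for example with python -W error::DeprecationWarning, is one way to surface any deprecation warnings a library does emit.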