nats.py v2.0.0

Major upgrade to the APIs of the Python 3 client! For this release the client has also been renamed from asyncio-nats-client to nats-py. It can now be installed with:

pip install nats-py

# With NKEYS / JWT support
pip install nats-py[nkeys]

This version of the client is not fully backward compatible with previous versions, and it is designed to be used with Python 3.7.

Overall, the API of the client should now more closely resemble that of the Go client:

import asyncio
import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  sub = await nc.subscribe("hello")

  await nc.publish("hello")

  msg = await sub.next_msg()
  print(f"Received [{msg.subject}]: {msg.data}")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

There is support for NATS Headers ⚡

import asyncio
import nats
from nats.errors import TimeoutError

async def main():
  nc = await nats.connect("demo.nats.io")

  async def help_request(msg):
      print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
      print("Headers", msg.header)
      await msg.respond(b'OK')

  # Subscribe on the same subject that the request below is sent to.
  sub = await nc.subscribe("help", "workers", help_request)

  try:
      response = await nc.request("help", b'help me', timeout=0.5)
      print("Received response: {message}".format(
          message=response.data.decode()))
  except TimeoutError:
      print("Request timed out")

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

It also now includes JetStream support:

import asyncio
import nats

async def main():
  nc = await nats.connect("demo.nats.io")

  # Create JetStream context.
  js = nc.jetstream()

  # Persist messages on 'foo' subject.
  await js.add_stream(name="sample-stream", subjects=["foo"])

  for i in range(0, 10):
      ack = await js.publish("foo", f"hello world: {i}".encode())
      print(ack)

  # Create pull based consumer on 'foo'.
  psub = await js.pull_subscribe("foo", "psub")

  # Fetch and ack messages from consumer.
  for i in range(0, 10):
      msgs = await psub.fetch()
      for msg in msgs:
          print(msg)
          await msg.ack()

  await nc.close()

if __name__ == '__main__':
  asyncio.run(main())

As well as JetStream KV support:

import asyncio
import nats

async def main():
  nc = await nats.connect()
  js = nc.jetstream()

  # Create a KV
  kv = await js.create_key_value(bucket='MY_KV')

  # Set and retrieve a value
  await kv.put('hello', b'world')
  entry = await kv.get('hello')
  print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
  # KeyValue.Entry: key=hello, value=b'world'

  await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

Breaking changes

  • Changed the return type of subscribe: it now returns a Subscription object instead of a sid.

  • Renamed most errors to follow PEP 8 style, so they now use the Error suffix. For example, ErrSlowConsumer is now SlowConsumerError. The old-style errors are subclasses of the new ones, so exceptions handled in existing try...except blocks should still be caught.
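
The subclass relationship described in the last item can be illustrated with a minimal, self-contained sketch. The two classes below are stand-ins defined only for this example, and the handle helper is hypothetical; they mirror the names mentioned above but are not the library's own definitions, which live in the nats package.

```python
# Stand-in classes for illustration only; NOT the definitions shipped in nats-py.
class SlowConsumerError(Exception):
    """New-style name, using the PEP 8 'Error' suffix."""


class ErrSlowConsumer(SlowConsumerError):
    """Old-style name, kept as a subclass of the new one."""


def handle(exc: Exception) -> str:
    """Raise exc and report which handler caught it (hypothetical helper)."""
    try:
        raise exc
    except SlowConsumerError as e:
        # Matches the new-style class and any subclass, including the old name.
        return f"caught {type(e).__name__} as SlowConsumerError"
    except Exception as e:
        return f"unhandled {type(e).__name__}"


print(handle(ErrSlowConsumer()))    # caught ErrSlowConsumer as SlowConsumerError
print(handle(SlowConsumerError()))  # caught SlowConsumerError as SlowConsumerError
```

In Python this only works in one direction: a handler written for a subclass does not catch an instance of its parent class. When updating code, catching the new Error-suffixed names is therefore likely the safer choice.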

Deprecated

Several areas of the client were deprecated in this release:

  • Deprecated is_async parameter for subscribe

  • Deprecated Client.timed_request

  • Deprecated passing loop parameter to most functions