# Copyright 2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import base64
import json
from email.parser import BytesParser
from typing import TYPE_CHECKING, Any, List, Optional, Dict
from nats.errors import NoRespondersError
from nats.js import api
from nats.js.errors import APIError, NotFoundError, ServiceUnavailableError
if TYPE_CHECKING:
from nats import NATS
NATS_HDR_LINE = bytearray(b'NATS/1.0')
NATS_HDR_LINE_SIZE = len(NATS_HDR_LINE)
_CRLF_ = b'\r\n'
_CRLF_LEN_ = len(_CRLF_)
[docs]
class JetStreamManager:
"""
JetStreamManager exposes management APIs for JetStream.
"""
def __init__(
self,
conn: NATS,
prefix: str = api.DEFAULT_PREFIX,
timeout: float = 5,
) -> None:
self._prefix = prefix
self._nc = conn
self._timeout = timeout
self._hdr_parser = BytesParser()
async def account_info(self) -> api.AccountInfo:
resp = await self._api_request(
f"{self._prefix}.INFO", b'', timeout=self._timeout
)
return api.AccountInfo.from_response(resp)
[docs]
async def find_stream_name_by_subject(self, subject: str) -> str:
"""
Find the stream to which a subject belongs in an account.
"""
req_sub = f"{self._prefix}.STREAM.NAMES"
req_data = json.dumps({"subject": subject})
info = await self._api_request(
req_sub, req_data.encode(), timeout=self._timeout
)
if not info['streams']:
raise NotFoundError
return info['streams'][0]
[docs]
async def stream_info(self, name: str, subjects_filter: Optional[str] = None) -> api.StreamInfo:
"""
Get the latest StreamInfo by stream name.
"""
req_data = ''
if subjects_filter:
req_data = json.dumps({"subjects_filter": subjects_filter})
resp = await self._api_request(
f"{self._prefix}.STREAM.INFO.{name}", req_data.encode(), timeout=self._timeout
)
return api.StreamInfo.from_response(resp)
[docs]
async def add_stream(
self,
config: Optional[api.StreamConfig] = None,
**params
) -> api.StreamInfo:
"""
add_stream creates a stream.
"""
if config is None:
config = api.StreamConfig()
config = config.evolve(**params)
if config.name is None:
raise ValueError("nats: stream name is required")
data = json.dumps(config.as_dict())
resp = await self._api_request(
f"{self._prefix}.STREAM.CREATE.{config.name}",
data.encode(),
timeout=self._timeout,
)
return api.StreamInfo.from_response(resp)
[docs]
async def update_stream(
self,
config: Optional[api.StreamConfig] = None,
**params
) -> api.StreamInfo:
"""
update_stream updates a stream.
"""
if config is None:
config = api.StreamConfig()
config = config.evolve(**params)
if config.name is None:
raise ValueError("nats: stream name is required")
data = json.dumps(config.as_dict())
resp = await self._api_request(
f"{self._prefix}.STREAM.UPDATE.{config.name}",
data.encode(),
timeout=self._timeout,
)
return api.StreamInfo.from_response(resp)
[docs]
async def delete_stream(self, name: str) -> bool:
"""
Delete a stream by name.
"""
resp = await self._api_request(
f"{self._prefix}.STREAM.DELETE.{name}", timeout=self._timeout
)
return resp['success']
[docs]
async def purge_stream(
self,
name: str,
seq: Optional[int] = None,
subject: Optional[str] = None,
keep: Optional[int] = None
) -> bool:
"""
Purge a stream by name.
"""
stream_req: Dict[str, Any] = {}
if seq:
stream_req['seq'] = seq
if subject:
stream_req['filter'] = subject
if keep:
stream_req['keep'] = keep
req = json.dumps(stream_req)
resp = await self._api_request(
f"{self._prefix}.STREAM.PURGE.{name}",
req.encode(),
timeout=self._timeout
)
return resp['success']
async def consumer_info(
self, stream: str, consumer: str, timeout: Optional[float] = None
):
# TODO: Validate the stream and consumer names.
if timeout is None:
timeout = self._timeout
resp = await self._api_request(
f"{self._prefix}.CONSUMER.INFO.{stream}.{consumer}",
b'',
timeout=timeout
)
return api.ConsumerInfo.from_response(resp)
[docs]
async def streams_info(self) -> List[api.StreamInfo]:
"""
streams_info retrieves a list of streams.
"""
resp = await self._api_request(
f"{self._prefix}.STREAM.LIST",
b'',
timeout=self._timeout,
)
streams = []
for stream in resp['streams']:
stream_info = api.StreamInfo.from_response(stream)
streams.append(stream_info)
return streams
async def add_consumer(
self,
stream: str,
config: Optional[api.ConsumerConfig] = None,
timeout: Optional[float] = None,
**params,
) -> api.ConsumerInfo:
if not timeout:
timeout = self._timeout
if config is None:
config = api.ConsumerConfig()
config = config.evolve(**params)
durable_name = config.durable_name
req = {"stream_name": stream, "config": config.as_dict()}
req_data = json.dumps(req).encode()
resp = None
subject = ''
version = self._nc.connected_server_version
consumer_name_supported = version.major >= 2 and version.minor >= 9
if consumer_name_supported and config.name:
# NOTE: Only supported after nats-server v2.9.0
if config.filter_subject and config.filter_subject != ">":
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}.{config.name}.{config.filter_subject}"
else:
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}.{config.name}"
elif durable_name:
# NOTE: Legacy approach to create consumers. After nats-server v2.9
# name option can be used instead.
subject = f"{self._prefix}.CONSUMER.DURABLE.CREATE.{stream}.{durable_name}"
else:
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}"
resp = await self._api_request(subject, req_data, timeout=timeout)
return api.ConsumerInfo.from_response(resp)
async def delete_consumer(self, stream: str, consumer: str) -> bool:
resp = await self._api_request(
f"{self._prefix}.CONSUMER.DELETE.{stream}.{consumer}",
b'',
timeout=self._timeout
)
return resp['success']
[docs]
async def consumers_info(
self,
stream: str,
offset: Optional[int] = None
) -> List[api.ConsumerInfo]:
"""
consumers_info retrieves a list of consumers. Consumers list limit is 256 for more
consider to use offset
:param stream: stream to get consumers
:param offset: consumers list offset
"""
resp = await self._api_request(
f"{self._prefix}.CONSUMER.LIST.{stream}",
b'' if offset is None else json.dumps({
"offset": offset
}).encode(),
timeout=self._timeout,
)
consumers = []
for consumer in resp['consumers']:
consumer_info = api.ConsumerInfo.from_response(consumer)
consumers.append(consumer_info)
return consumers
[docs]
async def get_msg(
self,
stream_name: str,
seq: Optional[int] = None,
subject: Optional[str] = None,
direct: Optional[bool] = False,
next: Optional[bool] = False,
) -> api.RawStreamMsg:
"""
get_msg retrieves a message from a stream.
"""
req_subject = None
req: Dict[str, Any] = {}
if seq:
req['seq'] = seq
if subject:
req['seq'] = None
req.pop('seq', None)
req['last_by_subj'] = subject
if next:
req['seq'] = seq
req['last_by_subj'] = None
req.pop('last_by_subj', None)
req['next_by_subj'] = subject
data = json.dumps(req)
if direct:
# $JS.API.DIRECT.GET.KV_{stream_name}.$KV.TEST.{key}
if subject and not seq:
# last_by_subject type request requires no payload.
data = ''
req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}.{subject}"
else:
req_subject = f"{self._prefix}.DIRECT.GET.{stream_name}"
resp = await self._nc.request(
req_subject, data.encode(), timeout=self._timeout
)
raw_msg = JetStreamManager._lift_msg_to_raw_msg(resp)
return raw_msg
# Non Direct form
req_subject = f"{self._prefix}.STREAM.MSG.GET.{stream_name}"
resp_data = await self._api_request(
req_subject, data.encode(), timeout=self._timeout
)
raw_msg = api.RawStreamMsg.from_response(resp_data['message'])
if raw_msg.hdrs:
hdrs = base64.b64decode(raw_msg.hdrs)
raw_headers = hdrs[NATS_HDR_LINE_SIZE + _CRLF_LEN_:]
parsed_headers = self._hdr_parser.parsebytes(raw_headers)
headers = None
if len(parsed_headers.items()) > 0:
headers = {}
for k, v in parsed_headers.items():
headers[k] = v
raw_msg.headers = headers
msg_data: Optional[bytes] = None
if raw_msg.data:
msg_data = base64.b64decode(raw_msg.data)
raw_msg.data = msg_data
return raw_msg
@classmethod
def _lift_msg_to_raw_msg(self, msg) -> api.RawStreamMsg:
if not msg.data:
msg.data = None
status = msg.headers.get('Status')
if status:
if status == '404':
raise NotFoundError
else:
raise APIError.from_msg(msg)
raw_msg = api.RawStreamMsg()
subject = msg.headers['Nats-Subject']
raw_msg.subject = subject
seq = msg.headers.get('Nats-Sequence')
if seq:
raw_msg.seq = int(seq)
raw_msg.data = msg.data
raw_msg.headers = msg.headers
return raw_msg
[docs]
async def delete_msg(self, stream_name: str, seq: int) -> bool:
"""
delete_msg retrieves a message from a stream based on the sequence ID.
"""
req_subject = f"{self._prefix}.STREAM.MSG.DELETE.{stream_name}"
req = {'seq': seq}
data = json.dumps(req)
resp = await self._api_request(req_subject, data.encode())
return resp['success']
[docs]
async def get_last_msg(
self,
stream_name: str,
subject: str,
direct: Optional[bool] = False,
) -> api.RawStreamMsg:
"""
get_last_msg retrieves the last message from a stream.
"""
return await self.get_msg(stream_name, subject=subject, direct=direct)
async def _api_request(
self,
req_subject: str,
req: bytes = b'',
timeout: float = 5,
) -> Dict[str, Any]:
try:
msg = await self._nc.request(req_subject, req, timeout=timeout)
resp = json.loads(msg.data)
except NoRespondersError:
raise ServiceUnavailableError
# Check for API errors.
if 'error' in resp:
raise APIError.from_error(resp['error'])
return resp