# Copyright 2016-2024 The NATS Authors# Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#from__future__importannotationsfromdataclassesimportdataclassfromtypingimportTYPE_CHECKING,Any,Dict,NoReturn,Optionalimportnats.errorsfromnats.jsimportapiifTYPE_CHECKING:fromnats.aio.msgimportMsg
[docs]classError(nats.errors.Error):""" An Error raised by the NATS client when using JetStream. """def__init__(self,description:Optional[str]=None)->None:self.description=descriptiondef__str__(self)->str:desc=""ifself.description:desc=self.descriptionreturnf"nats: JetStream.{self.__class__.__name__}{desc}"
[docs]@dataclass(repr=False,init=False)classAPIError(Error):""" An Error that is the result of interacting with NATS JetStream. """code:Optional[int]err_code:Optional[int]description:Optional[str]stream:Optional[str]seq:Optional[int]def__init__(self,code:Optional[int]=None,description:Optional[str]=None,err_code:Optional[int]=None,stream:Optional[str]=None,seq:Optional[int]=None,)->None:self.code=codeself.err_code=err_codeself.description=descriptionself.stream=streamself.seq=seq@classmethoddeffrom_msg(cls,msg:Msg)->NoReturn:ifmsg.headerisNone:raiseAPIErrorcode=msg.header[api.Header.STATUS]ifcode==api.StatusCode.SERVICE_UNAVAILABLE:raiseServiceUnavailableErrorelse:desc=msg.header[api.Header.DESCRIPTION]raiseAPIError(code=int(code),description=desc)@classmethoddeffrom_error(cls,err:Dict[str,Any]):code=err["code"]ifcode==503:raiseServiceUnavailableError(**err)elifcode==500:raiseServerError(**err)elifcode==404:raiseNotFoundError(**err)elifcode==400:raiseBadRequestError(**err)else:raiseAPIError(**err)def__str__(self)->str:return(f"nats: {type(self).__name__}: code={self.code} err_code={self.err_code} "f"description='{self.description}'")
[docs]classServiceUnavailableError(APIError):""" A 503 error """pass
[docs]classServerError(APIError):""" A 500 error """pass
[docs]classNotFoundError(APIError):""" A 404 error """pass
[docs]classBadRequestError(APIError):""" A 400 error. """pass
[docs]classNoStreamResponseError(Error):""" Raised if the client gets a 503 when publishing a message. """def__str__(self)->str:return"nats: no response from stream"
[docs]classTooManyStalledMsgsError(Error):""" Raised when too many outstanding async published messages are waiting for ack. """def__str__(self)->str:return"nats: stalled with too many outstanding async published messages"
[docs]classFetchTimeoutError(nats.errors.TimeoutError):""" Raised if the consumer timed out waiting for messages. """def__str__(self)->str:return"nats: fetch timeout"
[docs]classConsumerSequenceMismatchError(Error):""" Async error raised by the client with idle_heartbeat mode enabled when one of the message sequences is not the expected one. """def__init__(self,stream_resume_sequence=None,consumer_sequence=None,last_consumer_sequence=None,)->None:self.stream_resume_sequence=stream_resume_sequenceself.consumer_sequence=consumer_sequenceself.last_consumer_sequence=last_consumer_sequencedef__str__(self)->str:gap=self.last_consumer_sequence-self.consumer_sequencereturn(f"nats: sequence mismatch for consumer at sequence {self.consumer_sequence} "f"({gap} sequences behind), should restart consumer from stream sequence {self.stream_resume_sequence}")
[docs]classBucketNotFoundError(NotFoundError):""" When attempted to bind to a JetStream KeyValue that does not exist. """pass
[docs]classKeyValueError(APIError):""" Raised when there is an issue interacting with the KeyValue store. """pass
[docs]classKeyDeletedError(KeyValueError,NotFoundError):""" Raised when trying to get a key that was deleted from a JetStream KeyValue store. """def__init__(self,entry=None,op=None)->None:self.entry=entryself.op=opdef__str__(self)->str:return"nats: key was deleted"
[docs]classKeyNotFoundError(KeyValueError,NotFoundError):""" Raised when trying to get a key that does not exists from a JetStream KeyValue store. """def__init__(self,entry=None,op=None,message=None)->None:self.entry=entryself.op=opself.message=messagedef__str__(self)->str:s="nats: key not found"ifself.message:s+=f": {self.message}"returns
[docs]classKeyWrongLastSequenceError(KeyValueError,BadRequestError):""" Raised when trying to update a key with the wrong last sequence. """def__init__(self,description=None)->None:self.description=descriptiondef__str__(self)->str:returnf"nats: {self.description}"
[docs]classNoKeysError(KeyValueError):def__str__(self)->str:return"nats: no keys found"
[docs]classKeyHistoryTooLargeError(KeyValueError):def__str__(self)->str:return"nats: history limited to a max of 64"
[docs]classInvalidBucketNameError(Error):""" Raised when trying to create a KV or OBJ bucket with invalid name. """pass
[docs]classInvalidObjectNameError(Error):""" Raised when trying to put an object in Object Store with invalid key. """pass
[docs]classBadObjectMetaError(Error):""" Raised when trying to read corrupted metadata from Object Store. """pass
[docs]classLinkIsABucketError(Error):""" Raised when trying to get object from Object Store that is a bucket. """pass
[docs]classDigestMismatchError(Error):""" Raised when getting an object from Object Store that has a different digest than expected. """pass
[docs]classObjectNotFoundError(NotFoundError):""" When attempted to lookup an Object that does not exist. """pass
[docs]classObjectDeletedError(NotFoundError):""" When attempted to do an operation to an Object that does not exist. """pass
[docs]classObjectAlreadyExists(Error):""" When attempted to do an operation to an Object that already exist. """pass