Source code for nats.errors

# Copyright 2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from nats.aio.subscription import Subscription


[docs] class Error(Exception): pass
[docs] class TimeoutError(Error, asyncio.TimeoutError): def __str__(self) -> str: return "nats: timeout"
[docs] class NoRespondersError(Error): def __str__(self) -> str: return "nats: no responders available for request"
[docs] class StaleConnectionError(Error): def __str__(self) -> str: return "nats: stale connection"
[docs] class OutboundBufferLimitError(Error): def __str__(self) -> str: return "nats: outbound buffer limit exceeded"
[docs] class UnexpectedEOF(StaleConnectionError): def __str__(self) -> str: return "nats: unexpected EOF"
[docs] class FlushTimeoutError(TimeoutError): def __str__(self) -> str: return "nats: flush timeout"
[docs] class ConnectionClosedError(Error): def __str__(self) -> str: return "nats: connection closed"
[docs] class SecureConnRequiredError(Error): def __str__(self) -> str: return "nats: secure connection required"
[docs] class SecureConnWantedError(Error): def __str__(self) -> str: return "nats: secure connection not available"
[docs] class SecureConnFailedError(Error): def __str__(self) -> str: return "nats: secure connection failed"
[docs] class BadSubscriptionError(Error): def __str__(self) -> str: return "nats: invalid subscription"
[docs] class BadSubjectError(Error): def __str__(self) -> str: return "nats: invalid subject"
[docs] class SlowConsumerError(Error): def __init__( self, subject: str, reply: str, sid: int, sub: Subscription ) -> None: self.subject = subject self.reply = reply self.sid = sid self.sub = sub def __str__(self) -> str: return "nats: slow consumer, messages dropped subject: " \ f"{self.subject}, sid: {self.sid}, sub: {self.sub}"
[docs] class BadTimeoutError(Error): def __str__(self) -> str: return "nats: timeout invalid"
[docs] class AuthorizationError(Error): def __str__(self) -> str: return "nats: authorization failed"
[docs] class NoServersError(Error): def __str__(self) -> str: return "nats: no servers available for connection"
[docs] class JsonParseError(Error): def __str__(self) -> str: return "nats: connect message, json parse err"
[docs] class MaxPayloadError(Error): def __str__(self) -> str: return "nats: maximum payload exceeded"
[docs] class DrainTimeoutError(TimeoutError): def __str__(self) -> str: return "nats: draining connection timed out"
[docs] class ConnectionDrainingError(Error): def __str__(self) -> str: return "nats: connection draining"
[docs] class ConnectionReconnectingError(Error): def __str__(self) -> str: return "nats: connection reconnecting"
[docs] class InvalidUserCredentialsError(Error): def __str__(self) -> str: return "nats: invalid user credentials"
[docs] class InvalidCallbackTypeError(Error): def __str__(self) -> str: return "nats: callbacks must be coroutine functions"
[docs] class ProtocolError(Error): def __str__(self) -> str: return "nats: protocol error"
[docs] class NotJSMessageError(Error): """ When it is attempted to use an API meant for JetStream on a message that does not belong to a stream. """ def __str__(self) -> str: return "nats: not a JetStream message"
[docs] class MsgAlreadyAckdError(Error): def __init__(self, msg=None) -> None: self._msg = msg def __str__(self) -> str: return f"nats: message was already acknowledged: {self._msg}"