Source code for nats.errors
# Copyright 2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from nats.aio.subscription import Subscription
[docs]
class Error(Exception):
pass
[docs]
class TimeoutError(Error, asyncio.TimeoutError):
def __str__(self) -> str:
return "nats: timeout"
[docs]
class NoRespondersError(Error):
def __str__(self) -> str:
return "nats: no responders available for request"
[docs]
class StaleConnectionError(Error):
def __str__(self) -> str:
return "nats: stale connection"
[docs]
class OutboundBufferLimitError(Error):
def __str__(self) -> str:
return "nats: outbound buffer limit exceeded"
[docs]
class UnexpectedEOF(StaleConnectionError):
def __str__(self) -> str:
return "nats: unexpected EOF"
[docs]
class FlushTimeoutError(TimeoutError):
def __str__(self) -> str:
return "nats: flush timeout"
[docs]
class ConnectionClosedError(Error):
def __str__(self) -> str:
return "nats: connection closed"
[docs]
class SecureConnRequiredError(Error):
def __str__(self) -> str:
return "nats: secure connection required"
[docs]
class SecureConnWantedError(Error):
def __str__(self) -> str:
return "nats: secure connection not available"
[docs]
class SecureConnFailedError(Error):
def __str__(self) -> str:
return "nats: secure connection failed"
[docs]
class BadSubscriptionError(Error):
def __str__(self) -> str:
return "nats: invalid subscription"
[docs]
class BadSubjectError(Error):
def __str__(self) -> str:
return "nats: invalid subject"
[docs]
class SlowConsumerError(Error):
def __init__(
self, subject: str, reply: str, sid: int, sub: Subscription
) -> None:
self.subject = subject
self.reply = reply
self.sid = sid
self.sub = sub
def __str__(self) -> str:
return "nats: slow consumer, messages dropped subject: " \
f"{self.subject}, sid: {self.sid}, sub: {self.sub}"
[docs]
class BadTimeoutError(Error):
def __str__(self) -> str:
return "nats: timeout invalid"
[docs]
class AuthorizationError(Error):
def __str__(self) -> str:
return "nats: authorization failed"
[docs]
class NoServersError(Error):
def __str__(self) -> str:
return "nats: no servers available for connection"
[docs]
class JsonParseError(Error):
def __str__(self) -> str:
return "nats: connect message, json parse err"
[docs]
class MaxPayloadError(Error):
def __str__(self) -> str:
return "nats: maximum payload exceeded"
[docs]
class DrainTimeoutError(TimeoutError):
def __str__(self) -> str:
return "nats: draining connection timed out"
[docs]
class ConnectionDrainingError(Error):
def __str__(self) -> str:
return "nats: connection draining"
[docs]
class ConnectionReconnectingError(Error):
def __str__(self) -> str:
return "nats: connection reconnecting"
[docs]
class InvalidUserCredentialsError(Error):
def __str__(self) -> str:
return "nats: invalid user credentials"
[docs]
class InvalidCallbackTypeError(Error):
def __str__(self) -> str:
return "nats: callbacks must be coroutine functions"
[docs]
class ProtocolError(Error):
def __str__(self) -> str:
return "nats: protocol error"
[docs]
class NotJSMessageError(Error):
"""
When it is attempted to use an API meant for JetStream on a message
that does not belong to a stream.
"""
def __str__(self) -> str:
return "nats: not a JetStream message"
[docs]
class MsgAlreadyAckdError(Error):
def __init__(self, msg=None) -> None:
self._msg = msg
def __str__(self) -> str:
return f"nats: message was already acknowledged: {self._msg}"