Source code for vastdb.errors

import logging
import xml.etree.ElementTree
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Optional

import pyarrow as pa
import requests

if TYPE_CHECKING:
    from vastdb.table_metadata import TableRef


class HttpStatus(Enum):
    """HTTP status codes that this module knows how to interpret."""

    SUCCESS = 200
    BAD_REQUEST = 400
    FORBIDDEN = 403
    # Misspelled legacy name, kept as an Enum alias of FORBIDDEN (same value)
    # so existing references to `HttpStatus.FOBIDDEN` keep working.
    FOBIDDEN = 403
    NOT_FOUND = 404
    METHOD_NOT_ALLOWED = 405
    REQUEST_TIMEOUT = 408
    CONFLICT = 409
    INTERNAL_SERVER_ERROR = 500
    NOT_IMPLEMENTED = 501
    SERVICE_UNAVAILABLE = 503
    INSUFFICIENT_CAPACITY = 507
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
@dataclass
class HttpError(Exception):
    """Base class for errors built from a failed HTTP response.

    All fields are also exposed through ``args`` as a single dict, so the
    default exception rendering shows them.
    """

    code: str
    message: str
    method: str
    url: str
    status: int  # HTTP status
    headers: requests.structures.CaseInsensitiveDict  # HTTP response headers

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
class NotFound(HttpError):
    """HTTP error mapped from status 404 (`HttpStatus.NOT_FOUND`)."""
class Forbidden(HttpError):
    """HTTP error mapped from status 403."""
class BadRequest(HttpError):
    """HTTP error mapped from status 400 (`HttpStatus.BAD_REQUEST`)."""
class MethodNotAllowed(HttpError):
    """HTTP error mapped from status 405 (`HttpStatus.METHOD_NOT_ALLOWED`)."""
class RequestTimeout(HttpError):
    """HTTP error mapped from status 408 (`HttpStatus.REQUEST_TIMEOUT`)."""
class Conflict(HttpError):
    """HTTP error mapped from status 409 (`HttpStatus.CONFLICT`)."""
class InternalServerError(HttpError):
    """HTTP error mapped from status 500 (`HttpStatus.INTERNAL_SERVER_ERROR`)."""
# NOTE: within this module the name below shadows the builtin `NotImplemented`
# constant.
class NotImplemented(HttpError):
    """HTTP error mapped from status 501 (`HttpStatus.NOT_IMPLEMENTED`)."""
class ServiceUnavailable(HttpError):
    """HTTP error raised by `handle_unavailable` for status 503 responses."""
class Slowdown(ServiceUnavailable):
    """`ServiceUnavailable` variant raised when the error code is 'SlowDown'."""
class UnexpectedError(HttpError):
    """Fallback HTTP error used when a status has no entry in `HTTP_ERROR_TYPES_MAP`."""
class InsufficientCapacity(HttpError):
    """HTTP error mapped from status 507 (`HttpStatus.INSUFFICIENT_CAPACITY`)."""
@dataclass
class ImportFilesError(Exception):
    """Error carrying a message together with a dict of error details.

    Both fields are also exposed through ``args`` as a single dict.
    """

    message: str
    error_dict: dict

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
class InvalidArgument(Exception):
    """Base exception for invalid-argument errors (parent of `TooLargeRequest`)."""
class TooLargeRequest(InvalidArgument):
    """`InvalidArgument` subtype for too-large requests (parent of `TooWideRow`)."""
class TooWideRow(TooLargeRequest):
    """`TooLargeRequest` subtype for a row that is too wide."""
class Missing(Exception):
    """Base exception for the `Missing*` error types defined in this module."""
class MissingTransaction(Missing):
    """`Missing` error for a missing transaction."""
class MissingRowIdColumn(Missing):
    """`Missing` error for a missing row-ID column."""
class NotSupported(Exception):
    """Base exception for the `NotSupported*` error types defined in this module."""
@dataclass
class MissingBucket(Missing):
    """`Missing` error that carries the bucket it refers to."""

    bucket: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class MissingSnapshot(Missing):
    """`Missing` error that carries the bucket and snapshot it refers to."""

    bucket: str
    snapshot: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class MissingSchema(Missing):
    """`Missing` error that carries the bucket and schema it refers to."""

    bucket: str
    schema: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class MissingTable(Missing):
    """`Missing` error that carries the bucket, schema and table it refers to."""

    bucket: str
    schema: str
    table: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class MissingProjection(Missing):
    """`Missing` error that carries the bucket, schema, table and projection it refers to."""

    bucket: str
    schema: str
    table: str
    projection: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class MissingBlobExpansion(Missing):
    """Raised when a blob expansion is not found."""

    table_ref: "TableRef"
    source_column: str

    def __post_init__(self):
        # Spell out the table reference's bucket/schema/table in `args`
        # instead of storing the TableRef object itself.
        ref = self.table_ref
        details = {
            "bucket": ref.bucket,
            "schema": ref.schema,
            "table": ref.table,
            "source_column": self.source_column,
        }
        self.args = (details,)
class Exists(Exception):
    """Base exception for the `*Exists` error types defined in this module."""
@dataclass
class SchemaExists(Exists):
    """`Exists` error that carries the bucket and schema it refers to."""

    bucket: str
    schema: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class TableExists(Exists):
    """`Exists` error that carries the bucket, schema and table it refers to."""

    bucket: str
    schema: str
    table: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class NotSupportedCommand(NotSupported):
    """`NotSupported` error that carries the bucket, schema and table it refers to."""

    bucket: str
    schema: str
    table: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class NotSupportedVersion(NotSupported):
    """`NotSupported` error that carries an error message and a version string."""

    err_msg: str
    version: str

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
@dataclass
class NotSupportedSchema(NotSupported):
    """`NotSupported` error with an optional message, schema and cause.

    `from_response` builds it (with `message` and `cause`) for error codes
    listed in `SPECIFIC_ERROR_TYPES_MAP`.
    """

    message: Optional[str] = None
    schema: Optional[pa.Schema] = None
    cause: Optional[Exception] = None

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
# NOTE: within this module the name below shadows the builtin `ConnectionError`;
# this class derives from `Exception`, not from the builtin.
@dataclass
class ConnectionError(Exception):
    """Connection failure wrapping its cause and a retry hint."""

    cause: Exception
    may_retry: bool

    def __post_init__(self):
        # `args` is the one-element sequence holding this instance's field dict.
        self.args = (vars(self),)
class ApiResponseError(Exception):
    """Indicates a logically invalid or inconsistent server response."""
def handle_unavailable(**kwargs):
    """Raise the error matching a 503 response; this function never returns.

    Args:
        **kwargs: `HttpError` constructor fields; `code` must be present.

    Raises:
        Slowdown: if ``kwargs['code']`` equals ``'SlowDown'``.
        ServiceUnavailable: for any other code.
    """
    error_type = Slowdown if kwargs['code'] == 'SlowDown' else ServiceUnavailable
    raise error_type(**kwargs)
# Maps an HTTP status to the callable that `from_response` invokes with the
# error details. Every entry is an `HttpError` subclass, except for
# SERVICE_UNAVAILABLE, whose handler raises instead of returning an instance.
HTTP_ERROR_TYPES_MAP = {
    HttpStatus.BAD_REQUEST: BadRequest,
    HttpStatus.FOBIDDEN: Forbidden,
    HttpStatus.NOT_FOUND: NotFound,
    HttpStatus.METHOD_NOT_ALLOWED: MethodNotAllowed,
    HttpStatus.REQUEST_TIMEOUT: RequestTimeout,
    HttpStatus.CONFLICT: Conflict,
    HttpStatus.INTERNAL_SERVER_ERROR: InternalServerError,
    HttpStatus.NOT_IMPLEMENTED: NotImplemented,
    HttpStatus.SERVICE_UNAVAILABLE: handle_unavailable,
    HttpStatus.INSUFFICIENT_CAPACITY: InsufficientCapacity,
}

# Maps an error code parsed from the response body to a more specific
# exception type that `from_response` wraps around the HTTP error.
SPECIFIC_ERROR_TYPES_MAP = {
    'TabularUnsupportedColumnType': NotSupportedSchema,
}
def from_response(res: requests.Response) -> Optional[Exception]:
    """Convert an HTTP response into an exception object, if it failed.

    The response body is parsed as an S3-style XML error document to extract
    the `Code` and `Message` elements. A body that is empty or not valid XML
    leaves both as ``None``.

    Args:
        res: the HTTP response to inspect.

    Returns:
        ``None`` when the status code is 200. Otherwise the exception selected
        by `HTTP_ERROR_TYPES_MAP`, or `UnexpectedError` when the status has no
        mapping there (including status codes that are not `HttpStatus`
        members). When the parsed error code is listed in
        `SPECIFIC_ERROR_TYPES_MAP`, the HTTP error is wrapped as the `cause`
        of that specific type.

    Raises:
        Slowdown, ServiceUnavailable: for 503 responses, because the mapped
            handler `handle_unavailable` raises instead of returning.
    """
    if res.status_code == HttpStatus.SUCCESS.value:
        return None

    log.debug("response: url='%s', code=%s, headers=%s, body='%s'",
              res.request.url, res.status_code, res.headers, res.text)

    # try to parse S3 XML response for the error details:
    code_str = None
    message_str = None
    if res.text:
        try:
            root = xml.etree.ElementTree.fromstring(res.text)
        except xml.etree.ElementTree.ParseError:
            log.debug("invalid XML: %r", res.text)
        else:
            code = root.find('Code')
            code_str = code.text if code is not None else None
            message = root.find('Message')
            message_str = message.text if message is not None else None

    kwargs = dict(
        code=code_str,
        message=message_str,
        method=res.request.method,
        url=res.request.url,
        status=res.status_code,
        headers=res.headers,
    )
    log.warning("RPC failed: %s", kwargs)

    try:
        status = HttpStatus(res.status_code)
    except ValueError:
        # Status codes that are not HttpStatus members (e.g. 401, 502) must
        # not escape as a ValueError; use the generic fallback type instead.
        http_error_type = UnexpectedError
    else:
        http_error_type = HTTP_ERROR_TYPES_MAP.get(status, UnexpectedError)
    http_error = http_error_type(**kwargs)  # type: ignore

    # Wrap specific error types if applicable
    if code_str in SPECIFIC_ERROR_TYPES_MAP:
        error_type = SPECIFIC_ERROR_TYPES_MAP[code_str]
        return error_type(message=message_str, cause=http_error)

    return http_error