import logging
import xml.etree.ElementTree
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Optional
import pyarrow as pa
import requests
if TYPE_CHECKING:
from vastdb.table_metadata import TableRef
[docs]
class HttpStatus(Enum):
SUCCESS = 200
BAD_REQUEST = 400
FOBIDDEN = 403
NOT_FOUND = 404
METHOD_NOT_ALLOWED = 405
REQUEST_TIMEOUT = 408
CONFLICT = 409
INTERNAL_SERVER_ERROR = 500
NOT_IMPLEMENTED = 501
SERVICE_UNAVAILABLE = 503
INSUFFICIENT_CAPACITY = 507
log = logging.getLogger(__name__)
[docs]
@dataclass
class HttpError(Exception):
code: str
message: str
method: str
url: str
status: int # HTTP status
headers: requests.structures.CaseInsensitiveDict # HTTP response headers
def __post_init__(self):
self.args = [vars(self)]
[docs]
class NotFound(HttpError):
pass
[docs]
class Forbidden(HttpError):
pass
[docs]
class BadRequest(HttpError):
pass
[docs]
class MethodNotAllowed(HttpError):
pass
[docs]
class RequestTimeout(HttpError):
pass
[docs]
class Conflict(HttpError):
pass
[docs]
class InternalServerError(HttpError):
pass
[docs]
class NotImplemented(HttpError):
pass
[docs]
class ServiceUnavailable(HttpError):
pass
[docs]
class Slowdown(ServiceUnavailable):
pass
[docs]
class UnexpectedError(HttpError):
pass
[docs]
class InsufficientCapacity(HttpError):
pass
[docs]
@dataclass
class ImportFilesError(Exception):
message: str
error_dict: dict
def __post_init__(self):
self.args = [vars(self)]
[docs]
class InvalidArgument(Exception):
pass
[docs]
class TooWideRow(TooLargeRequest):
pass
[docs]
class Missing(Exception):
pass
[docs]
class MissingTransaction(Missing):
pass
[docs]
class MissingRowIdColumn(Missing):
pass
[docs]
class NotSupported(Exception):
pass
[docs]
@dataclass
class MissingBucket(Missing):
bucket: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class MissingSnapshot(Missing):
bucket: str
snapshot: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class MissingSchema(Missing):
bucket: str
schema: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class MissingTable(Missing):
bucket: str
schema: str
table: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class MissingProjection(Missing):
bucket: str
schema: str
table: str
projection: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class MissingBlobExpansion(Missing):
"""Raised when a blob expansion is not found."""
table_ref: "TableRef"
source_column: str
def __post_init__(self):
self.args = [{"bucket": self.table_ref.bucket,
"schema": self.table_ref.schema,
"table": self.table_ref.table,
"source_column": self.source_column}]
[docs]
class Exists(Exception):
pass
[docs]
@dataclass
class SchemaExists(Exists):
bucket: str
schema: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class TableExists(Exists):
bucket: str
schema: str
table: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class NotSupportedCommand(NotSupported):
bucket: str
schema: str
table: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class NotSupportedVersion(NotSupported):
err_msg: str
version: str
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class NotSupportedSchema(NotSupported):
message: Optional[str] = None
schema: Optional[pa.Schema] = None
cause: Optional[Exception] = None
def __post_init__(self):
self.args = [vars(self)]
[docs]
@dataclass
class ConnectionError(Exception):
cause: Exception
may_retry: bool
def __post_init__(self):
self.args = [vars(self)]
[docs]
class ApiResponseError(Exception):
"""Indicates a logically invalid or inconsistent server response."""
pass
[docs]
def handle_unavailable(**kwargs):
if kwargs['code'] == 'SlowDown':
raise Slowdown(**kwargs)
raise ServiceUnavailable(**kwargs)
HTTP_ERROR_TYPES_MAP = {
HttpStatus.BAD_REQUEST: BadRequest,
HttpStatus.FOBIDDEN: Forbidden,
HttpStatus.NOT_FOUND: NotFound,
HttpStatus.METHOD_NOT_ALLOWED: MethodNotAllowed,
HttpStatus.REQUEST_TIMEOUT: RequestTimeout,
HttpStatus.CONFLICT: Conflict,
HttpStatus.INTERNAL_SERVER_ERROR: InternalServerError,
HttpStatus.NOT_IMPLEMENTED: NotImplemented,
HttpStatus.SERVICE_UNAVAILABLE: handle_unavailable,
HttpStatus.INSUFFICIENT_CAPACITY: InsufficientCapacity,
}
SPECIFIC_ERROR_TYPES_MAP = {
'TabularUnsupportedColumnType': NotSupportedSchema,
}
[docs]
def from_response(res: requests.Response):
if res.status_code == HttpStatus.SUCCESS.value:
return None
log.debug("response: url='%s', code=%s, headers=%s, body='%s'", res.request.url, res.status_code, res.headers, res.text)
# try to parse S3 XML response for the error details:
code_str = None
message_str = None
if res.text:
try:
root = xml.etree.ElementTree.fromstring(res.text)
code = root.find('Code')
code_str = code.text if code is not None else None
message = root.find('Message')
message_str = message.text if message is not None else None
except xml.etree.ElementTree.ParseError:
log.debug("invalid XML: %r", res.text)
kwargs = dict(
code=code_str,
message=message_str,
method=res.request.method,
url=res.request.url,
status=res.status_code,
headers=res.headers,
)
log.warning("RPC failed: %s", kwargs)
status = HttpStatus(res.status_code)
http_error_type = HTTP_ERROR_TYPES_MAP.get(status, UnexpectedError)
http_error = http_error_type(**kwargs) # type: ignore
# Wrap specific error types if applicable
if code_str in SPECIFIC_ERROR_TYPES_MAP:
error_type = SPECIFIC_ERROR_TYPES_MAP[code_str]
return error_type(message=message_str, cause=http_error)
return http_error