Source code for vastdb.transaction

"""VAST Database transaction.

A transcation is used as a context manager, since every Database-related operation in VAST requires a transaction.

    with session.transaction() as tx:
        tx.bucket("bucket").create_schema("schema")
"""

import logging
from copy import deepcopy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, Optional

from vastdb._adbc import AdbcConnection, AdbcDriverVastdbNotInstalledException
from vastdb._table_interface import ITable
from vastdb.table import TableInTransaction
from vastdb.table_metadata import TableMetadata

from . import bucket, errors, schema, session

if TYPE_CHECKING:
    from bucket import Bucket
    from table import Table


log = logging.getLogger(__name__)

VAST_CATALOG_BUCKET_NAME = "vast-big-catalog-bucket"
VAST_CATALOG_SCHEMA_NAME = 'vast_big_catalog_schema'
VAST_CATALOG_TABLE_NAME = 'vast_big_catalog_table'

AUDIT_LOG_BUCKET_NAME = "vast-audit-log-bucket"
AUDIT_LOG_SCHEMA_NAME = 'vast_audit_log_schema'
AUDIT_LOG_TABLE_NAME = 'vast_audit_log_table'


[docs] class TransactionNotActiveError(Exception): """Transaction is not active error.""" pass
[docs] class NoAdbcConnectionError(Exception): """No Adbc Connection Error.""" pass
[docs] @dataclass class Transaction: """A holder of a single VAST transaction.""" _rpc: "session.Session" txid: Optional[int] = None _adbc_driver_path: Optional[str] = None _adbc_conn: Optional[AdbcConnection] = None _end_user: Optional[str] = None def __enter__(self): """Create a transaction and store its ID.""" response = self._rpc.api.begin_transaction() self.txid = int(response.headers['tabular-txid']) try: self._adbc_conn = AdbcConnection( self._rpc.endpoint, self._rpc.access, self._rpc.secret, self.txid, self._end_user, adbc_driver_path=self._adbc_driver_path ) except AdbcDriverVastdbNotInstalledException: self._adbc_conn = None log.debug("opened txid=%016x", self.txid) return self def __exit__(self, exc_type, exc_value, exc_traceback): """On success, the transaction is committed. Otherwise, it is rolled back.""" txid = self.txid self.txid = None if self._adbc_conn is not None: self.adbc_conn.close() self._adbc_conn = None if (exc_type, exc_value, exc_traceback) == (None, None, None): log.debug("committing txid=%016x", txid) self._rpc.api.commit_transaction(txid) else: log.exception("rolling back txid=%016x due to:", txid) self._rpc.api.rollback_transaction(txid) def __repr__(self): """Don't show the session details.""" if self.txid is None: return 'InvalidTransaction' return f'Transaction(id=0x{self.txid:016x})'
[docs] def bucket(self, name: str) -> "Bucket": """Return a VAST Bucket, if exists.""" try: self._rpc.api.head_bucket(name) except errors.NotFound as e: raise errors.MissingBucket(name) from e return bucket.Bucket(name, self)
[docs] def catalog_snapshots(self) -> Iterable["Bucket"]: """Return VAST Catalog bucket snapshots.""" return bucket.Bucket(VAST_CATALOG_BUCKET_NAME, self).snapshots()
[docs] def catalog(self, snapshot: Optional["Bucket"] = None, fail_if_missing=True) -> Optional["Table"]: """Return VAST Catalog table.""" b = snapshot or bucket.Bucket(VAST_CATALOG_BUCKET_NAME, self) s = schema.Schema(VAST_CATALOG_SCHEMA_NAME, b) return s.table(name=VAST_CATALOG_TABLE_NAME, fail_if_missing=fail_if_missing)
[docs] def audit_log(self, fail_if_missing=True) -> Optional["Table"]: """Return VAST Audit Log table.""" b = bucket.Bucket(AUDIT_LOG_BUCKET_NAME, self) s = schema.Schema(AUDIT_LOG_SCHEMA_NAME, b) return s.table(name=AUDIT_LOG_TABLE_NAME, fail_if_missing=fail_if_missing)
@property def is_active(self) -> bool: """Return whether transaction is active.""" return self.txid is not None @property def active_txid(self) -> int: """Return active transaction ID.""" if self.txid is None: raise TransactionNotActiveError() return self.txid
[docs] def table_from_metadata(self, metadata: TableMetadata) -> ITable: """Create Table from TableMetadata.""" return TableInTransaction(deepcopy(metadata), tx=self)
@property def adbc_conn(self) -> AdbcConnection: """ADBC connection in transaction.""" if self._adbc_conn is None: raise NoAdbcConnectionError("Adbc Driver may not have been supplied") return self._adbc_conn