Source code for vastdb.schema

"""VAST Database schema (a container of tables).

VAST S3 buckets can be used to create Database schemas and tables.
It is possible to list and access VAST snapshots generated over a bucket.
"""

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, List, Optional

import pyarrow as pa

from vastdb.table_metadata import TableMetadata, TableRef

from . import bucket, errors, schema, table
from ._ibis_support import validate_ibis_support_schema
from ._internal import SortingKey, VectorIndexSpec

if TYPE_CHECKING:
    from .table import Table


log = logging.getLogger(__name__)


@dataclass
class Schema:
    """VAST Schema.

    A schema is identified by the bucket it lives in and by its name.
    Child schema names are built as ``<parent name>/<child name>``.
    """

    name: str
    bucket: "bucket.Bucket"

    @property
    def tx(self):
        """VAST transaction used for this schema."""
        return self.bucket.tx

    def _subschema_full_name(self, name: str) -> str:
        """Return the full name of the child schema `name`.

        When this schema's own name is empty, the child name is returned as is.
        """
        return f"{self.name}/{name}" if self.name else name

    def create_schema(self, name: str, fail_if_exists=True) -> "Schema":
        """Create a new schema (a container of tables) under this schema.

        Args:
        ----
            name: Name of the child schema, relative to this schema
            fail_if_exists: Whether to fail if the schema already exists

        Returns:
        -------
            The created schema, or the already existing one when
            `fail_if_exists` is false.

        Raises:
        ------
            errors.SchemaExists: If the schema exists and `fail_if_exists` is true.

        """
        full_name = self._subschema_full_name(name)
        current = self.schema(name, fail_if_missing=False)
        if current is not None:
            if fail_if_exists:
                # Report the full path, like `MissingSchema` does in `schema()`,
                # so that the error is unambiguous for nested schemas.
                raise errors.SchemaExists(self.bucket.name, full_name)
            return current

        self.tx._rpc.api.create_schema(self.bucket.name, full_name, txid=self.tx.txid)
        log.info("Created schema: %s", full_name)
        return self.schema(name)  # type: ignore[return-value]

    def schema(self, name: str, fail_if_missing=True) -> Optional["Schema"]:
        """Get a specific schema (a container of tables) under this schema.

        Args:
        ----
            name: Name of the child schema, relative to this schema
            fail_if_missing: Whether to fail if the schema does not exist

        Returns:
        -------
            The requested schema, or `None` when it does not exist and
            `fail_if_missing` is false.

        Raises:
        ------
            errors.MissingSchema: If the schema does not exist and
                `fail_if_missing` is true.

        """
        # An exact-match lookup limited to a single key: at most one schema is expected back.
        _bucket_name, schemas, _next_key, _is_truncated, _ = self.tx._rpc.api.list_schemas(
            bucket=self.bucket.name,
            schema=self.name,
            next_key=0,
            txid=self.tx.txid,
            name_prefix=name,
            exact_match=True,
            max_keys=1,
        )
        names = [schema_name for schema_name, *_ in schemas]
        log.debug("Found schemas: %s", names)
        if not names:
            if fail_if_missing:
                raise errors.MissingSchema(self.bucket.name, self._subschema_full_name(name))
            return None

        assert len(names) == 1, f"Expected to receive only a single schema, but got {len(schemas)}: ({schemas})"
        return schema.Schema(name=self._subschema_full_name(names[0]), bucket=self.bucket)

    def schemas(self, batch_size=None) -> Iterable["Schema"]:
        """List child schemas.

        Args:
        ----
            batch_size: Maximal number of schemas to request per listing call
                (1000 is used when not specified).

        Returns:
        -------
            All child schemas of this schema.

        """
        next_key = 0
        if not batch_size:
            batch_size = 1000
        result: List["Schema"] = []
        while True:
            # Each response carries the key to continue from and a truncation flag.
            _, curr_schemas, next_key, is_truncated, _ = self.tx._rpc.api.list_schemas(
                bucket=self.bucket.name,
                schema=self.name,
                next_key=next_key,
                max_keys=batch_size,
                txid=self.tx.txid,
            )
            result.extend(
                schema.Schema(name=self._subschema_full_name(schema_name), bucket=self.bucket)
                for schema_name, *_ in curr_schemas
            )
            if not is_truncated:
                break

        return result

    def create_table(self, table_name: str, columns: pa.Schema,
                     fail_if_exists: bool = True,
                     use_external_row_ids_allocation: bool = False,
                     sorting_key: Optional[SortingKey] = None,
                     vector_index: Optional[VectorIndexSpec] = None) -> "Table":
        """Create a new table under this schema.

        A virtual `vastdb_rowid` column (of `int64` type) can be created to access and filter by internal VAST row IDs.
        See https://support.vastdata.com/s/article/UUID-48d0a8cf-5786-5ef3-3fa3-9c64e63a0967 for more details.

        Args:
        ----
            table_name: Name of the table to create
            columns: PyArrow schema defining the table columns
            fail_if_exists: Whether to fail if the table already exists
            use_external_row_ids_allocation: Whether to use external row ID allocation
            sorting_key: List of column names to use as sorting key (for Elysium tables).
                `None` (the default) is equivalent to an empty list.
            vector_index: Optional vector index.

        Returns:
        -------
            The created table, or the already existing one when
            `fail_if_exists` is false.

        Raises:
        ------
            errors.TableExists: If the table exists and `fail_if_exists` is true.

        """
        # A fresh list per call: a mutable default would be shared between calls.
        if sorting_key is None:
            sorting_key = []

        current = self.table(table_name, fail_if_missing=False)
        if current is not None:
            if fail_if_exists:
                raise errors.TableExists(self.bucket.name, self.name, table_name)
            return current

        if use_external_row_ids_allocation:
            self.tx._rpc.features.check_external_row_ids_allocation()

        validate_ibis_support_schema(columns)
        self.tx._rpc.api.create_table(self.bucket.name, self.name, table_name, columns,
                                      txid=self.tx.txid,
                                      use_external_row_ids_allocation=use_external_row_ids_allocation,
                                      sorting_key=sorting_key,
                                      vector_index=vector_index)
        log.info("Created table: %s", table_name)
        return self.table(table_name)  # type: ignore[return-value]

    def table(self, name: str, fail_if_missing=True) -> Optional["table.Table"]:
        """Get a specific table under this schema.

        Args:
        ----
            name: Name of the table
            fail_if_missing: Whether to fail if the table does not exist

        Returns:
        -------
            The requested table, or `None` when it does not exist and
            `fail_if_missing` is false.

        Raises:
        ------
            errors.MissingTable: If the table does not exist and
                `fail_if_missing` is true.

        """
        t = self.tables(table_name=name)
        if not t:
            if fail_if_missing:
                raise errors.MissingTable(self.bucket.name, self.name, name)
            return None

        assert len(t) == 1, f"Expected to receive only a single table, but got: {len(t)}. tables: {t}"
        log.debug("Found table: %s", t[0])
        return t[0]

    def _iter_tables(self, table_name=None, page_size=1000):
        """Yield the raw table listing entries of this schema, page by page.

        When `table_name` is given, an exact-match listing (including list
        stats) is requested for that name; otherwise all tables are listed.
        """
        next_key = 0
        name_prefix = table_name if table_name else ""
        exact_match = bool(table_name)
        while True:
            _, _, curr_tables, next_key, is_truncated, _ = self.tx._rpc.api.list_tables(
                bucket=self.bucket.name,
                schema=self.name,
                next_key=next_key,
                max_keys=page_size,
                txid=self.tx.active_txid,
                exact_match=exact_match,
                name_prefix=name_prefix,
                include_list_stats=exact_match,
            )
            # NOTE(review): an empty page ends the listing even if the response is
            # marked as truncated - is this a bug? Should it rely on `is_truncated` only?
            if not curr_tables:
                break
            yield from curr_tables
            if not is_truncated:
                break

    def tables(self, table_name: str = "", page_size=1000) -> List["Table"]:
        """List all tables under this schema if `table_name` is empty.

        Otherwise, list only the specific table (if exists).
        """
        return [
            _parse_table_info(table_info, self)
            for table_info in self._iter_tables(table_name=table_name, page_size=page_size)
        ]

    def tablenames(self, page_size=1000) -> List[str]:
        """List all table names under this schema."""
        return [table_info.name for table_info in self._iter_tables(page_size=page_size)]

    def drop(self) -> None:
        """Delete this schema."""
        self.tx._rpc.api.drop_schema(self.bucket.name, self.name, txid=self.tx.txid)
        log.info("Dropped schema: %s", self.name)

    def rename(self, new_name) -> None:
        """Rename this schema.

        On success, `self.name` is updated to `new_name`.
        """
        self.tx._rpc.api.alter_schema(self.bucket.name, self.name, txid=self.tx.txid, new_name=new_name)
        log.info("Renamed schema: %s to %s", self.name, new_name)
        # Update the local name only after the server accepted the change.
        self.name = new_name
def _parse_table_info(table_info, schema: "schema.Schema"):
    """Build a `table.Table` from a table listing entry of `schema`.

    The table metadata is created for the (bucket, schema, table) reference
    and loaded using the schema's transaction before the table is returned.
    """
    metadata = TableMetadata(
        TableRef(
            bucket=schema.bucket.name,
            schema=schema.name,
            table=table_info.name,
        )
    )
    metadata.load(schema.tx)
    # The listed handle is converted to `int` before it is passed on.
    handle = int(table_info.handle)
    return table.Table(handle=handle, metadata=metadata, tx=schema.tx)