Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @override #1312

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
Type,
Union,
cast,
override
)

from pyiceberg.exceptions import (
Expand Down Expand Up @@ -778,6 +779,7 @@ def __repr__(self) -> str:


class MetastoreCatalog(Catalog, ABC):
@override
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)

Expand All @@ -788,6 +790,7 @@ def __init__(self, name: str, **properties: str):
help_message=f"The property {DEPRECATED_BOTOCORE_SESSION} is deprecated and will be removed.",
)

@override
def create_table_transaction(
self,
identifier: Union[str, Identifier],
Expand All @@ -801,13 +804,15 @@ def create_table_transaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
)

@override
def table_exists(self, identifier: Union[str, Identifier]) -> bool:
try:
self.load_table(identifier)
return True
except NoSuchTableError:
return False

@override
def purge_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
Expand Down
15 changes: 15 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Set,
Tuple,
Union,
override
)

import boto3
Expand Down Expand Up @@ -140,6 +141,7 @@ def _dynamodb_table_exists(self) -> bool:
else:
return True

@override
def create_table(
self,
identifier: Union[str, Identifier],
Expand Down Expand Up @@ -194,6 +196,7 @@ def create_table(

return self.load_table(identifier=identifier)

@override
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

Expand All @@ -209,6 +212,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

@override
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
Expand All @@ -228,6 +232,7 @@ def commit_table(
"""
raise NotImplementedError

@override
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
Load the table's metadata and returns the table instance.
Expand All @@ -249,6 +254,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

@override
def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.

Expand All @@ -270,6 +276,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
except ConditionalCheckFailedException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

@override
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
"""Rename a fully classified table name.

Expand Down Expand Up @@ -337,6 +344,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

@override
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.

Expand All @@ -358,6 +366,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
except ConditionalCheckFailedException as e:
raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

@override
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
"""Drop a namespace.

Expand Down Expand Up @@ -385,6 +394,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
except ConditionalCheckFailedException as e:
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

@override
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

Expand Down Expand Up @@ -429,6 +439,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

return table_identifiers

@override
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
"""List top-level namespaces from the catalog.

Expand Down Expand Up @@ -471,6 +482,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi

return database_identifiers

@override
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
"""
Get properties for a namespace.
Expand All @@ -489,6 +501,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper
namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item)
return _get_namespace_properties(namespace_dict=namespace_dict)

@override
def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
Expand Down Expand Up @@ -526,9 +539,11 @@ def update_namespace_properties(

return properties_update_summary

@override
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

@override
def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

Expand Down
21 changes: 21 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Tuple,
Union,
cast,
override,
)

import boto3
Expand Down Expand Up @@ -158,21 +159,27 @@ def _construct_parameters(


class _IcebergSchemaToGlueType(SchemaVisitor[str]):
@override
def schema(self, schema: Schema, struct_result: str) -> str:
return struct_result

@override
def struct(self, struct: StructType, field_results: List[str]) -> str:
return f"struct<{','.join(field_results)}>"

@override
def field(self, field: NestedField, field_result: str) -> str:
return f"{field.name}:{field_result}"

@override
def list(self, list_type: ListType, element_result: str) -> str:
return f"array<{element_result}>"

@override
def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
return f"map<{key_result},{value_result}>"

@override
def primitive(self, primitive: PrimitiveType) -> str:
if isinstance(primitive, DecimalType):
return f"decimal({primitive.precision},{primitive.scale})"
Expand Down Expand Up @@ -376,6 +383,7 @@ def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

@override
def create_table(
self,
identifier: Union[str, Identifier],
Expand Down Expand Up @@ -420,6 +428,7 @@ def create_table(

return self.load_table(identifier=identifier)

@override
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

Expand All @@ -442,6 +451,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
return self.load_table(identifier=identifier)

@override
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
Expand Down Expand Up @@ -521,6 +531,7 @@ def commit_table(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

@override
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.

Expand All @@ -541,6 +552,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:

return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

@override
def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.

Expand All @@ -557,6 +569,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

@override
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
"""Rename a fully classified table name.

Expand Down Expand Up @@ -618,6 +631,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

@override
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.

Expand All @@ -635,6 +649,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
except self.glue.exceptions.AlreadyExistsException as e:
raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e

@override
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
"""Drop a namespace.

Expand All @@ -658,6 +673,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:

self.glue.delete_database(Name=database_name)

@override
def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
"""List Iceberg tables under the given namespace in the catalog.

Expand Down Expand Up @@ -689,6 +705,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)]

@override
def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Expand All @@ -711,6 +728,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi

return [self.identifier_to_tuple(database["Name"]) for database in database_list]

@override
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
"""Get properties for a namespace.

Expand Down Expand Up @@ -741,6 +759,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper

return properties

@override
def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
Expand All @@ -765,9 +784,11 @@ def update_namespace_properties(

return properties_update_summary

@override
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

@override
def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

Expand Down
Loading
Loading