From eb11e6d86b36e554089454ef9fd8a3c065c2959d Mon Sep 17 00:00:00 2001 From: Nikita Yurasov Date: Tue, 10 Sep 2024 12:04:44 +0200 Subject: [PATCH 1/2] feat: small improvements and fixes --- dbxio/core/client.py | 15 +++++++++++++++ dbxio/delta/table.py | 7 ++++--- dbxio/sql/types.py | 13 +++++++++++++ dbxio/utils/retries.py | 9 ++++++++- dbxio/volume/volume_commands.py | 17 ++++++++++------- tests/test_primary_types.py | 9 +++++++++ tests/test_table.py | 5 +++++ tests/test_volume_commands.py | 20 ++++++++++++++++---- 8 files changed, 80 insertions(+), 15 deletions(-) diff --git a/dbxio/core/client.py b/dbxio/core/client.py index 14acc3d..3059f2b 100644 --- a/dbxio/core/client.py +++ b/dbxio/core/client.py @@ -10,6 +10,9 @@ from dbxio.sql.results import _FutureBaseResult from dbxio.sql.sql_driver import SQLDriver, get_sql_driver from dbxio.utils.databricks import ClusterType +from dbxio.utils.logging import get_logger + +logger = get_logger() @attrs.define(slots=True) @@ -41,6 +44,18 @@ class DbxIOClient: session_configuration: Optional[Dict[str, Any]] = None + def __attrs_post_init__(self): + """ + This method is used only for logging + """ + logger.info( + 'Client is created with the following settings: %s; cluster settings: http-path: %s, server-hostname: %s', + self.settings, + self._cluster_credentials.http_path, + self._cluster_credentials.server_hostname, + ) + logger.info('Auth provider: %s', self.credential_provider.__class__.__name__) + def clear_cache(self): self.credential_provider.clear_cache() diff --git a/dbxio/delta/table.py b/dbxio/delta/table.py index ade4910..499e00c 100644 --- a/dbxio/delta/table.py +++ b/dbxio/delta/table.py @@ -102,9 +102,10 @@ def safe_table_identifier(self): """ Returns table identifier with special characters replaced with underscores and wrapped in backticks. """ - trunc_ti = self.table_identifier.translate( - str.maketrans('!"#$%&\'()*+,/:;<=>?@[\\]^`{|}~', '_____________________________') - ) + translations = {ord(char): ord('_') for char in '!"#$%&\'()*+,/:;<=>?@[\\]^{|}~'} + translations[ord('`')] = None + + trunc_ti = self.table_identifier.translate(translations) return '.'.join([f'`{ti_part}`' for ti_part in trunc_ti.split('.')]) @property diff --git a/dbxio/sql/types.py b/dbxio/sql/types.py index 0666fb4..b8d2483 100644 --- a/dbxio/sql/types.py +++ b/dbxio/sql/types.py @@ -662,6 +662,19 @@ def deserialize(self, obj): raise DbxIOTypeError(f'Cannot cast value `{obj}` to JSON') +class VariantType(BaseType): + def fit(self, obj) -> bool: + return True + + @nullable + def serialize(self, obj, unsafe: bool = False): + return str(obj) + + @nullable + def deserialize(self, obj): + return obj + + class GroupsPrimaryDataTypes: INTEGER = (IntType, BigIntType, DecimalType) FLOAT = (FloatType, DoubleType) diff --git a/dbxio/utils/retries.py b/dbxio/utils/retries.py index ad5deea..7e1b58b 100644 --- a/dbxio/utils/retries.py +++ b/dbxio/utils/retries.py @@ -1,5 +1,11 @@ +import logging + from databricks.sdk.errors.platform import PermissionDenied -from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential +from tenacity import RetryCallState, after_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential + +from dbxio.utils.logging import get_logger + +logger = get_logger() def _clear_client_cache(call_state: RetryCallState) -> None: @@ -28,4 +34,5 @@ def _clear_client_cache(call_state: RetryCallState) -> None: retry=retry_if_exception_type((PermissionDenied,)), reraise=True, before=_clear_client_cache, + after=after_log(logger, log_level=logging.INFO), ) diff --git a/dbxio/volume/volume_commands.py b/dbxio/volume/volume_commands.py index 99d3ff1..4e25f4f 100644 --- a/dbxio/volume/volume_commands.py +++ b/dbxio/volume/volume_commands.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, Optional, Union import attrs -from databricks.sdk.errors.platform import NotFound +from databricks.sdk.errors.platform import NotFound, ResourceDoesNotExist from databricks.sdk.service.catalog import VolumeType from dbxio.blobs.block_upload import upload_file @@ -98,7 +98,7 @@ def is_external(self): @dbxio_retry def create_volume(volume: Volume, client: 'DbxIOClient', skip_if_exists: bool = True) -> None: - if skip_if_exists and _exists_volume(volume.catalog, volume.schema, volume.name, client): + if skip_if_exists and exists_volume(volume.catalog, volume.schema, volume.name, client): logger.info(f'Volume {volume.safe_full_name} already exists, skipping creation.') return @@ -113,7 +113,7 @@ def create_volume(volume: Volume, client: 'DbxIOClient', skip_if_exists: bool = @dbxio_retry -def _exists_volume(catalog_name: str, schema_name: str, volume_name: str, client: 'DbxIOClient') -> bool: +def exists_volume(catalog_name: str, schema_name: str, volume_name: str, client: 'DbxIOClient') -> bool: for v in client.workspace_api.volumes.list(catalog_name=catalog_name, schema_name=schema_name): if v.name == volume_name: return True @@ -223,7 +223,7 @@ def _write_external_volume( create_volume_if_not_exists: bool, force: bool, ): - volume_exists = _exists_volume(catalog_name, schema_name, volume_name, client) + volume_exists = exists_volume(catalog_name, schema_name, volume_name, client) if volume_exists: volume = Volume.from_url(get_volume_url(catalog_name, schema_name, volume_name), client=client) assert volume.storage_location, f'External volume must have a storage location, got {volume=}' @@ -454,7 +454,7 @@ def get_comment_on_volume(volume: Volume, client: 'DbxIOClient') -> Union[str, N @dbxio_retry -def drop_volume(volume: Volume, client: 'DbxIOClient') -> None: +def drop_volume(volume: Volume, client: 'DbxIOClient', force: bool = False) -> None: """ Deletes a volume in Databricks. If the volume is external, it will also delete all blobs in the storage location. @@ -467,6 +467,9 @@ def drop_volume(volume: Volume, client: 'DbxIOClient') -> None: logger.debug(f'Blob {blob.name} was successfully deleted.') logger.info(f'External volume {volume.safe_full_name} was successfully cleaned up.') - - client.workspace_api.volumes.delete(volume.full_name) + try: + client.workspace_api.volumes.delete(volume.full_name) + except ResourceDoesNotExist as e: + if not force: + raise e logger.info(f'Volume {volume.safe_full_name} was successfully dropped.') diff --git a/tests/test_primary_types.py b/tests/test_primary_types.py index 0145933..296489c 100644 --- a/tests/test_primary_types.py +++ b/tests/test_primary_types.py @@ -367,3 +367,12 @@ def test_json(): assert types.JSONType().deserialize('1') == 1 assert types.JSONType().deserialize('"a"') == 'a' assert types.JSONType().deserialize('NULL') is None + + +def test_variant(): + assert types.VariantType().fit(1) + assert types.VariantType().fit('a') + assert types.VariantType().fit(None) + assert types.VariantType().fit(datetime.datetime(2014, 1, 1, 0, 0, 0)) + assert types.VariantType().fit([1, 2, 3]) + assert types.VariantType().fit({'a': 1}) diff --git a/tests/test_table.py b/tests/test_table.py index 7e38362..5dd1ef7 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -48,3 +48,8 @@ def test_safe_table_identifier(self): dbxio.Table(table_identifier='database!.schema.table+').safe_table_identifier, '`database_`.`schema`.`table_`', ) + + self.assertEqual( + dbxio.Table(table_identifier='`database!`.`schema`.`table+`').safe_table_identifier, + '`database_`.`schema`.`table_`', + ) diff --git a/tests/test_volume_commands.py b/tests/test_volume_commands.py index 6e0de6e..40175f8 100644 --- a/tests/test_volume_commands.py +++ b/tests/test_volume_commands.py @@ -5,6 +5,7 @@ from unittest.mock import patch import pytest +from databricks.sdk.errors.platform import ResourceDoesNotExist from databricks.sdk.service.catalog import VolumesAPI, VolumeType from databricks.sdk.service.files import DirectoryEntry, FilesAPI @@ -101,8 +102,8 @@ def _mock_download_blob_tree(object_storage_client, local_path: Path, prefix_pat @patch.object(VolumesAPI, 'create', return_value=None) -@patch('dbxio.volume.volume_commands._exists_volume', return_value=False) -def test_create_volume(mock_exists_volume, mock_volume_create): +@patch('dbxio.volume.volume_commands.exists_volume', return_value=False) +def test_create_volume(mockexists_volume, mock_volume_create): volume = Volume(catalog='catalog', schema='schema', name='volume') create_volume(volume, client) mock_volume_create.assert_called_once_with( @@ -115,8 +116,8 @@ def test_create_volume(mock_exists_volume, mock_volume_create): @patch.object(VolumesAPI, 'create', return_value=None) -@patch('dbxio.volume.volume_commands._exists_volume', return_value=True) -def test_create_volume__volume_exists(mock_exists_volume, mock_volume_create): +@patch('dbxio.volume.volume_commands.exists_volume', return_value=True) +def test_create_volume__volume_exists(mockexists_volume, mock_volume_create): volume = Volume(catalog='catalog', schema='schema', name='volume') create_volume(volume, client) mock_volume_create.assert_not_called() @@ -407,3 +408,14 @@ def test_drop_volume__external( mock_volume_delete.assert_called_once_with(volume.full_name) assert mock_try_delete_blob.call_count == 2 + + +@patch.object(VolumesAPI, 'delete', side_effect=ResourceDoesNotExist()) +def test_drop_volume_force(mock_volume_delete): + volume = Volume( + catalog='catalog', + schema='schema', + name='volume', + volume_type=VolumeType.MANAGED, + ) + drop_volume(volume, client, force=True) From 3a5dbdc4b3b7073579065eec39bfbe49e6b151ad Mon Sep 17 00:00:00 2001 From: Nikita Yurasov Date: Tue, 10 Sep 2024 12:08:12 +0200 Subject: [PATCH 2/2] feat: bump version --- dbxio/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbxio/__init__.py b/dbxio/__init__.py index 5c5d660..d1e6f32 100644 --- a/dbxio/__init__.py +++ b/dbxio/__init__.py @@ -4,4 +4,4 @@ from dbxio.utils import * # noqa: F403 from dbxio.volume import * # noqa: F403 -__version__ = '0.4.3' # single source of truth +__version__ = '0.4.4' # single source of truth