diff --git a/deltacat/.github/pull_request_template.md b/.github/pull_request_template.md similarity index 100% rename from deltacat/.github/pull_request_template.md rename to .github/pull_request_template.md diff --git a/deltacat/__init__.py b/deltacat/__init__.py index d65fcc5f..1f19744e 100644 --- a/deltacat/__init__.py +++ b/deltacat/__init__.py @@ -44,7 +44,7 @@ deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__)) -__version__ = "1.1.25" +__version__ = "1.1.26" __all__ = [ diff --git a/deltacat/compute/compactor_v2/private/compaction_utils.py b/deltacat/compute/compactor_v2/private/compaction_utils.py index 34ff121d..a5dae1aa 100644 --- a/deltacat/compute/compactor_v2/private/compaction_utils.py +++ b/deltacat/compute/compactor_v2/private/compaction_utils.py @@ -227,7 +227,6 @@ def _run_hash_and_merge( previous_compacted_delta_manifest: Optional[Manifest], compacted_partition: Partition, ) -> List[MergeResult]: - created_obj_ids = set() telemetry_time_hb = 0 total_input_records_count = np.int64(0) total_hb_record_count = np.int64(0) @@ -289,7 +288,6 @@ def _run_hash_and_merge( hb_result.hash_bucket_group_to_obj_id_tuple ): if object_id_size_tuple: - created_obj_ids.add(object_id_size_tuple[0]) all_hash_group_idx_to_obj_id[hash_group_index].append( object_id_size_tuple[0], ) @@ -369,10 +367,9 @@ def _run_hash_and_merge( ) if params.num_rounds > 1: logger.info( - f"Detected number of rounds to be {params.num_rounds}, " - f"preparing to delete {len(created_obj_ids)} objects from object store..." + f"Detected number of rounds to be {params.num_rounds}, preparing to clear object store..." ) - params.object_store.delete_many(list(created_obj_ids)) + params.object_store.clear() else: logger.info( f"Detected number of rounds to be {params.num_rounds}, not cleaning up object store..." diff --git a/deltacat/io/memcached_object_store.py b/deltacat/io/memcached_object_store.py index 028f40d8..c94104fa 100644 --- a/deltacat/io/memcached_object_store.py +++ b/deltacat/io/memcached_object_store.py @@ -205,7 +205,8 @@ def delete_many(self, refs: List[Any], *args, **kwargs) -> bool: except BaseException: # if an exception is raised then all, some, or none of the keys may have been deleted logger.warning( - f"Failed to fully delete refs: {current_refs}", exc_info=True + f"Failed to fully delete {len(current_refs)} refs for ip: {ip}", + exc_info=True, ) all_deleted = False @@ -224,6 +225,7 @@ def delete_many(self, refs: List[Any], *args, **kwargs) -> bool: return all_deleted def clear(self) -> bool: + start = time.monotonic() flushed = all( [ self._get_client_by_ip(ip).flush_all(noreply=False) @@ -231,10 +233,13 @@ def clear(self) -> bool: ] ) self.client_cache.clear() + end = time.monotonic() if flushed: logger.info("Successfully cleared cache contents.") + logger.info(f"The total time taken to clear the cache is: {end - start}") + return flushed def close(self) -> None: diff --git a/deltacat/io/redis_object_store.py b/deltacat/io/redis_object_store.py index 99e31855..ac4298a1 100644 --- a/deltacat/io/redis_object_store.py +++ b/deltacat/io/redis_object_store.py @@ -115,6 +115,21 @@ def delete_many(self, refs: List[Any], *args, **kwargs) -> bool: return num_deleted == len(refs) + def clear(self) -> bool: + start = time.monotonic() + current_ip = self._get_current_ip() + client = self._get_client_by_ip(current_ip) + flushed = client.flushall() + self.client_cache.clear() + end = time.monotonic() + + if flushed: + logger.info("Successfully cleared cache contents.") + + logger.info(f"The total time taken to clear the cache is: {end - start}") + + return flushed + def _get_client_by_ip(self, ip_address: str): if ip_address in self.client_cache: return self.client_cache[ip_address] diff --git a/deltacat/tests/compute/test_compact_partition_multiple_rounds.py b/deltacat/tests/compute/test_compact_partition_multiple_rounds.py index e712ec7e..c1e3c9fc 100644 --- a/deltacat/tests/compute/test_compact_partition_multiple_rounds.py +++ b/deltacat/tests/compute/test_compact_partition_multiple_rounds.py @@ -290,7 +290,7 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination( execute_compaction_result_spy = mocker.spy( ExecutionCompactionResult, "__init__" ) - object_store_delete_many_spy = mocker.spy(FileObjectStore, "delete_many") + object_store_clear_spy = mocker.spy(FileObjectStore, "clear") # execute rcf_file_s3_uri = benchmark(compact_partition_func, compact_partition_params) @@ -339,8 +339,5 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination( if assert_compaction_audit: if not assert_compaction_audit(compactor_version, compaction_audit): assert False, "Compaction audit assertion failed" - assert os.listdir(test_dir) == [] - assert ( - object_store_delete_many_spy.call_count - ), "Object store was never cleaned up!" + assert object_store_clear_spy.call_count, "Object store was never cleaned up!" return diff --git a/deltacat/tests/io/test_redis_object_store.py b/deltacat/tests/io/test_redis_object_store.py index 88d03d2b..c042472f 100644 --- a/deltacat/tests/io/test_redis_object_store.py +++ b/deltacat/tests/io/test_redis_object_store.py @@ -121,3 +121,15 @@ def test_delete_many_sanity(self, mock_client): self.assertTrue(delete_success) self.assertEqual(2, mock_client.Redis.return_value.delete.call_count) + + @mock.patch("deltacat.io.redis_object_store.redis") + def test_clear_sanity(self, mock_client): + # setup + mock_client.Redis.return_value.flushall.side_effect = [True] + + # action + clear_success = self.object_store.clear() + + # assert + self.assertTrue(clear_success) + self.assertEqual(1, mock_client.Redis.return_value.flushall.call_count)