Skip to content

Commit

Permalink
Clear multiround (#367)
Browse files Browse the repository at this point in the history
* Switch from delete_many to clear at the end of each round in multi-round compaction

* Move pull request template

* Bump deltacat version to 1.1.26

---------

Co-authored-by: Kevin Yan <[email protected]>
  • Loading branch information
yankevn and Kevin Yan authored Oct 23, 2024
1 parent 26fce7f commit 88ccf0c
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 12 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion deltacat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "1.1.25"
__version__ = "1.1.26"


__all__ = [
Expand Down
7 changes: 2 additions & 5 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ def _run_hash_and_merge(
previous_compacted_delta_manifest: Optional[Manifest],
compacted_partition: Partition,
) -> List[MergeResult]:
created_obj_ids = set()
telemetry_time_hb = 0
total_input_records_count = np.int64(0)
total_hb_record_count = np.int64(0)
Expand Down Expand Up @@ -289,7 +288,6 @@ def _run_hash_and_merge(
hb_result.hash_bucket_group_to_obj_id_tuple
):
if object_id_size_tuple:
created_obj_ids.add(object_id_size_tuple[0])
all_hash_group_idx_to_obj_id[hash_group_index].append(
object_id_size_tuple[0],
)
Expand Down Expand Up @@ -369,10 +367,9 @@ def _run_hash_and_merge(
)
if params.num_rounds > 1:
logger.info(
f"Detected number of rounds to be {params.num_rounds}, "
f"preparing to delete {len(created_obj_ids)} objects from object store..."
f"Detected number of rounds to be {params.num_rounds}, preparing to clear object store..."
)
params.object_store.delete_many(list(created_obj_ids))
params.object_store.clear()
else:
logger.info(
f"Detected number of rounds to be {params.num_rounds}, not cleaning up object store..."
Expand Down
7 changes: 6 additions & 1 deletion deltacat/io/memcached_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
except BaseException:
# if an exception is raised then all, some, or none of the keys may have been deleted
logger.warning(
f"Failed to fully delete refs: {current_refs}", exc_info=True
f"Failed to fully delete {len(current_refs)} refs for ip: {ip}",
exc_info=True,
)
all_deleted = False

Expand All @@ -224,17 +225,21 @@ def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:
return all_deleted

def clear(self) -> bool:
start = time.monotonic()
flushed = all(
[
self._get_client_by_ip(ip).flush_all(noreply=False)
for ip in self.storage_node_ips
]
)
self.client_cache.clear()
end = time.monotonic()

if flushed:
logger.info("Successfully cleared cache contents.")

logger.info(f"The total time taken to clear the cache is: {end - start}")

return flushed

def close(self) -> None:
Expand Down
15 changes: 15 additions & 0 deletions deltacat/io/redis_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ def delete_many(self, refs: List[Any], *args, **kwargs) -> bool:

return num_deleted == len(refs)

def clear(self) -> bool:
start = time.monotonic()
current_ip = self._get_current_ip()
client = self._get_client_by_ip(current_ip)
flushed = client.flushall()
self.client_cache.clear()
end = time.monotonic()

if flushed:
logger.info("Successfully cleared cache contents.")

logger.info(f"The total time taken to clear the cache is: {end - start}")

return flushed

def _get_client_by_ip(self, ip_address: str):
if ip_address in self.client_cache:
return self.client_cache[ip_address]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination(
execute_compaction_result_spy = mocker.spy(
ExecutionCompactionResult, "__init__"
)
object_store_delete_many_spy = mocker.spy(FileObjectStore, "delete_many")
object_store_clear_spy = mocker.spy(FileObjectStore, "clear")

# execute
rcf_file_s3_uri = benchmark(compact_partition_func, compact_partition_params)
Expand Down Expand Up @@ -339,8 +339,5 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination(
if assert_compaction_audit:
if not assert_compaction_audit(compactor_version, compaction_audit):
assert False, "Compaction audit assertion failed"
assert os.listdir(test_dir) == []
assert (
object_store_delete_many_spy.call_count
), "Object store was never cleaned up!"
assert object_store_clear_spy.call_count, "Object store was never cleaned up!"
return
12 changes: 12 additions & 0 deletions deltacat/tests/io/test_redis_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,15 @@ def test_delete_many_sanity(self, mock_client):

self.assertTrue(delete_success)
self.assertEqual(2, mock_client.Redis.return_value.delete.call_count)

@mock.patch("deltacat.io.redis_object_store.redis")
def test_clear_sanity(self, mock_client):
# setup
mock_client.Redis.return_value.flushall.side_effect = [True]

# action
clear_success = self.object_store.clear()

# assert
self.assertTrue(clear_success)
self.assertEqual(1, mock_client.Redis.return_value.flushall.call_count)

0 comments on commit 88ccf0c

Please sign in to comment.