Skip to content

Commit

Permalink
Fix a bug that caused the RCF to be written incorrectly during multip…
Browse files Browse the repository at this point in the history
…le rounds
  • Loading branch information
raghumdani committed Oct 30, 2024
1 parent 1655368 commit 5ade815
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 1 deletion.
4 changes: 3 additions & 1 deletion deltacat/compute/compactor_v2/private/compaction_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ def _process_merge_results(
f"as or greater than params.num_rounds, which is {params.num_rounds}"
)
hb_id_to_entry_indices_range[str(mat_result.task_index)] = (
file_index,
hb_id_to_entry_indices_range.get(str(mat_result.task_index), [file_index])[
0
],
file_index + mat_result.pyarrow_write_result.files,
)

Expand Down
10 changes: 10 additions & 0 deletions deltacat/tests/compute/test_compact_partition_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ def _incremental_compaction_setup():
**compaction_audit_obj
)

# assert if RCF covers all files
if compactor_version != CompactorVersion.V1.value:
previous_end = None
for start, end in round_completion_info.hb_index_to_entry_range.values():
assert (previous_end is None and start == 0) or start == previous_end
previous_end = end
assert (
previous_end == round_completion_info.compacted_pyarrow_write_result.files
)

tables = ds.download_delta(
compacted_delta_locator, storage_type=StorageType.LOCAL, **ds_mock_kwargs
)
Expand Down
10 changes: 10 additions & 0 deletions deltacat/tests/compute/test_compact_partition_multiple_rounds.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination(
**compaction_audit_obj
)

# assert if RCF covers all files
# multiple rounds feature is only supported in V2 compactor
previous_end = None
for start, end in round_completion_info.hb_index_to_entry_range.values():
assert (previous_end is None and start == 0) or start == previous_end
previous_end = end
assert (
previous_end == round_completion_info.compacted_pyarrow_write_result.files
)

# Assert not in-place compacted
assert (
execute_compaction_result_spy.call_args.args[-1] is False
Expand Down
11 changes: 11 additions & 0 deletions deltacat/tests/compute/test_compact_partition_rebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,17 @@ def test_compact_partition_rebase_same_source_and_destination(
round_completion_info.compaction_audit_url
)

# assert if RCF covers all files
if compactor_version != CompactorVersion.V1.value:
previous_end = None
for start, end in round_completion_info.hb_index_to_entry_range.values():
assert (previous_end is None and start == 0) or start == previous_end
previous_end = end
assert (
previous_end
== round_completion_info.compacted_pyarrow_write_result.files
)

compaction_audit_obj: Dict[str, Any] = read_s3_contents(
s3_resource, audit_bucket, audit_key
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,16 @@ def test_compact_partition_rebase_then_incremental(
compacted_delta_locator_incremental: DeltaLocator = (
round_completion_info.compacted_delta_locator
)
# assert if RCF covers all files
if compactor_version != CompactorVersion.V1.value:
previous_end = None
for start, end in round_completion_info.hb_index_to_entry_range.values():
assert (previous_end is None and start == 0) or start == previous_end
previous_end = end
assert (
previous_end == round_completion_info.compacted_pyarrow_write_result.files
)

audit_bucket, audit_key = round_completion_info.compaction_audit_url.replace(
"s3://", ""
).split("/", 1)
Expand Down

0 comments on commit 5ade815

Please sign in to comment.