From b0b95924f5dda7a1b933c75e12acd6afe403415f Mon Sep 17 00:00:00 2001
From: Raghavendra M Dani
Date: Wed, 30 Oct 2024 10:45:33 -0700
Subject: [PATCH] Fix a bug that caused the RCF to be written incorrectly
 during multiple rounds (#369)

* Fix a bug that caused the RCF to be written incorrectly during multiple rounds

* Add comment
---
 .../compute/compactor_v2/private/compaction_utils.py  |  5 ++++-
 .../compute/test_compact_partition_incremental.py     | 10 ++++++++++
 .../compute/test_compact_partition_multiple_rounds.py | 10 ++++++++++
 .../tests/compute/test_compact_partition_rebase.py    | 11 +++++++++++
 .../test_compact_partition_rebase_then_incremental.py | 10 ++++++++++
 5 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/deltacat/compute/compactor_v2/private/compaction_utils.py b/deltacat/compute/compactor_v2/private/compaction_utils.py
index a5dae1aa..5f0ad38a 100644
--- a/deltacat/compute/compactor_v2/private/compaction_utils.py
+++ b/deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -584,8 +584,11 @@ def _process_merge_results(
             f"Duplicate record count ({duplicate_hash_bucket_mat_results}) is as large "
             f"as or greater than params.num_rounds, which is {params.num_rounds}"
         )
+        # ensure start index is the first file index if task index is same
         hb_id_to_entry_indices_range[str(mat_result.task_index)] = (
-            file_index,
+            hb_id_to_entry_indices_range.get(str(mat_result.task_index), [file_index])[
+                0
+            ],
             file_index + mat_result.pyarrow_write_result.files,
         )
 
diff --git a/deltacat/tests/compute/test_compact_partition_incremental.py b/deltacat/tests/compute/test_compact_partition_incremental.py
index 5e53440e..c970e5c5 100644
--- a/deltacat/tests/compute/test_compact_partition_incremental.py
+++ b/deltacat/tests/compute/test_compact_partition_incremental.py
@@ -328,6 +328,16 @@ def _incremental_compaction_setup():
         **compaction_audit_obj
     )
 
+    # assert if RCF covers all files
+    if compactor_version != CompactorVersion.V1.value:
+        previous_end = None
+        for start, end in round_completion_info.hb_index_to_entry_range.values():
+            assert (previous_end is None and start == 0) or start == previous_end
+            previous_end = end
+        assert (
+            previous_end == round_completion_info.compacted_pyarrow_write_result.files
+        )
+
     tables = ds.download_delta(
         compacted_delta_locator, storage_type=StorageType.LOCAL, **ds_mock_kwargs
     )
diff --git a/deltacat/tests/compute/test_compact_partition_multiple_rounds.py b/deltacat/tests/compute/test_compact_partition_multiple_rounds.py
index c1e3c9fc..bcaa10c4 100644
--- a/deltacat/tests/compute/test_compact_partition_multiple_rounds.py
+++ b/deltacat/tests/compute/test_compact_partition_multiple_rounds.py
@@ -309,6 +309,16 @@ def test_compact_partition_rebase_multiple_rounds_same_source_and_destination(
         **compaction_audit_obj
     )
 
+    # assert if RCF covers all files
+    # multiple rounds feature is only supported in V2 compactor
+    previous_end = None
+    for start, end in round_completion_info.hb_index_to_entry_range.values():
+        assert (previous_end is None and start == 0) or start == previous_end
+        previous_end = end
+    assert (
+        previous_end == round_completion_info.compacted_pyarrow_write_result.files
+    )
+
     # Assert not in-place compacted
     assert (
         execute_compaction_result_spy.call_args.args[-1] is False
diff --git a/deltacat/tests/compute/test_compact_partition_rebase.py b/deltacat/tests/compute/test_compact_partition_rebase.py
index a1b4088c..7b2004f0 100644
--- a/deltacat/tests/compute/test_compact_partition_rebase.py
+++ b/deltacat/tests/compute/test_compact_partition_rebase.py
@@ -299,6 +299,17 @@ def test_compact_partition_rebase_same_source_and_destination(
         round_completion_info.compaction_audit_url
     )
 
+    # assert if RCF covers all files
+    if compactor_version != CompactorVersion.V1.value:
+        previous_end = None
+        for start, end in round_completion_info.hb_index_to_entry_range.values():
+            assert (previous_end is None and start == 0) or start == previous_end
+            previous_end = end
+        assert (
+            previous_end
+            == round_completion_info.compacted_pyarrow_write_result.files
+        )
+
     compaction_audit_obj: Dict[str, Any] = read_s3_contents(
         s3_resource, audit_bucket, audit_key
     )
diff --git a/deltacat/tests/compute/test_compact_partition_rebase_then_incremental.py b/deltacat/tests/compute/test_compact_partition_rebase_then_incremental.py
index 1dbbe384..ac6f1d00 100644
--- a/deltacat/tests/compute/test_compact_partition_rebase_then_incremental.py
+++ b/deltacat/tests/compute/test_compact_partition_rebase_then_incremental.py
@@ -355,6 +355,16 @@ def test_compact_partition_rebase_then_incremental(
     compacted_delta_locator_incremental: DeltaLocator = (
         round_completion_info.compacted_delta_locator
     )
+    # assert if RCF covers all files
+    if compactor_version != CompactorVersion.V1.value:
+        previous_end = None
+        for start, end in round_completion_info.hb_index_to_entry_range.values():
+            assert (previous_end is None and start == 0) or start == previous_end
+            previous_end = end
+        assert (
+            previous_end == round_completion_info.compacted_pyarrow_write_result.files
+        )
+
     audit_bucket, audit_key = round_completion_info.compaction_audit_url.replace(
         "s3://", ""
     ).split("/", 1)
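
Reviewer note (not part of the patch): as the diff shows, `_process_merge_results`
previously rebuilt the `(start, end)` entry-index tuple for a hash-bucket task index
from the current `file_index` on every round, so a later round overwrote the start
index recorded by an earlier round and the RCF no longer covered the earlier files.
The `dict.get(..., [file_index])[0]` lookup keeps the first round's start index while
extending the end index. The sketch below is a minimal, self-contained illustration
of that merge logic and of the contiguity invariant the new test assertions check;
the helper names and the (task_index, files_written) records are hypothetical and
used only for illustration.

# Minimal sketch; helper names and values are hypothetical, not taken from deltacat.

def build_entry_index_ranges(mat_results):
    """Map hash-bucket task index -> (start_file_index, end_file_index)."""
    hb_id_to_entry_indices_range = {}
    file_index = 0
    for task_index, files_written in mat_results:
        # Keep the start index from the first round seen for this task index
        # (mirrors the dict.get(...)[0] fix); before the fix the start was
        # reset to the current file_index on every round.
        start = hb_id_to_entry_indices_range.get(str(task_index), (file_index,))[0]
        hb_id_to_entry_indices_range[str(task_index)] = (
            start,
            file_index + files_written,
        )
        file_index += files_written
    return hb_id_to_entry_indices_range


def assert_rcf_covers_all_files(entry_ranges, total_files):
    """Same invariant the new test assertions check: ranges start at 0,
    are contiguous, and end at the total number of compacted files."""
    previous_end = None
    for start, end in entry_ranges.values():
        assert (previous_end is None and start == 0) or start == previous_end
        previous_end = end
    assert previous_end == total_files


if __name__ == "__main__":
    # Task index 0 materializes in two rounds (2 then 3 files), task index 1 in one round (4 files).
    ranges = build_entry_index_ranges([(0, 2), (0, 3), (1, 4)])
    assert ranges == {"0": (0, 5), "1": (5, 9)}
    assert_rcf_covers_all_files(ranges, total_files=9)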