Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify in-place compacted check to include rebase source partition locator #318

Merged
merged 10 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions deltacat/compute/compactor_v2/compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,16 @@ def merge_input_provider(index, item):
)

logger.info(
f"partition-{params.source_partition_locator.partition_values},"
f"Partition-{params.source_partition_locator.partition_values},"
f"compacted at: {params.last_stream_position_to_compact},"
)
logger.info(
f"Checking if partition {rcf_source_partition_locator} is inplace compacted against {params.destination_partition_locator}..."
)
is_inplace_compacted: bool = (
params.source_partition_locator.partition_values
rcf_source_partition_locator.partition_values
yankevn marked this conversation as resolved.
Show resolved Hide resolved
== params.destination_partition_locator.partition_values
and params.source_partition_locator.stream_id
and rcf_source_partition_locator.stream_id
== params.destination_partition_locator.stream_id
)
if is_inplace_compacted:
Expand Down
57 changes: 56 additions & 1 deletion deltacat/tests/compute/compactor_v2/test_compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
from unittest.mock import patch
import deltacat.tests.local_deltacat_storage as ds
from deltacat.types.media import ContentType
from deltacat.compute.compactor_v2.compaction_session import compact_partition
from deltacat.compute.compactor_v2.compaction_session import (
compact_partition,
_execute_compaction,
)
from deltacat.compute.compactor.model.compact_partition_params import (
CompactPartitionParams,
)
from deltacat.compute.compactor_v2.model.compaction_session import (
ExecutionCompactionResult,
)
from deltacat.utils.common import current_time_ms
from deltacat.tests.test_utils.pyarrow import stage_partition_from_file_paths

Expand Down Expand Up @@ -86,3 +92,52 @@ def test_compact_partition_when_no_input_deltas_to_compact(self, s3_utils, rcf_u

# verify that no RCF is written
self.assertIsNone(rcf_url)

@patch("deltacat.compute.compactor_v2.compaction_session.rcf")
@patch("deltacat.compute.compactor_v2.compaction_session.s3_utils")
def test_compact_partition_rebase_no_source_partition(self, s3_utils, rcf_url):
# setup
rcf_url.read_round_completion_file.return_value = None
staged_source = stage_partition_from_file_paths(
self.NAMESPACE, ["test"], **self.deltacat_storage_kwargs
)
source_partition = ds.commit_partition(
staged_source, **self.deltacat_storage_kwargs
)

staged_dest = stage_partition_from_file_paths(
self.NAMESPACE, ["destination"], **self.deltacat_storage_kwargs
)
dest_partition = ds.commit_partition(
staged_dest, **self.deltacat_storage_kwargs
)

# action
execute_compaction_result: ExecutionCompactionResult = _execute_compaction(
CompactPartitionParams.of(
{
"compaction_artifact_s3_bucket": "test_bucket",
"compacted_file_content_type": ContentType.PARQUET,
"dd_max_parallelism_ratio": 1.0,
"deltacat_storage": ds,
"deltacat_storage_kwargs": self.deltacat_storage_kwargs,
"destination_partition_locator": dest_partition.locator,
"drop_duplicates": True,
"hash_bucket_count": 1,
"last_stream_position_to_compact": source_partition.stream_position,
"list_deltas_kwargs": {
**self.deltacat_storage_kwargs,
**{"equivalent_table_types": []},
},
"primary_keys": [],
"rebase_source_partition_locator": source_partition.locator,
"rebase_source_partition_high_watermark": None,
"records_per_compacted_file": 4000,
"s3_client_kwargs": {},
"source_partition_locator": source_partition.locator,
}
)
)

# verify that the job was NOT marked as in-place compacted
self.assertFalse(execute_compaction_result.is_inplace_compacted)
Loading