Modify in-place compacted check to include rebase source partition locator #318

Merged · 10 commits · Jun 11, 2024
13 changes: 8 additions & 5 deletions deltacat/compute/compactor_v2/compaction_session.py
@@ -123,12 +123,12 @@ def compact_partition(params: CompactPartitionParams, **kwargs) -> Optional[str]
         f"Committing compacted partition to: {execute_compaction_result.new_compacted_partition.locator} "
         f"using previous partition: {previous_partition.locator if previous_partition else None}"
     )
-    commited_partition: Partition = params.deltacat_storage.commit_partition(
+    committed_partition: Partition = params.deltacat_storage.commit_partition(
         execute_compaction_result.new_compacted_partition,
         previous_partition,
         **params.deltacat_storage_kwargs,
     )
-    logger.info(f"Committed compacted partition: {commited_partition}")
+    logger.info(f"Committed compacted partition: {committed_partition}")
     round_completion_file_s3_url = rcf.write_round_completion_file(
         params.compaction_artifact_s3_bucket,
         execute_compaction_result.new_round_completion_file_partition_locator,
@@ -662,13 +662,16 @@ def merge_input_provider(index, item):
     )

     logger.info(
-        f"partition-{params.source_partition_locator.partition_values},"
+        f"Partition-{params.source_partition_locator.partition_values},"
         f"compacted at: {params.last_stream_position_to_compact},"
     )
+    logger.info(
+        f"Checking if partition {rcf_source_partition_locator} is inplace compacted against {params.destination_partition_locator}..."
+    )
     is_inplace_compacted: bool = (
-        params.source_partition_locator.partition_values
+        rcf_source_partition_locator.partition_values
         == params.destination_partition_locator.partition_values
-        and params.source_partition_locator.stream_id
+        and rcf_source_partition_locator.stream_id
         == params.destination_partition_locator.stream_id
     )
     if is_inplace_compacted:
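
The substantive change is in the hunk above: the in-place compaction check now compares the round completion file (RCF) source partition locator, rather than the session's own source partition locator, against the destination. A minimal standalone sketch of the new comparison, with a hypothetical PartitionLocator stand-in (the field names mirror the diff; everything else here is illustrative, not deltacat's actual API):

from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class PartitionLocator:
    # Hypothetical stand-in for deltacat's PartitionLocator.
    partition_values: Optional[List[str]]
    stream_id: str


def is_inplace_compacted(
    rcf_source: PartitionLocator, destination: PartitionLocator
) -> bool:
    # Mirrors the updated check: the RCF source locator (which, after a
    # rebase, points at the rebase source partition) is compared against
    # the destination locator.
    return (
        rcf_source.partition_values == destination.partition_values
        and rcf_source.stream_id == destination.stream_id
    )


# A partition whose RCF source matches the destination was compacted in place.
src = PartitionLocator(partition_values=["1"], stream_id="stream-a")
dst = PartitionLocator(partition_values=["1"], stream_id="stream-a")
assert is_inplace_compacted(src, dst)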
88 changes: 88 additions & 0 deletions deltacat/tests/compute/compact_partition_rebase_test_cases.py
@@ -0,0 +1,88 @@
import pyarrow as pa
from deltacat.tests.compute.test_util_common import (
    PartitionKey,
    PartitionKeyType,
)
from deltacat.tests.compute.test_util_constant import (
    DEFAULT_MAX_RECORDS_PER_FILE,
    DEFAULT_HASH_BUCKET_COUNT,
)
from dataclasses import dataclass


from deltacat.storage import (
    DeltaType,
)

from deltacat.compute.compactor.model.compactor_version import CompactorVersion

from deltacat.storage.model.sort_key import SortKey

from deltacat.tests.compute.compact_partition_test_cases import (
    BaseCompactorTestCase,
    with_compactor_version_func_test_param,
)

@dataclass(frozen=True)
class RebaseCompactionTestCaseParams(BaseCompactorTestCase):
    """
    A pytest parameterized test case for the `compact_partition` function with rebase compaction.

    Args:
    * (inherited from BaseCompactorTestCase): see the BaseCompactorTestCase docstring for details
    rebase_expected_compact_partition_result: pa.Table - the expected table after rebase compaction runs; asserted against in the rebase unit tests
    """

    rebase_expected_compact_partition_result: pa.Table


REBASE_TEST_CASES = {
    "1-rebase-sanity": RebaseCompactionTestCaseParams(
        primary_keys={"pk_col_1"},
        sort_keys=[
            SortKey.of(key_name="sk_col_1"),
            SortKey.of(key_name="sk_col_2"),
        ],
        partition_keys=[PartitionKey.of("region_id", PartitionKeyType.INT)],
        partition_values=["1"],
        input_deltas=pa.Table.from_arrays(
            [
                pa.array([str(i) for i in range(10)]),
                pa.array([i for i in range(0, 10)]),
                pa.array(["foo"] * 10),
                pa.array([i / 10 for i in range(10, 20)]),
            ],
            names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
        ),
        input_deltas_delta_type=DeltaType.UPSERT,
        rebase_expected_compact_partition_result=pa.Table.from_arrays(
            [
                pa.array([str(i) for i in range(10)]),
                pa.array([i for i in range(0, 10)]),
                pa.array(["foo"] * 10),
                pa.array([i / 10 for i in range(10, 20)]),
            ],
            names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
        ),
        expected_terminal_compact_partition_result=pa.Table.from_arrays(
            [
                pa.array([str(i) for i in range(10)]),
                pa.array([i for i in range(20, 30)]),
                pa.array(["foo"] * 10),
                pa.array([i / 10 for i in range(40, 50)]),
            ],
            names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
        ),
        expected_terminal_exception=None,
        expected_terminal_exception_message=None,
        do_create_placement_group=False,
        records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
        hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
        read_kwargs_provider=None,
        drop_duplicates=True,
        skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
    ),
}

REBASE_TEST_CASES = with_compactor_version_func_test_param(REBASE_TEST_CASES)
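
For context, these cases would typically be consumed through pytest parameterization; a minimal sketch of that pattern, assuming the wrapped mapping remains a name-to-case dict (the test function body is illustrative, not the PR's actual harness):

import pytest

from deltacat.tests.compute.compact_partition_rebase_test_cases import (
    REBASE_TEST_CASES,
)


@pytest.mark.parametrize(
    "test_name,test_case",
    REBASE_TEST_CASES.items(),
    ids=list(REBASE_TEST_CASES.keys()),
)
def test_rebase_case_shape(test_name, test_case):
    # Illustrative only: a real test would stage test_case.input_deltas in
    # local storage, run compact_partition, and compare the result table
    # against test_case.rebase_expected_compact_partition_result.
    assert test_case.primary_keys == {"pk_col_1"}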
@@ -5,7 +5,9 @@
 from unittest.mock import patch
 import deltacat.tests.local_deltacat_storage as ds
 from deltacat.types.media import ContentType
-from deltacat.compute.compactor_v2.compaction_session import compact_partition
+from deltacat.compute.compactor_v2.compaction_session import (
+    compact_partition,
+)
 from deltacat.compute.compactor.model.compact_partition_params import (
     CompactPartitionParams,
 )