Merge pull request #132 from ray-project/repartition
Support Repartition to split and organize the data into multiple groups
valiantljk authored and Zyiqin-Miranda committed Jun 13, 2023
2 parents 141bb45 + fb6fd7d commit c633130
Showing 7 changed files with 476 additions and 21 deletions.
19 changes: 19 additions & 0 deletions deltacat/compute/compactor/compaction_session.py
@@ -445,6 +445,25 @@ def _execute_compaction_round(
)
logger.info(f"Getting {len(mat_tasks_pending)} materialize result(s)...")
mat_results = ray.get(mat_tasks_pending)
total_count_of_src_dfl_not_touched_list = [
m.count_of_src_dfl_not_touched for m in mat_results
]
total_length_src_dfl_list = [m.count_of_src_dfl for m in mat_results]
total_count_of_src_dfl_not_touched = sum(total_count_of_src_dfl_not_touched_list)
total_length_src_dfl = sum(total_length_src_dfl_list)
logger.info(
f"Got total of {total_count_of_src_dfl_not_touched} manifest files not touched."
)
logger.info(
f"Got total of {total_length_src_dfl} manifest files during compaction."
)
untouched_manifest_file_percentage = (
round(total_count_of_src_dfl_not_touched / total_length_src_dfl, 3) * 100
)
logger.info(
f"{untouched_manifest_file_percentage} percent of manifest files are not touched during materialize."
)

logger.info(f"Got {len(mat_results)} materialize result(s).")

mat_results = sorted(mat_results, key=lambda m: m.task_index)
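A small worked example of the untouched-manifest-file percentage computed above, with a guard against an empty result set that the committed code assumes is unnecessary; all counts are placeholders:

count_not_touched = 120  # hypothetical sum of count_of_src_dfl_not_touched
count_total = 400        # hypothetical sum of count_of_src_dfl
if count_total > 0:      # guard: the committed code assumes at least one source manifest file
    pct = round(count_not_touched / count_total, 3) * 100
    print(f"{pct} percent of manifest files are not touched")  # 30.0 percent ...
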
18 changes: 16 additions & 2 deletions deltacat/compute/compactor/model/materialize_result.py
@@ -1,7 +1,7 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations

from typing import Any, Dict
from typing import Any, Dict, Optional

from deltacat.compute.compactor.model.pyarrow_write_result import PyArrowWriteResult
from deltacat.storage import Delta
@@ -10,12 +10,18 @@
class MaterializeResult(dict):
@staticmethod
def of(
delta: Delta, task_index: int, pyarrow_write_result: PyArrowWriteResult
delta: Delta,
task_index: int,
pyarrow_write_result: PyArrowWriteResult,
count_of_src_dfl_not_touched: Optional[int] = 0,
count_of_src_dfl: Optional[int] = 0,
) -> MaterializeResult:
materialize_result = MaterializeResult()
materialize_result["delta"] = delta
materialize_result["taskIndex"] = task_index
materialize_result["paWriteResult"] = pyarrow_write_result
materialize_result["countOfSrcFileNotTouched"] = count_of_src_dfl_not_touched
materialize_result["countOfSrcFile"] = count_of_src_dfl
return materialize_result

@property
@@ -35,3 +41,11 @@ def pyarrow_write_result(self) -> PyArrowWriteResult:
if val is not None and not isinstance(val, PyArrowWriteResult):
self["paWriteResult"] = val = PyArrowWriteResult(val)
return val

@property
def count_of_src_dfl_not_touched(self) -> int:
return self["countOfSrcFileNotTouched"]

@property
def count_of_src_dfl(self) -> int:
return self["countOfSrcFile"]
6 changes: 6 additions & 0 deletions deltacat/compute/compactor/model/repartition_result.py
@@ -0,0 +1,6 @@
from typing import NamedTuple, List
from deltacat.storage import Delta


class RepartitionResult(NamedTuple):
range_deltas: List[Delta]
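
RepartitionResult simply carries the per-range deltas produced by one repartition task. A toy sketch of how several tasks' results are later interleaved (the same zip_longest pattern used in repartition_session.py below), with strings standing in for real Delta objects:

import itertools
from deltacat.compute.compactor.model.repartition_result import RepartitionResult

# Strings stand in for Delta objects purely for illustration.
task_results = [
    RepartitionResult(range_deltas=["task0_cold", "task0_hot"]),
    RepartitionResult(range_deltas=["task1_cold", "task1_hot"]),
]
per_task = [r.range_deltas for r in task_results]
transposed = itertools.zip_longest(*per_task, fillvalue=None)
ordered = [d for group in transposed for d in group if d is not None]
# ordered == ["task0_cold", "task1_cold", "task0_hot", "task1_hot"]
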
178 changes: 178 additions & 0 deletions deltacat/compute/compactor/repartition_session.py
@@ -0,0 +1,178 @@
import ray
import time
import logging
from deltacat import logs
from deltacat.utils.common import ReadKwargsProvider
import functools
import itertools
from deltacat.compute.compactor import (
RoundCompletionInfo,
SortKey,
)
from deltacat.types.media import ContentType
from deltacat.compute.compactor import DeltaAnnotated
from deltacat.utils.ray_utils.concurrency import (
invoke_parallel,
round_robin_options_provider,
)

from deltacat.compute.compactor.model.repartition_result import RepartitionResult
from deltacat.utils.placement import PlacementGroupConfig
from typing import List, Optional, Dict, Any
from deltacat.utils.ray_utils.runtime import live_node_resource_keys
from deltacat.compute.compactor.utils import io
from deltacat.compute.compactor.utils import round_completion_file as rcf
from deltacat.compute.compactor.steps import repartition as repar
from deltacat.compute.compactor.steps.repartition import RepartitionType
from deltacat.storage import (
Delta,
DeltaLocator,
PartitionLocator,
interface as unimplemented_deltacat_storage,
)
from deltacat.utils.metrics import MetricsConfig

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


# TODO:(rootliu) move this repartition function to a separate module under compute
def repartition(
source_partition_locator: PartitionLocator,
destination_partition_locator: PartitionLocator,
repartition_args: Any,
compaction_artifact_s3_bucket: str,
last_stream_position_to_compact: int,
repartition_type: RepartitionType = RepartitionType.RANGE,
rebase_source_partition_locator: Optional[PartitionLocator] = None,
rebase_source_partition_high_watermark: Optional[int] = None,
sort_keys: List[SortKey] = None,
records_per_repartitioned_file: int = 4_000_000,
min_file_count: int = 1000,
min_delta_bytes: int = 200 * 2**20,
repartitioned_file_content_type: ContentType = ContentType.PARQUET,
enable_profiler: bool = False,
metrics_config: Optional[MetricsConfig] = None,
pg_config: Optional[PlacementGroupConfig] = None,
list_deltas_kwargs: Optional[Dict[str, Any]] = None,
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
deltacat_storage=unimplemented_deltacat_storage,
**kwargs,
) -> Optional[str]:

node_resource_keys = None
if pg_config: # use resource in each placement group
cluster_resources = pg_config.resource
cluster_cpus = cluster_resources["CPU"]
else: # use all cluster resource
cluster_resources = ray.cluster_resources()
logger.info(f"Total cluster resources: {cluster_resources}")
logger.info(f"Available cluster resources: {ray.available_resources()}")
cluster_cpus = int(cluster_resources["CPU"])
logger.info(f"Total cluster CPUs: {cluster_cpus}")
node_resource_keys = live_node_resource_keys()
logger.info(
f"Found {len(node_resource_keys)} live cluster nodes: "
f"{node_resource_keys}"
)

# create a remote options provider to round-robin tasks across all nodes or allocated bundles
logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}")
round_robin_opt_provider = functools.partial(
round_robin_options_provider,
resource_keys=node_resource_keys,
pg_config=pg_config.opts if pg_config else None,
)

(deltas, _,) = io.discover_deltas(
source_partition_locator,
None,
last_stream_position_to_compact,
destination_partition_locator,
rebase_source_partition_locator,
rebase_source_partition_high_watermark,
deltacat_storage,
**list_deltas_kwargs,
)

uniform_deltas = []
for delta in deltas:
uniform_deltas_part = DeltaAnnotated.rebatch(
[DeltaAnnotated.of(delta)],
min_delta_bytes=min_delta_bytes,
min_file_counts=min_file_count,
)
uniform_deltas.extend(uniform_deltas_part)

logger.info(f"Retrieved a total of {len(uniform_deltas)} uniform deltas.")

max_parallelism = cluster_cpus
# create a new stream for this round
compacted_stream_locator = destination_partition_locator.stream_locator
stream = deltacat_storage.get_stream(
compacted_stream_locator.namespace,
compacted_stream_locator.table_name,
compacted_stream_locator.table_version,
)
partition = deltacat_storage.stage_partition(
stream,
destination_partition_locator.partition_values,
)
new_compacted_partition_locator = partition.locator
repar_start = time.time()
repar_tasks_pending = invoke_parallel(
items=uniform_deltas,
ray_task=repar.repartition,
max_parallelism=max_parallelism,
options_provider=round_robin_opt_provider,
repartition_type=repartition_type,
repartition_args=repartition_args,
max_records_per_output_file=records_per_repartitioned_file,
destination_partition=partition,
enable_profiler=enable_profiler,
metrics_config=metrics_config,
read_kwargs_provider=read_kwargs_provider,
repartitioned_file_content_type=repartitioned_file_content_type,
deltacat_storage=deltacat_storage,
)
logger.info(f"Getting {len(repar_tasks_pending)} task results...")
repar_results: List[RepartitionResult] = ray.get(repar_tasks_pending)
repar_results: List[Delta] = [rp.range_deltas for rp in repar_results]
transposed = list(itertools.zip_longest(*repar_results, fillvalue=None))
ordered_deltas: List[Delta] = [
i for sublist in transposed for i in sublist if i is not None
]
repar_end = time.time()
logger.info(f"repartition {repar_end - repar_start} seconds")
logger.info(f"Got {len(ordered_deltas)} task results.")
# ordered_deltas are ordered as [cold1, cold2, coldN, hot1, hot2, hotN]
merged_delta = Delta.merge_deltas(ordered_deltas)
compacted_delta = deltacat_storage.commit_delta(
merged_delta, properties=kwargs.get("properties", {})
)
deltacat_storage.commit_partition(partition)
logger.info(f"Committed final delta: {compacted_delta}")
logger.info(f"Job run completed successfully!")
new_compacted_delta_locator = DeltaLocator.of(
new_compacted_partition_locator,
compacted_delta.stream_position,
)
bit_width_of_sort_keys = SortKey.validate_sort_keys(
source_partition_locator,
sort_keys,
deltacat_storage,
)
new_round_completion_info = RoundCompletionInfo.of(
last_stream_position_to_compact,
new_compacted_delta_locator,
None,
bit_width_of_sort_keys,
None,
)
rcf_source_partition_locator = source_partition_locator
round_completion_file_s3_url = None
round_completion_file_s3_url = rcf.write_round_completion_file(
compaction_artifact_s3_bucket,
rcf_source_partition_locator,
new_round_completion_info,
)
return round_completion_file_s3_url
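
To show how the new session might be driven end to end, a hedged invocation sketch; the locators, bucket name, stream position, storage module, and the shape of repartition_args are all assumptions for illustration, not part of this commit:

from deltacat.compute.compactor.repartition_session import repartition
from deltacat.compute.compactor.steps.repartition import RepartitionType

# All argument values below are placeholders.
round_completion_file_s3_url = repartition(
    source_partition_locator=source_locator,            # an existing PartitionLocator
    destination_partition_locator=destination_locator,  # an existing PartitionLocator
    repartition_args={"column": "event_time", "ranges": [1000, 2000]},  # assumed shape
    compaction_artifact_s3_bucket="my-compaction-artifacts",
    last_stream_position_to_compact=1_686_000_000,
    repartition_type=RepartitionType.RANGE,
    deltacat_storage=my_storage_module,  # module implementing the deltacat storage interface
)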