Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[skip untouched files] Enable skipping untouched files during materialize #137

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion deltacat/compute/compactor/compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,25 @@ def _execute_compaction_round(
)
logger.info(f"Getting {len(mat_tasks_pending)} materialize result(s)...")
mat_results = ray.get(mat_tasks_pending)
total_count_of_src_dfl_not_touched = sum(
m.count_of_src_dfl_not_touched for m in mat_results
)
total_length_src_dfl = sum(m.count_of_src_dfl for m in mat_results)
logger.info(
f"Got total of {total_count_of_src_dfl_not_touched} manifest files not touched."
)
logger.info(
f"Got total of {total_length_src_dfl} manifest files during compaction."
)
manifest_entry_copied_by_reference_ratio = (
(round(total_count_of_src_dfl_not_touched / total_length_src_dfl, 4) * 100)
if total_length_src_dfl != 0
else None
)
logger.info(
f"{manifest_entry_copied_by_reference_ratio} percent of manifest files are copied by reference during materialize."
)

logger.info(f"Got {len(mat_results)} materialize result(s).")

mat_results = sorted(mat_results, key=lambda m: m.task_index)
Expand Down Expand Up @@ -491,13 +510,13 @@ def _execute_compaction_round(
PyArrowWriteResult.union([m.pyarrow_write_result for m in mat_results]),
bit_width_of_sort_keys,
last_rebase_source_partition_locator,
manifest_entry_copied_by_reference_ratio,
)
rcf_source_partition_locator = (
rebase_source_partition_locator
if rebase_source_partition_locator
else source_partition_locator
)

logger.info(
f"partition-{source_partition_locator.partition_values},"
f"compacted at: {last_stream_position_compacted},"
Expand Down
18 changes: 16 additions & 2 deletions deltacat/compute/compactor/model/materialize_result.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Allow classes to use self-referencing Type hints in Python 3.7.
from __future__ import annotations

from typing import Any, Dict
from typing import Any, Dict, Optional

from deltacat.compute.compactor.model.pyarrow_write_result import PyArrowWriteResult
from deltacat.storage import Delta
Expand All @@ -10,12 +10,18 @@
class MaterializeResult(dict):
@staticmethod
def of(
delta: Delta, task_index: int, pyarrow_write_result: PyArrowWriteResult
delta: Delta,
task_index: int,
pyarrow_write_result: PyArrowWriteResult,
count_of_src_dfl_not_touched: Optional[int] = 0,
count_of_src_dfl: Optional[int] = 0,
) -> MaterializeResult:
materialize_result = MaterializeResult()
materialize_result["delta"] = delta
materialize_result["taskIndex"] = task_index
materialize_result["paWriteResult"] = pyarrow_write_result
materialize_result["countOfSrcFileNotTouched"] = count_of_src_dfl_not_touched
materialize_result["countOfSrcFile"] = count_of_src_dfl
return materialize_result

@property
Expand All @@ -35,3 +41,11 @@ def pyarrow_write_result(self) -> PyArrowWriteResult:
if val is not None and not isinstance(val, PyArrowWriteResult):
self["paWriteResult"] = val = PyArrowWriteResult(val)
return val

@property
def count_of_src_dfl_not_touched(self) -> int:
    """Return the value stored under the ``"countOfSrcFileNotTouched"`` key.

    Raises:
        KeyError: If the key is not present in this result.
    """
    untouched_count = self["countOfSrcFileNotTouched"]
    return untouched_count

@property
def count_of_src_dfl(self) -> int:
    """Return the value stored under the ``"countOfSrcFile"`` key.

    Raises:
        KeyError: If the key is not present in this result.
    """
    source_file_count = self["countOfSrcFile"]
    return source_file_count
8 changes: 8 additions & 0 deletions deltacat/compute/compactor/model/round_completion_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def of(
compacted_pyarrow_write_result: PyArrowWriteResult,
sort_keys_bit_width: int,
rebase_source_partition_locator: Optional[PartitionLocator],
manifest_entry_copied_by_reference_ratio: Optional[float] = None,
) -> RoundCompletionInfo:

rci = RoundCompletionInfo()
Expand All @@ -46,6 +47,9 @@ def of(
rci["compactedPyarrowWriteResult"] = compacted_pyarrow_write_result
rci["sortKeysBitWidth"] = sort_keys_bit_width
rci["rebaseSourcePartitionLocator"] = rebase_source_partition_locator
rci[
"manifestEntryCopiedByReferenceRatio"
] = manifest_entry_copied_by_reference_ratio
return rci

@property
Expand Down Expand Up @@ -80,3 +84,7 @@ def sort_keys_bit_width(self) -> int:
@property
def rebase_source_partition_locator(self) -> Optional[PartitionLocator]:
    """Return the ``"rebaseSourcePartitionLocator"`` entry, or ``None`` if unset."""
    locator = self.get("rebaseSourcePartitionLocator")
    return locator

@property
def manifest_entry_copied_by_reference_ratio(self) -> Optional[float]:
    """Return the ``"manifestEntryCopiedByReferenceRatio"`` entry, if any.

    Returns:
        The stored value, or ``None`` when the key is absent or was stored
        as ``None``.
    """
    # Use .get() rather than indexing: the key is optional (the matching
    # ``of`` parameter defaults to None), and a dict built without it
    # must yield None instead of raising KeyError.
    return self.get("manifestEntryCopiedByReferenceRatio")
143 changes: 116 additions & 27 deletions deltacat/compute/compactor/steps/materialize.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import importlib
import logging
import time
from uuid import uuid4
from collections import defaultdict
from contextlib import nullcontext
from itertools import chain, repeat
from typing import List, Optional, Tuple, Dict, Any
from typing import List, Optional, Tuple, Dict, Any, Union
import pyarrow as pa
import ray
from ray import cloudpickle
Expand All @@ -18,7 +19,18 @@
DedupeTaskIndexWithObjectId,
DeltaFileLocatorToRecords,
)
from deltacat.storage import Delta, DeltaLocator, Partition, PartitionLocator
from deltacat.storage import (
Delta,
DeltaLocator,
DeltaType,
Partition,
PartitionLocator,
Manifest,
ManifestEntry,
LocalDataset,
LocalTable,
DistributedDataset,
)
from deltacat.storage import interface as unimplemented_deltacat_storage
from deltacat.utils.common import ReadKwargsProvider
from deltacat.types.media import DELIMITED_TEXT_CONTENT_TYPES, ContentType
Expand Down Expand Up @@ -56,12 +68,44 @@ def materialize(
read_kwargs_provider: Optional[ReadKwargsProvider] = None,
s3_table_writer_kwargs: Optional[Dict[str, Any]] = None,
deltacat_storage=unimplemented_deltacat_storage,
) -> MaterializeResult:
):
def _stage_delta_implementation(
    data: Union[LocalTable, LocalDataset, DistributedDataset, Manifest],
    partition: Partition,
    stage_delta_from_existing_manifest: Optional[bool],
    delta_type: DeltaType = DeltaType.UPSERT,
) -> Delta:
    """Build a Delta for ``partition`` that wraps an already-built manifest.

    Args:
        data: The payload to stage. Only the manifest path is implemented,
            so in practice this must be a ``Manifest``.
        partition: Partition whose locator and stream position are used
            for the new delta.
        stage_delta_from_existing_manifest: Must be truthy; selects the
            "reuse existing manifest" path.
        delta_type: Delta type recorded on the new delta. Defaults to
            ``DeltaType.UPSERT``.

    Returns:
        A ``Delta`` whose manifest is ``data`` and whose metadata is
        ``data.meta``.

    Raises:
        NotImplementedError: If ``stage_delta_from_existing_manifest`` is
            falsy; no other staging path exists in this helper.
    """
    if not stage_delta_from_existing_manifest:
        # Previously this path left ``delta`` unbound; fail with a clear
        # message instead of an unrelated error or a silent None.
        raise NotImplementedError(
            "Staging a delta is only supported from an existing manifest."
        )
    return Delta.of(
        locator=DeltaLocator.of(partition.locator),
        delta_type=delta_type,
        # Use the metadata of the manifest being staged. The earlier code
        # read a free variable named ``manifest`` here, not this argument.
        meta=data.meta,
        manifest=data,
        previous_stream_position=partition.stream_position,
        properties={},
    )


def _stage_delta_from_manifest_entry_reference_list(
    manifest_entry_list_reference: List[ManifestEntry],
    partition: Partition,
    delta_type: DeltaType = DeltaType.UPSERT,
) -> Delta:
    """Stage a delta that references existing manifest entries as-is.

    Args:
        manifest_entry_list_reference: Manifest entries to place in a new
            manifest without rewriting them.
        partition: Partition the resulting delta is staged against.
        delta_type: Must be ``DeltaType.UPSERT``.

    Returns:
        A ``Delta`` wrapping a new manifest (with a fresh UUID) built from
        the given entries.

    Raises:
        AssertionError: If ``delta_type`` is not ``DeltaType.UPSERT``.
    """
    assert (
        delta_type == DeltaType.UPSERT
    ), "Stage delta with existing manifest entries only supports UPSERT delta type!"
    manifest = Manifest.of(entries=manifest_entry_list_reference, uuid=str(uuid4()))
    # ``delta_type`` is now an accepted keyword of the helper; before, this
    # call raised TypeError for an unexpected keyword argument.
    return _stage_delta_implementation(
        data=manifest,
        partition=partition,
        stage_delta_from_existing_manifest=True,
        delta_type=delta_type,
    )

# TODO (rkenmi): Add docstrings for the steps in the compaction workflow
# https://github.com/ray-project/deltacat/issues/79
def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult:
compacted_table = pa.concat_tables(compacted_tables)

if compacted_file_content_type in DELIMITED_TEXT_CONTENT_TYPES:
# TODO (rkenmi): Investigate if we still need to convert this table to pandas DataFrame
# TODO (pdames): compare performance to pandas-native materialize path
Expand Down Expand Up @@ -92,11 +136,11 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult:
f"({len(compacted_table)})",
)
materialize_result = MaterializeResult.of(
delta,
mat_bucket_index,
delta=delta,
task_index=mat_bucket_index,
# TODO (pdames): Generalize WriteResult to contain in-memory-table-type
# and in-memory-table-bytes instead of tight coupling to paBytes
PyArrowWriteResult.of(
pyarrow_write_result=PyArrowWriteResult.of(
len(manifest.entries),
TABLE_CLASS_TO_SIZE_FUNC[type(compacted_table)](compacted_table),
manifest.meta.content_length,
Expand Down Expand Up @@ -138,6 +182,9 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult:
manifest_cache = {}
materialized_results: List[MaterializeResult] = []
record_batch_tables = RecordBatchTables(max_records_per_output_file)
count_of_src_dfl = 0
manifest_entry_list_reference = []
referenced_pyarrow_write_results = []
for src_dfl in sorted(all_src_file_records.keys()):
record_numbers_dd_task_idx_tpl_list: List[
Tuple[DeltaFileLocatorToRecords, repeat]
Expand All @@ -148,11 +195,13 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult:
is_src_partition_file_np = src_dfl.is_source_delta
src_stream_position_np = src_dfl.stream_position
src_file_idx_np = src_dfl.file_index
count_of_src_dfl += 1
src_file_partition_locator = (
source_partition_locator
if is_src_partition_file_np
else round_completion_info.compacted_delta_locator.partition_locator
)

delta_locator = DeltaLocator.of(
src_file_partition_locator,
src_stream_position_np.item(),
Expand Down Expand Up @@ -185,39 +234,79 @@ def _materialize(compacted_tables: List[pa.Table]) -> MaterializeResult:
f" to download delta locator {delta_locator} with entry ID {src_file_idx_np.item()}"
f" is: {download_delta_manifest_entry_time}s"
)
mask_pylist = list(repeat(False, len(pa_table)))
record_numbers = chain.from_iterable(record_numbers_tpl)
# TODO(raghumdani): reference the same file URIs while writing the files
# instead of copying the data over and creating new files.
record_numbers_length = 0
mask_pylist = list(repeat(False, len(pa_table)))
for record_number in record_numbers:
record_numbers_length += 1
mask_pylist[record_number] = True
mask = pa.array(mask_pylist)
pa_table = pa_table.filter(mask)
record_batch_tables.append(pa_table)
if record_batch_tables.has_batches():
batched_tables = record_batch_tables.evict()
materialized_results.append(_materialize(batched_tables))
if (
record_numbers_length == len(pa_table)
and src_file_partition_locator
== round_completion_info.compacted_delta_locator.partition_locator
):
logger.debug(
f"Untouched manifest file found, "
f"record numbers length: {record_numbers_length} "
f"same as downloaded table length: {len(pa_table)}"
)
untouched_src_manifest_entry = manifest.entries[src_file_idx_np.item()]
manifest_entry_list_reference.append(untouched_src_manifest_entry)
referenced_pyarrow_write_result = PyArrowWriteResult.of(
len(untouched_src_manifest_entry.entries),
TABLE_CLASS_TO_SIZE_FUNC[type(pa_table)](pa_table),
manifest.meta.content_length,
len(pa_table),
)
referenced_pyarrow_write_results.append(referenced_pyarrow_write_result)
else:
mask = pa.array(mask_pylist)
pa_table = pa_table.filter(mask)
record_batch_tables.append(pa_table)
if record_batch_tables.has_batches():
batched_tables = record_batch_tables.evict()
materialized_results.append(_materialize(batched_tables))

if record_batch_tables.has_remaining():
materialized_results.append(_materialize(record_batch_tables.remaining))

merged_delta = Delta.merge_deltas([mr.delta for mr in materialized_results])
assert (
materialized_results and len(materialized_results) > 0
), f"Expected at least one materialized result in materialize step."
logger.info(f"Got {count_of_src_dfl} source delta files during materialize")

referenced_manifest_delta = (
_stage_delta_from_manifest_entry_reference_list(
manifest_entry_list_reference
)
if manifest_entry_list_reference
else None
)
if referenced_manifest_delta:
logger.info(
f"Got delta with {len(referenced_manifest_delta.manifest.entries)} referenced manifest entries"
)

merged_materialized_delta = [mr.delta for mr in materialized_results]
merged_materialized_delta.append(referenced_manifest_delta)
merged_delta = Delta.merge_deltas(
[d for d in merged_materialized_delta if d is not None]
)

write_results_union = referenced_pyarrow_write_results
if materialized_results:
for mr in materialized_results:
write_results_union.append(mr.pyarrow_write_result)
write_result = PyArrowWriteResult.union(write_results_union)

write_results = [mr.pyarrow_write_result for mr in materialized_results]
logger.debug(
f"{len(write_results)} files written"
f" with records: {[wr.records for wr in write_results]}"
f"{len(write_results_union)} files written"
f" with records: {[wr.records for wr in write_results_union]}"
)
# Merge all new deltas into one for this materialize bucket index
merged_materialize_result = MaterializeResult.of(
merged_delta,
materialized_results[0].task_index,
PyArrowWriteResult.union(
[mr.pyarrow_write_result for mr in materialized_results]
),
mat_bucket_index,
write_result,
len(manifest_entry_list_reference),
count_of_src_dfl,
)

logger.info(f"Finished materialize task...")
Expand Down
2 changes: 1 addition & 1 deletion deltacat/storage/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def get_partition(


def stage_delta(
data: Union[LocalTable, LocalDataset, DistributedDataset],
data: Union[LocalTable, LocalDataset, DistributedDataset, Manifest],
partition: Partition,
delta_type: DeltaType = DeltaType.UPSERT,
max_records_per_entry: Optional[int] = None,
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.