Support table compaction source rebases and refactor compact_partition #70

Merged 3 commits on Feb 10, 2023
111 changes: 111 additions & 0 deletions deltacat/compute/compactor/README.md
@@ -0,0 +1,111 @@
# DeltaCAT Compactor

The DeltaCAT compactor provides a fast, scalable, and efficient
Log-Structured-Merge (LSM) based Change-Data-Capture (CDC) implementation using
Ray. It implements [The Flash Compactor Design](TheFlashCompactorDesign.pdf)
using DeltaCAT's portable delta catalog storage APIs.

## User Guide
### Migrating to the DeltaCAT Compactor
Migration from your old copy-on-write CDC framework to DeltaCAT is typically
done by first running a rebase on top of your old copy-on-write compacted
results.

A _rebase_ allows you to run compaction from source **S1** to destination **D**
on behalf of source **S2**, where **S1** and **S2** can be either the same table
or different tables. More specifically, a rebase:
1. Discards (does not read) any prior compatible round completion info and primary key indices associated with **S1**/**S2** and **D**.
2. Writes a round completion file associated with **S2** and **D** (including a primary key index for **D** and an optional rebased high watermark stream position for **S1**).
3. Saves and propagates the last-used rebase source across all subsequent round completion files.

As part of a rebase from an alternate source or as an independent operation,
you can optionally set a rebase source high watermark stream position that will
be used as the starting stream position (exclusive) for the next compaction
round.

For example, a table rebase makes it easier to transition from an old
copy-on-write compactor to the DeltaCAT compactor by rebasing on top of the old
compactor's results.

If we assume `delta_source` refers to the table that both the old compactor and
the new compactor will read and merge deltas from, then your first call to
`compact_partition` should look something like this:
```python
from deltacat.compute.compactor.compaction_session import compact_partition
compact_partition(
    source_partition_locator=old_compacted_partition,  # S1
    destination_partition_locator=deltacat_compacted_partition,  # D
    primary_keys=delta_source_primary_keys,
    compaction_artifact_s3_bucket=deltacat_s3_bucket,
    last_stream_position_to_compact=delta_source_last_stream_position,
    rebase_source_partition_locator=delta_source_partition,  # S2
    rebase_source_partition_high_watermark=delta_source_last_compacted_stream_position,
)
```

Note that, in the above call, `delta_source_last_compacted_stream_position`
refers to the last stream position compacted into `old_compacted_partition`.

Then, to compact subsequent incremental deltas from `delta_source` on top of
`deltacat_compacted_partition`, you simply set `source_partition_locator` to
the last rebase source:
```python
from deltacat.compute.compactor.compaction_session import compact_partition
compact_partition(
    source_partition_locator=delta_source_partition,  # S2
    destination_partition_locator=deltacat_compacted_partition,  # D
    primary_keys=delta_source_primary_keys,
    compaction_artifact_s3_bucket=deltacat_s3_bucket,
    last_stream_position_to_compact=delta_source_last_stream_position,
)
```

This first incremental call will run a compaction from
`rebase_source_partition_high_watermark` (exclusive) to
`last_stream_position_to_compact` (inclusive) while re-using the round
completion file and primary key index created during the rebase.
All subsequent incremental compactions can be run the same way, and will
continue compacting from the prior invocation's `last_stream_position_to_compact`
(exclusive) up to the new `last_stream_position_to_compact` (inclusive) while
re-using the last compaction round's round completion file and primary key index.
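
For example, a simple periodic driver for these incremental compactions might
look like the sketch below. The `get_latest_stream_position` helper and the
polling interval are hypothetical placeholders for however your pipeline
discovers newly published deltas; only `compact_partition` and its arguments
come from the examples above.
```python
import time

from deltacat.compute.compactor.compaction_session import compact_partition


def get_latest_stream_position(partition_locator) -> int:
    """Hypothetical helper: return the newest delta stream position published
    to the delta source partition (e.g. from your stream metadata service)."""
    raise NotImplementedError


while True:
    # Compact everything published since the prior invocation's
    # last_stream_position_to_compact (tracked in the round completion file).
    compact_partition(
        source_partition_locator=delta_source_partition,  # S2
        destination_partition_locator=deltacat_compacted_partition,  # D
        primary_keys=delta_source_primary_keys,
        compaction_artifact_s3_bucket=deltacat_s3_bucket,
        last_stream_position_to_compact=get_latest_stream_position(
            delta_source_partition),
    )
    time.sleep(15 * 60)  # poll for new deltas every 15 minutes
```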

### Discarding Cached Compaction Results
Another use case for a compaction rebase is to ignore and overwrite any cached
results persisted by prior compaction job runs, perhaps because one or more
cached files were corrupted, or for testing purposes. In this case, simply set
`source_partition_locator` and `rebase_source_partition_locator` to the same
value:
```python
from deltacat.compute.compactor.compaction_session import compact_partition
compact_partition(
    source_partition_locator=source_partition_to_compact,
    destination_partition_locator=deltacat_compacted_partition,
    primary_keys=delta_source_primary_keys,
    compaction_artifact_s3_bucket=deltacat_s3_bucket,
    last_stream_position_to_compact=delta_source_last_stream_position,
    rebase_source_partition_locator=source_partition_to_compact,
)
```

This will ignore any round completion file or primary key index produced by
prior compaction rounds, and force a backfill compaction job to run from the
first delta stream position in `source_partition_locator` up to
`last_stream_position_to_compact` (inclusive).

All subsequent incremental compactions can now run as usual by simply omitting
`rebase_source_partition_locator`:
```python
from deltacat.compute.compactor.compaction_session import compact_partition
compact_partition(
    source_partition_locator=source_partition_to_compact,
    destination_partition_locator=deltacat_compacted_partition,
    primary_keys=delta_source_primary_keys,
    compaction_artifact_s3_bucket=deltacat_s3_bucket,
    last_stream_position_to_compact=delta_source_last_stream_position,
)
```

This will re-use the round completion file and primary key index produced by
the last compaction round, and compact all source partition deltas between
the prior invocation's `last_stream_position_to_compact` (exclusive) and this
invocation's `last_stream_position_to_compact` (inclusive).
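
As of this refactor, `compact_partition` also returns the S3 URL of the round
completion file written by the last compaction round (or `None` if no input
deltas were found to compact). A minimal sketch, re-using the variables from
the examples above:
```python
from deltacat.compute.compactor.compaction_session import compact_partition

round_completion_file_s3_url = compact_partition(
    source_partition_locator=source_partition_to_compact,
    destination_partition_locator=deltacat_compacted_partition,
    primary_keys=delta_source_primary_keys,
    compaction_artifact_s3_bucket=deltacat_s3_bucket,
    last_stream_position_to_compact=delta_source_last_stream_position,
)
if round_completion_file_s3_url is None:
    print("No input deltas found to compact.")
else:
    print(f"Wrote round completion file to: {round_completion_file_s3_url}")
```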
Binary file not shown.
112 changes: 66 additions & 46 deletions deltacat/compute/compactor/compaction_session.py
@@ -39,8 +39,8 @@ def check_preconditions(

assert source_partition_locator.partition_values \
== compacted_partition_locator.partition_values, \
"In-place compaction must use the same partition values for the " \
"source and destination."
"In-place compaction must use the same partition values for the " \
"source and destination."
assert max_records_per_output_file >= 1, \
"Max records per output file must be a positive value"
if new_hash_bucket_count is not None:
@@ -55,10 +55,11 @@

def compact_partition(
source_partition_locator: PartitionLocator,
compacted_partition_locator: PartitionLocator,
destination_partition_locator: PartitionLocator,
primary_keys: Set[str],
compaction_artifact_s3_bucket: str,
last_stream_position_to_compact: int,
*,
hash_bucket_count: Optional[int] = None,
sort_keys: List[SortKey] = None,
records_per_primary_key_index_file: int = 38_000_000,
@@ -68,20 +69,22 @@
min_hash_bucket_chunk_size: int = 0,
compacted_file_content_type: ContentType = ContentType.PARQUET,
delete_prev_primary_key_index: bool = False,
read_round_completion: bool = False,
pg_config: Optional[PlacementGroupConfig] = None,
schema_on_read: Optional[pa.schema] = None, # TODO (ricmiyam): Remove this and retrieve schema from storage API
deltacat_storage=unimplemented_deltacat_storage):
rebase_source_partition_locator: Optional[PartitionLocator] = None,
rebase_source_partition_high_watermark: Optional[int] = None,
deltacat_storage=unimplemented_deltacat_storage) -> Optional[str]:

logger.info(f"Starting compaction session for: {source_partition_locator}")
partition = None
compaction_rounds_executed = 0
has_next_compaction_round = True
new_rcf_s3_url = None
while has_next_compaction_round:
has_next_compaction_round, new_partition, new_rci = \
has_next_compaction_round, new_partition, new_rci, new_rcf_s3_url = \
_execute_compaction_round(
source_partition_locator,
compacted_partition_locator,
destination_partition_locator,
primary_keys,
compaction_artifact_s3_bucket,
last_stream_position_to_compact,
@@ -94,14 +97,15 @@
min_hash_bucket_chunk_size,
compacted_file_content_type,
delete_prev_primary_key_index,
read_round_completion,
pg_config,
schema_on_read,
deltacat_storage=deltacat_storage,
pg_config=pg_config
rebase_source_partition_locator,
rebase_source_partition_high_watermark,
deltacat_storage,
)
if new_partition:
partition = new_partition
compacted_partition_locator = new_partition.locator
destination_partition_locator = new_partition.locator
compaction_rounds_executed += 1
# Take new primary key index sizes into account for subsequent compaction rounds and their dedupe steps
if new_rci:
@@ -114,6 +118,7 @@
partition = deltacat_storage.commit_partition(partition)
logger.info(f"Committed compacted partition: {partition}")
logger.info(f"Completed compaction session for: {source_partition_locator}")
return new_rcf_s3_url


def _execute_compaction_round(
@@ -131,12 +136,16 @@
min_hash_bucket_chunk_size: int,
compacted_file_content_type: ContentType,
delete_prev_primary_key_index: bool,
read_round_completion: bool,
pg_config: Optional[PlacementGroupConfig],
schema_on_read: Optional[pa.schema],
deltacat_storage = unimplemented_deltacat_storage,
pg_config: Optional[PlacementGroupConfig] = None) \
-> Tuple[bool, Optional[Partition], Optional[RoundCompletionInfo]]:

rebase_source_partition_locator: Optional[PartitionLocator],
rebase_source_partition_high_watermark: Optional[int],
deltacat_storage=unimplemented_deltacat_storage) \
-> Tuple[
bool,
Optional[Partition],
Optional[RoundCompletionInfo],
Optional[str]]:

if not primary_keys:
# TODO (pdames): run simple rebatch to reduce all deltas into 1 delta
@@ -166,23 +175,23 @@
cluster_resources = ray.cluster_resources()
logger.info(f"Total cluster resources: {cluster_resources}")
node_resource_keys = None
if pg_config: # use resource in each placement group
if pg_config: # use resource in each placement group
cluster_resources = pg_config.resource
cluster_cpus = cluster_resources['CPU']
else: # use all cluster resource
else: # use all cluster resource
logger.info(f"Available cluster resources: {ray.available_resources()}")
cluster_cpus = int(cluster_resources["CPU"])
logger.info(f"Total cluster CPUs: {cluster_cpus}")
node_resource_keys = live_node_resource_keys()
logger.info(f"Found {len(node_resource_keys)} live cluster nodes: "
f"{node_resource_keys}")
f"{node_resource_keys}")

# create a remote options provider to round-robin tasks across all nodes or allocated bundles
logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}")
round_robin_opt_provider = functools.partial(
round_robin_options_provider,
resource_keys=node_resource_keys,
pg_config = pg_config.opts if pg_config else None
pg_config=pg_config.opts if pg_config else None
)

# assign a distinct index to each node in the cluster
@@ -213,9 +222,10 @@
# read the results from any previously completed compaction round that used
# a compatible primary key index
round_completion_info = None
if read_round_completion:
logger.info(f"Reading round completion file for compatible "
f"primary key index root path {compatible_primary_key_index_root_path}")
if not rebase_source_partition_locator:
logger.info(
f"Reading round completion file for compatible "
f"primary key index root path: {compatible_primary_key_index_root_path}")
round_completion_info = rcf.read_round_completion_file(
compaction_artifact_s3_bucket,
source_partition_locator,
@@ -232,6 +242,11 @@
.primary_key_index_version_meta \
.hash_bucket_count
min_pk_index_pa_bytes = round_completion_info.pk_index_pyarrow_write_result.pyarrow_bytes
else:
logger.info(f"No prior round info read. Source partition: "
f"{source_partition_locator}. Primary key index locator: "
f"{compatible_primary_key_index_locator}. Rebase source "
f"partition locator: {rebase_source_partition_locator}")

# use the new hash bucket count if provided, or fall back to old count
hash_bucket_count = new_hash_bucket_count \
@@ -251,7 +266,7 @@

if not input_deltas:
logger.info("No input deltas found to compact.")
return False, None, None
return False, None, None, None

# limit the input deltas to fit on this cluster and convert them to
# annotated deltas of equivalent size for easy parallel distribution
@@ -268,8 +283,8 @@
)

assert hash_bucket_count is not None and hash_bucket_count > 0, \
f"Unexpected Error: Default hash bucket count ({hash_bucket_count}) " \
f"is invalid."
f"Expected hash bucket count to be a positive integer, but found " \
f"`{hash_bucket_count}`"

# rehash the primary key index if necessary
if round_completion_info:
@@ -278,7 +293,7 @@
# will need to be rehashed if the hash bucket count has changed
if hash_bucket_count != old_hash_bucket_count:
# TODO(draghave): manually test the path after prior primary key
# index was already built
# index was already built
round_completion_info = pki.rehash(
round_robin_opt_provider,
compaction_artifact_s3_bucket,
Expand All @@ -289,10 +304,6 @@ def _execute_compaction_round(
records_per_primary_key_index_file,
delete_prev_primary_key_index,
)
else:
logger.info(f"No prior round completion file found. Source partition: "
f"{source_partition_locator}. Primary key index locator: "
f"{compatible_primary_key_index_locator}")

# parallel step 1:
# group like primary keys together by hashing them into buckets
@@ -315,7 +326,7 @@
for hash_group_index, object_id in enumerate(hash_group_idx_to_obj_id):
if object_id:
all_hash_group_idx_to_obj_id[hash_group_index].append(object_id)
hash_group_count = dedupe_task_count = len(all_hash_group_idx_to_obj_id)
hash_group_count = len(all_hash_group_idx_to_obj_id)
logger.info(f"Hash bucket groups created: {hash_group_count}")

# TODO (pdames): when resources are freed during the last round of hash
@@ -355,7 +366,6 @@
new_pki_version_locator = PrimaryKeyIndexVersionLocator.generate(
new_primary_key_index_version_meta)


# parallel step 2:
# discover records with duplicate primary keys in each hash bucket, and
# identify the index of records to keep or drop based on sort keys
@@ -408,9 +418,9 @@
ray_task=mat.materialize,
max_parallelism=max_parallelism,
options_provider=round_robin_opt_provider,
kwargs_provider=lambda index, mat_bucket_idx_to_obj_id: {
"mat_bucket_index": mat_bucket_idx_to_obj_id[0],
"dedupe_task_idx_and_obj_id_tuples": mat_bucket_idx_to_obj_id[1],
kwargs_provider=lambda index, mat_bucket_index_to_obj_id: {
"mat_bucket_index": mat_bucket_index_to_obj_id[0],
"dedupe_task_idx_and_obj_id_tuples": mat_bucket_index_to_obj_id[1],
},
schema=schema_on_read,
round_completion_info=round_completion_info,
@@ -435,24 +445,34 @@
compacted_delta.stream_position,
)

round_completion_info = RoundCompletionInfo.of(
last_stream_position_compacted,
rci_high_watermark = rebase_source_partition_high_watermark \
if rebase_source_partition_high_watermark \
else last_stream_position_compacted
new_round_completion_info = RoundCompletionInfo.of(
rci_high_watermark,
new_compacted_delta_locator,
PyArrowWriteResult.union([m.pyarrow_write_result
for m in mat_results]),
PyArrowWriteResult.union([m.pyarrow_write_result for m in mat_results]),
PyArrowWriteResult.union(pki_stats),
bit_width_of_sort_keys,
new_pki_version_locator,
rebase_source_partition_locator
or round_completion_info.rebase_source_partition_locator,
)
rcf.write_round_completion_file(
rcf_source_partition_locator = rebase_source_partition_locator \
if rebase_source_partition_locator \
else source_partition_locator
round_completion_file_s3_url = rcf.write_round_completion_file(
compaction_artifact_s3_bucket,
source_partition_locator,
rcf_source_partition_locator,
new_primary_key_index_root_path,
round_completion_info,
new_round_completion_info,
)
logger.info(f"partition-{source_partition_locator.partition_values},compacted at:{last_stream_position_compacted}, last position:{last_stream_position_to_compact}")
logger.info(
f"partition-{source_partition_locator.partition_values},"
f"compacted at: {last_stream_position_compacted},"
f"last position: {last_stream_position_to_compact}")
return \
(last_stream_position_compacted < last_stream_position_to_compact), \
partition, \
round_completion_info

new_round_completion_info, \
round_completion_file_s3_url