diff --git a/deltacat/compute/compactor/README.md b/deltacat/compute/compactor/README.md
new file mode 100644
index 00000000..aa1a8ebf
--- /dev/null
+++ b/deltacat/compute/compactor/README.md
@@ -0,0 +1,111 @@
+# DeltaCAT Compactor
+
+The DeltaCAT compactor provides a fast, scalable, and efficient
+Log-Structured Merge (LSM) based Change-Data-Capture (CDC) implementation using
+Ray. It implements [The Flash Compactor Design](TheFlashCompactorDesign.pdf)
+using DeltaCAT's portable delta catalog storage APIs.
+
+## User Guide
+### Migrating to the DeltaCAT Compactor
+Migration from your old copy-on-write CDC framework to DeltaCAT is typically
+done by first running a rebase on top of your old copy-on-write compacted
+results.
+
+A _rebase_ allows you to run compaction from source **S1** to destination **D**
+on behalf of source **S2**, where **S1** and **S2** can be either the same or
+different tables. More specifically, it:
+1. Discards (does not read) any prior compatible round completion info and primary key indices associated with **S1**/**S2** and **D**.
+2. Writes a round completion file associated with **S2** and **D** (including a primary key index for **D** and an optional rebased high watermark stream position for **S1**).
+3. Saves and propagates the last-used rebase source across all subsequent round completion files.
+
+As part of a rebase from an alternate source, or as an independent operation,
+you can optionally set a rebase source high watermark stream position that will
+be used as the starting stream position (exclusive) for the next compaction
+round.
+
+For example, a table rebase can be used to more easily transition from an old
+copy-on-write compactor to the DeltaCAT compactor by rebasing on top of the
+results of the old copy-on-write compactor.
+
+If we assume `delta_source` refers to the table that both the old compactor and
+the new compactor will read and merge deltas from, then your first call to
+`compact_partition` should look something like this:
+```python
+from deltacat.compute.compactor.compaction_session import compact_partition
+compact_partition(
+    source_partition_locator=old_compacted_partition,  # S1
+    destination_partition_locator=deltacat_compacted_partition,  # D
+    primary_keys=delta_source_primary_keys,
+    compaction_artifact_s3_bucket=deltacat_s3_bucket,
+    last_stream_position_to_compact=delta_source_last_stream_position,
+    rebase_source_partition_locator=delta_source_partition,  # S2
+    rebase_source_partition_high_watermark=delta_source_last_compacted_stream_position,
+)
+```
+
+Note that, in the above call, `delta_source_last_compacted_stream_position`
+refers to the last stream position compacted into `old_compacted_partition`.
+
+Then, to compact subsequent incremental deltas from `delta_source` on top of
+`deltacat_compacted_partition`, you simply set `source_partition_locator` to
+the last rebase source:
+```python
+from deltacat.compute.compactor.compaction_session import compact_partition
+compact_partition(
+    source_partition_locator=delta_source_partition,  # S2
+    destination_partition_locator=deltacat_compacted_partition,  # D
+    primary_keys=delta_source_primary_keys,
+    compaction_artifact_s3_bucket=deltacat_s3_bucket,
+    last_stream_position_to_compact=delta_source_last_stream_position,
+)
+```
+
+This first incremental call runs a compaction from
+`rebase_source_partition_high_watermark` (exclusive) to
+`last_stream_position_to_compact` (inclusive) while re-using the round
+completion file and primary key index created during the rebased compaction.
+All subsequent incremental compactions can be run the same way, and will
+continue compacting from the prior round's `last_stream_position_to_compact`
+(exclusive) up to the new `last_stream_position_to_compact` (inclusive) while
+re-using the last compaction round's round completion file and primary key
+index.
+
+### Discarding Cached Compaction Results
+Another use case for a compaction rebase is to ignore and overwrite any cached
+results persisted by prior compaction job runs, perhaps because one or more
+cached files were corrupted, or for testing purposes. In this case, simply set
+`source_partition_locator` and `rebase_source_partition_locator` to the same
+value:
+```python
+from deltacat.compute.compactor.compaction_session import compact_partition
+compact_partition(
+    source_partition_locator=source_partition_to_compact,
+    destination_partition_locator=deltacat_compacted_partition,
+    primary_keys=delta_source_primary_keys,
+    compaction_artifact_s3_bucket=deltacat_s3_bucket,
+    last_stream_position_to_compact=delta_source_last_stream_position,
+    rebase_source_partition_locator=source_partition_to_compact,
+)
+```
+
+This will ignore any existing round completion file or primary key index
+produced by prior compaction rounds, and force a backfill compaction job to
+run from the first delta stream position in `source_partition_locator` up to
+`last_stream_position_to_compact` (inclusive).
+
+All subsequent incremental compactions can now run as usual by simply omitting
+`rebase_source_partition_locator`:
+```python
+from deltacat.compute.compactor.compaction_session import compact_partition
+compact_partition(
+    source_partition_locator=source_partition_to_compact,
+    destination_partition_locator=deltacat_compacted_partition,
+    primary_keys=delta_source_primary_keys,
+    compaction_artifact_s3_bucket=deltacat_s3_bucket,
+    last_stream_position_to_compact=delta_source_last_stream_position,
+)
+```
+
+This will re-use the round completion file and primary key index produced by
+the last compaction round, and compact all source partition deltas between
+the prior invocation's `last_stream_position_to_compact` (exclusive) and this
+invocation's `last_stream_position_to_compact` (inclusive).
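To make the incremental flow described in the README easier to schedule, the calls can be wrapped in a small driver. The sketch below is illustrative only: `get_latest_stream_position` and the locator, key-set, and bucket variables are hypothetical placeholders rather than DeltaCAT APIs, and the call pattern simply mirrors the README examples.

```python
from deltacat.compute.compactor.compaction_session import compact_partition


def run_incremental_round(
        delta_source_partition,        # S2: partition whose deltas are compacted
        deltacat_compacted_partition,  # D: DeltaCAT-compacted destination
        delta_source_primary_keys,
        deltacat_s3_bucket,
        get_latest_stream_position):   # hypothetical helper: latest stream position of S2
    """Run one incremental compaction round on top of the last rebase,
    re-using the prior round's round completion file and primary key index."""
    last_position = get_latest_stream_position(delta_source_partition)
    # Returns the S3 URL of the round completion file written by this round
    # (see the compaction_session changes below).
    return compact_partition(
        source_partition_locator=delta_source_partition,
        destination_partition_locator=deltacat_compacted_partition,
        primary_keys=delta_source_primary_keys,
        compaction_artifact_s3_bucket=deltacat_s3_bucket,
        last_stream_position_to_compact=last_position,
    )
```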
diff --git a/deltacat/compute/compactor/TheFlashCompactorDesign.pdf b/deltacat/compute/compactor/TheFlashCompactorDesign.pdf new file mode 100644 index 00000000..c5f46b10 Binary files /dev/null and b/deltacat/compute/compactor/TheFlashCompactorDesign.pdf differ diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py index b90f3d17..9f473879 100644 --- a/deltacat/compute/compactor/compaction_session.py +++ b/deltacat/compute/compactor/compaction_session.py @@ -39,8 +39,8 @@ def check_preconditions( assert source_partition_locator.partition_values \ == compacted_partition_locator.partition_values, \ - "In-place compaction must use the same partition values for the " \ - "source and destination." + "In-place compaction must use the same partition values for the " \ + "source and destination." assert max_records_per_output_file >= 1, \ "Max records per output file must be a positive value" if new_hash_bucket_count is not None: @@ -55,10 +55,11 @@ def check_preconditions( def compact_partition( source_partition_locator: PartitionLocator, - compacted_partition_locator: PartitionLocator, + destination_partition_locator: PartitionLocator, primary_keys: Set[str], compaction_artifact_s3_bucket: str, last_stream_position_to_compact: int, + *, hash_bucket_count: Optional[int] = None, sort_keys: List[SortKey] = None, records_per_primary_key_index_file: int = 38_000_000, @@ -68,20 +69,22 @@ def compact_partition( min_hash_bucket_chunk_size: int = 0, compacted_file_content_type: ContentType = ContentType.PARQUET, delete_prev_primary_key_index: bool = False, - read_round_completion: bool = False, pg_config: Optional[PlacementGroupConfig] = None, schema_on_read: Optional[pa.schema] = None, # TODO (ricmiyam): Remove this and retrieve schema from storage API - deltacat_storage=unimplemented_deltacat_storage): + rebase_source_partition_locator: Optional[PartitionLocator] = None, + rebase_source_partition_high_watermark: Optional[int] = None, + deltacat_storage=unimplemented_deltacat_storage) -> Optional[str]: logger.info(f"Starting compaction session for: {source_partition_locator}") partition = None compaction_rounds_executed = 0 has_next_compaction_round = True + new_rcf_s3_url = None while has_next_compaction_round: - has_next_compaction_round, new_partition, new_rci = \ + has_next_compaction_round, new_partition, new_rci, new_rcf_s3_url = \ _execute_compaction_round( source_partition_locator, - compacted_partition_locator, + destination_partition_locator, primary_keys, compaction_artifact_s3_bucket, last_stream_position_to_compact, @@ -94,14 +97,15 @@ def compact_partition( min_hash_bucket_chunk_size, compacted_file_content_type, delete_prev_primary_key_index, - read_round_completion, + pg_config, schema_on_read, - deltacat_storage=deltacat_storage, - pg_config=pg_config + rebase_source_partition_locator, + rebase_source_partition_high_watermark, + deltacat_storage, ) if new_partition: partition = new_partition - compacted_partition_locator = new_partition.locator + destination_partition_locator = new_partition.locator compaction_rounds_executed += 1 # Take new primary key index sizes into account for subsequent compaction rounds and their dedupe steps if new_rci: @@ -114,6 +118,7 @@ def compact_partition( partition = deltacat_storage.commit_partition(partition) logger.info(f"Committed compacted partition: {partition}") logger.info(f"Completed compaction session for: {source_partition_locator}") + return new_rcf_s3_url def _execute_compaction_round( 
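A reviewer-facing note on the `compact_partition` changes above: arguments after the new `*` are keyword-only, and the session now returns the S3 URL of the round completion file written by its final round, or `None` when no input deltas were found to compact. A minimal caller-side sketch; every locator, key set, and bucket name below is a hypothetical placeholder.

```python
from deltacat.compute.compactor.compaction_session import compact_partition

# All values below are hypothetical placeholders for real locators/config.
rcf_s3_url = compact_partition(
    source_partition_locator=source_partition,
    destination_partition_locator=destination_partition,
    primary_keys={"order_id"},
    compaction_artifact_s3_bucket="my-compaction-artifacts",
    last_stream_position_to_compact=latest_stream_position,
    # Everything after `*` in the signature must be passed by keyword:
    hash_bucket_count=1024,
)
if rcf_s3_url is None:
    print("No input deltas found to compact; no round completion file written.")
else:
    print(f"Round completion file: {rcf_s3_url}")
```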
@@ -131,12 +136,16 @@ def _execute_compaction_round( min_hash_bucket_chunk_size: int, compacted_file_content_type: ContentType, delete_prev_primary_key_index: bool, - read_round_completion: bool, + pg_config: Optional[PlacementGroupConfig], schema_on_read: Optional[pa.schema], - deltacat_storage = unimplemented_deltacat_storage, - pg_config: Optional[PlacementGroupConfig] = None) \ - -> Tuple[bool, Optional[Partition], Optional[RoundCompletionInfo]]: - + rebase_source_partition_locator: Optional[PartitionLocator], + rebase_source_partition_high_watermark: Optional[int], + deltacat_storage=unimplemented_deltacat_storage) \ + -> Tuple[ + bool, + Optional[Partition], + Optional[RoundCompletionInfo], + Optional[str]]: if not primary_keys: # TODO (pdames): run simple rebatch to reduce all deltas into 1 delta @@ -166,23 +175,23 @@ def _execute_compaction_round( cluster_resources = ray.cluster_resources() logger.info(f"Total cluster resources: {cluster_resources}") node_resource_keys = None - if pg_config: # use resource in each placement group + if pg_config: # use resource in each placement group cluster_resources = pg_config.resource cluster_cpus = cluster_resources['CPU'] - else: # use all cluster resource + else: # use all cluster resource logger.info(f"Available cluster resources: {ray.available_resources()}") cluster_cpus = int(cluster_resources["CPU"]) logger.info(f"Total cluster CPUs: {cluster_cpus}") node_resource_keys = live_node_resource_keys() logger.info(f"Found {len(node_resource_keys)} live cluster nodes: " - f"{node_resource_keys}") + f"{node_resource_keys}") # create a remote options provider to round-robin tasks across all nodes or allocated bundles logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}") round_robin_opt_provider = functools.partial( round_robin_options_provider, resource_keys=node_resource_keys, - pg_config = pg_config.opts if pg_config else None + pg_config=pg_config.opts if pg_config else None ) # assign a distinct index to each node in the cluster @@ -213,9 +222,10 @@ def _execute_compaction_round( # read the results from any previously completed compaction round that used # a compatible primary key index round_completion_info = None - if read_round_completion: - logger.info(f"Reading round completion file for compatible " - f"primary key index root path {compatible_primary_key_index_root_path}") + if not rebase_source_partition_locator: + logger.info( + f"Reading round completion file for compatible " + f"primary key index root path: {compatible_primary_key_index_root_path}") round_completion_info = rcf.read_round_completion_file( compaction_artifact_s3_bucket, source_partition_locator, @@ -232,6 +242,11 @@ def _execute_compaction_round( .primary_key_index_version_meta \ .hash_bucket_count min_pk_index_pa_bytes = round_completion_info.pk_index_pyarrow_write_result.pyarrow_bytes + else: + logger.info(f"No prior round info read. Source partition: " + f"{source_partition_locator}. Primary key index locator: " + f"{compatible_primary_key_index_locator}. 
Rebase source " + f"partition locator: {rebase_source_partition_locator}") # use the new hash bucket count if provided, or fall back to old count hash_bucket_count = new_hash_bucket_count \ @@ -251,7 +266,7 @@ def _execute_compaction_round( if not input_deltas: logger.info("No input deltas found to compact.") - return False, None, None + return False, None, None, None # limit the input deltas to fit on this cluster and convert them to # annotated deltas of equivalent size for easy parallel distribution @@ -268,8 +283,8 @@ def _execute_compaction_round( ) assert hash_bucket_count is not None and hash_bucket_count > 0, \ - f"Unexpected Error: Default hash bucket count ({hash_bucket_count}) " \ - f"is invalid." + f"Expected hash bucket count to be a positive integer, but found " \ + f"`{hash_bucket_count}`" # rehash the primary key index if necessary if round_completion_info: @@ -278,7 +293,7 @@ def _execute_compaction_round( # will need to be rehashed if the hash bucket count has changed if hash_bucket_count != old_hash_bucket_count: # TODO(draghave): manually test the path after prior primary key - # index was already built + # index was already built round_completion_info = pki.rehash( round_robin_opt_provider, compaction_artifact_s3_bucket, @@ -289,10 +304,6 @@ def _execute_compaction_round( records_per_primary_key_index_file, delete_prev_primary_key_index, ) - else: - logger.info(f"No prior round completion file found. Source partition: " - f"{source_partition_locator}. Primary key index locator: " - f"{compatible_primary_key_index_locator}") # parallel step 1: # group like primary keys together by hashing them into buckets @@ -315,7 +326,7 @@ def _execute_compaction_round( for hash_group_index, object_id in enumerate(hash_group_idx_to_obj_id): if object_id: all_hash_group_idx_to_obj_id[hash_group_index].append(object_id) - hash_group_count = dedupe_task_count = len(all_hash_group_idx_to_obj_id) + hash_group_count = len(all_hash_group_idx_to_obj_id) logger.info(f"Hash bucket groups created: {hash_group_count}") # TODO (pdames): when resources are freed during the last round of hash @@ -355,7 +366,6 @@ def _execute_compaction_round( new_pki_version_locator = PrimaryKeyIndexVersionLocator.generate( new_primary_key_index_version_meta) - # parallel step 2: # discover records with duplicate primary keys in each hash bucket, and # identify the index of records to keep or drop based on sort keys @@ -408,9 +418,9 @@ def _execute_compaction_round( ray_task=mat.materialize, max_parallelism=max_parallelism, options_provider=round_robin_opt_provider, - kwargs_provider=lambda index, mat_bucket_idx_to_obj_id: { - "mat_bucket_index": mat_bucket_idx_to_obj_id[0], - "dedupe_task_idx_and_obj_id_tuples": mat_bucket_idx_to_obj_id[1], + kwargs_provider=lambda index, mat_bucket_index_to_obj_id: { + "mat_bucket_index": mat_bucket_index_to_obj_id[0], + "dedupe_task_idx_and_obj_id_tuples": mat_bucket_index_to_obj_id[1], }, schema=schema_on_read, round_completion_info=round_completion_info, @@ -435,24 +445,34 @@ def _execute_compaction_round( compacted_delta.stream_position, ) - round_completion_info = RoundCompletionInfo.of( - last_stream_position_compacted, + rci_high_watermark = rebase_source_partition_high_watermark \ + if rebase_source_partition_high_watermark \ + else last_stream_position_compacted + new_round_completion_info = RoundCompletionInfo.of( + rci_high_watermark, new_compacted_delta_locator, - PyArrowWriteResult.union([m.pyarrow_write_result - for m in mat_results]), + 
PyArrowWriteResult.union([m.pyarrow_write_result for m in mat_results]), PyArrowWriteResult.union(pki_stats), bit_width_of_sort_keys, new_pki_version_locator, + rebase_source_partition_locator + or round_completion_info.rebase_source_partition_locator, ) - rcf.write_round_completion_file( + rcf_source_partition_locator = rebase_source_partition_locator \ + if rebase_source_partition_locator \ + else source_partition_locator + round_completion_file_s3_url = rcf.write_round_completion_file( compaction_artifact_s3_bucket, - source_partition_locator, + rcf_source_partition_locator, new_primary_key_index_root_path, - round_completion_info, + new_round_completion_info, ) - logger.info(f"partition-{source_partition_locator.partition_values},compacted at:{last_stream_position_compacted}, last position:{last_stream_position_to_compact}") + logger.info( + f"partition-{source_partition_locator.partition_values}," + f"compacted at: {last_stream_position_compacted}," + f"last position: {last_stream_position_to_compact}") return \ (last_stream_position_compacted < last_stream_position_to_compact), \ partition, \ - round_completion_info - + new_round_completion_info, \ + round_completion_file_s3_url diff --git a/deltacat/compute/compactor/model/round_completion_info.py b/deltacat/compute/compactor/model/round_completion_info.py index d7710d4d..c140f9bf 100644 --- a/deltacat/compute/compactor/model/round_completion_info.py +++ b/deltacat/compute/compactor/model/round_completion_info.py @@ -1,13 +1,13 @@ # Allow classes to use self-referencing Type hints in Python 3.7. from __future__ import annotations -from deltacat.storage import DeltaLocator +from deltacat.storage import DeltaLocator, PartitionLocator from deltacat.compute.compactor.model.pyarrow_write_result import \ PyArrowWriteResult from deltacat.compute.compactor.model.primary_key_index import \ PrimaryKeyIndexVersionLocator -from typing import Any, Dict +from typing import Any, Dict, Optional class RoundCompletionInfo(dict): @@ -17,7 +17,8 @@ def of(high_watermark: int, compacted_pyarrow_write_result: PyArrowWriteResult, pk_index_pyarrow_write_result: PyArrowWriteResult, sort_keys_bit_width: int, - primary_key_index_version_locator: PrimaryKeyIndexVersionLocator) \ + primary_key_index_version_locator: PrimaryKeyIndexVersionLocator, + rebase_source_partition_locator: Optional[PartitionLocator]) \ -> RoundCompletionInfo: rci = RoundCompletionInfo() @@ -27,6 +28,7 @@ def of(high_watermark: int, rci["pkIndexPyarrowWriteResult"] = pk_index_pyarrow_write_result rci["sortKeysBitWidth"] = sort_keys_bit_width rci["primaryKeyIndexVersionLocator"] = primary_key_index_version_locator + rci["rebaseSourcePartitionLocator"] = rebase_source_partition_locator return rci @property @@ -67,3 +69,7 @@ def primary_key_index_version_locator(self) \ self["primaryKeyIndexVersionLocator"] = val = \ PrimaryKeyIndexVersionLocator(val) return val + + @property + def rebase_source_partition_locator(self) -> Optional[PartitionLocator]: + return self.get("rebaseSourcePartitionLocator") diff --git a/deltacat/compute/compactor/utils/primary_key_index.py b/deltacat/compute/compactor/utils/primary_key_index.py index c4d923f4..3f9fad55 100644 --- a/deltacat/compute/compactor/utils/primary_key_index.py +++ b/deltacat/compute/compactor/utils/primary_key_index.py @@ -124,6 +124,7 @@ def rehash( PyArrowWriteResult.union(pki_stats), old_rci.sort_keys_bit_width, rehashed_pki_version_locator, + old_rci.rebase_source_partition_locator, ) rcf.write_round_completion_file( s3_bucket, diff 
--git a/deltacat/compute/compactor/utils/round_completion_file.py b/deltacat/compute/compactor/utils/round_completion_file.py index 2346b69e..a9289c8c 100644 --- a/deltacat/compute/compactor/utils/round_completion_file.py +++ b/deltacat/compute/compactor/utils/round_completion_file.py @@ -43,7 +43,7 @@ def write_round_completion_file( bucket: str, source_partition_locator: PartitionLocator, primary_key_index_root_path: str, - round_completion_info: RoundCompletionInfo): + round_completion_info: RoundCompletionInfo) -> str: from deltacat.aws import s3u as s3_utils logger.info( @@ -61,3 +61,4 @@ def write_round_completion_file( ) logger.info( f"round completion file written to: {round_completion_file_s3_url}") + return round_completion_file_s3_url
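One consequence of the `RoundCompletionInfo` change worth noting: because the class is a plain `dict` subclass and the new property uses `dict.get`, round completion records written before this change (which lack the `rebaseSourcePartitionLocator` key) simply read back as `None`. A minimal sketch of that behavior; the stored value is shown as a placeholder string rather than a real serialized `PartitionLocator`.

```python
from deltacat.compute.compactor.model.round_completion_info import RoundCompletionInfo

# A record written before this change has no rebase key, so the new
# property reads back as None:
old_record = RoundCompletionInfo()
assert old_record.rebase_source_partition_locator is None

# A record written by a rebased round carries the rebase source under the
# new "rebaseSourcePartitionLocator" key (placeholder value shown here):
rebased_record = RoundCompletionInfo()
rebased_record["rebaseSourcePartitionLocator"] = "<serialized PartitionLocator>"
assert rebased_record.rebase_source_partition_locator == "<serialized PartitionLocator>"
```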