
Commit

Improve compaction fault-tolerance with refactored placement group api (#58)

* update

* update

* remove print

* update pg with bundle id

* merge in changes in refactor execute compaction round

* revert s3fs versions

* Update setup.py

Signed-off-by: Patrick Ames <[email protected]>

* Update deltacat/utils/ray_utils/concurrency.py

Signed-off-by: Patrick Ames <[email protected]>

---------

Signed-off-by: Patrick Ames <[email protected]>
Signed-off-by: Patrick Ames <[email protected]>
Co-authored-by: Jialin Liu <[email protected]>
Co-authored-by: Patrick Ames <[email protected]>
3 people authored Feb 4, 2023
1 parent fbb5e86 commit d4b307c
Showing 4 changed files with 61 additions and 48 deletions.
2 changes: 1 addition & 1 deletion deltacat/__init__.py
@@ -17,7 +17,7 @@

logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "0.1.6"
__version__ = "0.1.9"


__all__ = [
44 changes: 17 additions & 27 deletions deltacat/compute/compactor/compaction_session.py
@@ -20,8 +20,8 @@
from deltacat.compute.compactor.utils import round_completion_file as rcf, io, \
primary_key_index as pki
from deltacat.types.media import ContentType

from typing import List, Set, Optional, Tuple, Dict, Any
from deltacat.utils.placement import PlacementGroupResource
from typing import List, Set, Optional, Tuple, Dict

import pyarrow as pa
logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -69,20 +69,17 @@ def compact_partition(
compacted_file_content_type: ContentType = ContentType.PARQUET,
delete_prev_primary_key_index: bool = False,
read_round_completion: bool = False,
pg_config: Optional[List[Dict[str, Any]]] = None,
pg_config: Optional[PlacementGroupResource] = None,
schema_on_read: Optional[pa.schema] = None, # TODO (ricmiyam): Remove this and retrieve schema from storage API
deltacat_storage=unimplemented_deltacat_storage):

logger.info(f"Starting compaction session for: {source_partition_locator}")
partition = None
compaction_rounds_executed = 0
has_next_compaction_round = True
opts={}
if pg_config:
opts=pg_config[0]
while has_next_compaction_round:
has_next_compaction_round_obj, new_partition_obj, new_rci_obj = \
_execute_compaction_round.options(**opts).remote(
has_next_compaction_round, new_partition, new_rci = \
_execute_compaction_round(
source_partition_locator,
compacted_partition_locator,
primary_keys,
@@ -102,9 +99,6 @@ def compact_partition(
deltacat_storage=deltacat_storage,
pg_config=pg_config
)
has_next_compaction_round = ray.get(has_next_compaction_round_obj)
new_partition = ray.get(new_partition_obj)
new_rci = ray.get(new_rci_obj)
if new_partition:
partition = new_partition
compacted_partition_locator = new_partition.locator
@@ -121,7 +115,7 @@ def compact_partition(
logger.info(f"Committed compacted partition: {partition}")
logger.info(f"Completed compaction session for: {source_partition_locator}")

@ray.remote(num_cpus=0.1,num_returns=3)

def _execute_compaction_round(
source_partition_locator: PartitionLocator,
compacted_partition_locator: PartitionLocator,
@@ -140,7 +134,7 @@ def _execute_compaction_round(
read_round_completion: bool,
schema_on_read: Optional[pa.schema],
deltacat_storage = unimplemented_deltacat_storage,
pg_config: Optional[List[Dict[str, Any]]] = None) \
pg_config: Optional[PlacementGroupResource] = None) \
-> Tuple[bool, Optional[Partition], Optional[RoundCompletionInfo]]:


@@ -169,13 +163,11 @@ def _execute_compaction_round(
# sort primary keys to produce the same pk digest regardless of input order
primary_keys = sorted(primary_keys)

# collect node group resources

cluster_resources = ray.cluster_resources()
logger.info(f"Total cluster resources: {cluster_resources}")
node_resource_keys = None
if pg_config: # use resource in each placement group
node_resource_keys=None
cluster_resources = pg_config[1]
cluster_resources = pg_config.resource
cluster_cpus = cluster_resources['CPU']
else: # use all cluster resource
logger.info(f"Available cluster resources: {ray.available_resources()}")
@@ -185,16 +177,14 @@ def _execute_compaction_round(
logger.info(f"Found {len(node_resource_keys)} live cluster nodes: "
f"{node_resource_keys}")

if node_resource_keys:
# create a remote options provider to round-robin tasks across all nodes
logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}")
round_robin_opt_provider = functools.partial(
round_robin_options_provider,
resource_keys=node_resource_keys,
)
else:
logger.info("Setting round robin scheduling to None")
round_robin_opt_provider = None
# create a remote options provider to round-robin tasks across all nodes or allocated bundles
logger.info(f"Setting round robin scheduling with node id:{node_resource_keys}")
round_robin_opt_provider = functools.partial(
round_robin_options_provider,
resource_keys=node_resource_keys,
pg_config = pg_config.opts if pg_config else None
)

# assign a distinct index to each node in the cluster
# head_node_ip = urllib.request.urlopen(
# "http://169.254.169.254/latest/meta-data/local-ipv4"
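With this change, a driver reserves placement groups up front and hands one PlacementGroupResource per compaction session to compact_partition. A minimal sketch of that calling pattern, assuming an existing Ray cluster; the CPU counts are illustrative rather than values taken from this repository:

```python
# Sketch of the intended calling pattern, assuming an existing Ray cluster; the CPU
# counts are illustrative rather than values taken from this repository.
import ray
from deltacat.utils.placement import PlacementGroupManager

ray.init(address="auto")

# Reserve one placement group totalling 32 CPUs, packed as 8-CPU bundles.
pgm = PlacementGroupManager(num_pgs=1, total_cpus_per_pg=32, cpu_per_bundle=8)
pg_config = pgm.pgs[0]  # PlacementGroupResource: scheduling opts + aggregated resources

# compact_partition(..., pg_config=pg_config) now runs _execute_compaction_round
# in-process; the round sizes its parallelism from pg_config.resource["CPU"] and
# pins its worker tasks into the reserved bundles via pg_config.opts.
```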
49 changes: 33 additions & 16 deletions deltacat/utils/placement.py
@@ -15,6 +15,7 @@


from deltacat import logs
from deltacat.utils.ray_utils.runtime import live_node_resource_keys
logger = logs.configure_deltacat_logger(logging.getLogger(__name__))

#Limitation of current node group or placement group manager
@@ -158,7 +159,17 @@ def get_group_by_name(self, gname: str) -> Optional[Dict[str, Union[str, float]]
logger.info(f"There is no available resources for {gname}")
return None
return group_res


class PlacementGroupResource():
    def __init__(self, opts, resource):
        self._opts = opts
        self._resource = resource

    @property
    def opts(self):
        return self._opts

    @property
    def resource(self):
        return self._resource

class PlacementGroupManager():
"""Placement Group Manager
@@ -177,58 +188,64 @@ class PlacementGroupManager():
num_pgs: number of placement groups to be created
instance_cpus: number of cpus per instance
"""
def __init__(self, num_pgs: int, instance_cpus: int, instance_type: int = 8, time_out: Optional[float] = None):
head_res_key = self.get_current_node_resource_key()
self._pg_configs = ray.get([_config.options(resources={head_res_key:0.01}).remote(instance_cpus, instance_type) for _ in range(num_pgs)])
def __init__(self, num_pgs: int, total_cpus_per_pg: int, cpu_per_bundle: int, strategy="SPREAD",capture_child_tasks=True, time_out: Optional[float] = None):
head_res_key = get_current_node_resource_key()
#run the task on head and consume a fractional cpu, so that pg can be created on non-head node
#if cpu_per_bundle is less than the cpus per node, the pg can still be created on head
#current assumption is that the cpu_per_bundle = cpus per node
#TODO: figure out how to create pg on non-head explicitly
self._pg_configs = ray.get([_config.options(resources={head_res_key:0.01}).remote(total_cpus_per_pg, cpu_per_bundle,strategy,capture_child_tasks) for i in range(num_pgs)])
#TODO: handle the cases where cpu_per_bundle is larger than max cpus per node, support it on ec2/flex/manta

@property
def pgs(self):
return self._pg_configs

def get_current_node_resource_key(self) -> str:
current_node_id = ray.get_runtime_context().node_id.hex()
#on ec2: address="172.31.34.51:6379"
#on manta: address = "2600:1f10:4674:6815:aadb:2dc8:de61:bc8e:6379"
current_node_name = ray.experimental.internal_kv.global_gcs_client.address[:-5]
for node in ray.nodes():
if node["NodeID"] == current_node_id:
if node["NodeName"] == current_node_name:
# Found the node.
for key in node["Resources"].keys():
if key.startswith("node:"):
return key
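The fractional resources={head_res_key: 0.01} request above relies on pinning a task to a specific node through its node:<address> custom resource. A self-contained sketch of that trick, assuming a locally started Ray instance (the report_node task is illustrative):

```python
# Minimal sketch of node pinning via a "node:<address>" resource key, the same
# mechanism used by _config.options(resources={head_res_key: 0.01}) above.
import ray

ray.init()  # a local Ray instance is enough for this demo

head_node_id = ray.get_runtime_context().node_id.hex()
# Resolve this node's "node:<address>" custom resource key, as in the method above.
head_res_key = next(
    key
    for node in ray.nodes()
    if node["NodeID"] == head_node_id
    for key in node["Resources"]
    if key.startswith("node:")
)

@ray.remote(num_cpus=0)
def report_node() -> str:
    # Returns the node id of wherever Ray scheduled this task.
    return ray.get_runtime_context().node_id.hex()

# Requesting a sliver of the node-specific resource forces the task onto that node.
assert ray.get(report_node.options(resources={head_res_key: 0.01}).remote()) == head_node_id
```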

@ray.remote(num_cpus=0.01)
def _config(instance_cpus: int, instance_type: int, time_out: Optional[float] = None) -> Tuple[Dict[str,Any], Dict[str,Any]]:
def _config(total_cpus_per_pg: int, cpu_per_node: int, strategy="SPREAD",capture_child_tasks=True,time_out: Optional[float] = None) -> Tuple[Dict[str,Any], Dict[str,Any]]:
pg_config = None
try:
opts ={}
cluster_resources={}
num_bundles = (int)(instance_cpus/instance_type)
bundles = [{'CPU':instance_type} for _ in range(num_bundles)]
pg = placement_group(bundles, strategy="SPREAD")
num_bundles = (int)(total_cpus_per_pg/cpu_per_node)
bundles = [{'CPU':cpu_per_node} for i in range(num_bundles)]
pg = placement_group(bundles, strategy=strategy)
ray.get(pg.ready(), timeout=time_out)
if not pg:
return None
opts = {"scheduling_strategy":PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_capture_child_tasks=True)
placement_group=pg, placement_group_capture_child_tasks=capture_child_tasks)
}
pg_id = placement_group_table(pg)['placement_group_id']
pg_details = get_placement_group(pg_id)
bundles = pg_details['bundles']
node_ids =[]
for bd in bundles:
node_ids.append(bd['node_id'])
#query available resources given list of node id
all_nodes_available_res = ray._private.state.state._available_resources_per_node()
pg_res = {'CPU':0,'memory':0,'object_store_memory':0,'node_id':[]}
pg_res = {'CPU':0,'memory':0,'object_store_memory':0}
for node_id in node_ids:
if node_id in all_nodes_available_res:
v = all_nodes_available_res[node_id]
node_detail = get_node(node_id)
pg_res['CPU']+=node_detail['resources_total']['CPU']
pg_res['memory']+=v['memory']
pg_res['object_store_memory']+=v['object_store_memory']
pg_res['node_id'].append(node_id)
cluster_resources['CPU'] = int(pg_res['CPU'])
cluster_resources['memory'] = float(pg_res['memory'])
cluster_resources['object_store_memory'] = float(pg_res['object_store_memory'])
cluster_resources['node_id'] = pg_res['node_id']
pg_config=[opts,cluster_resources]
pg_config=PlacementGroupResource(opts,cluster_resources)
logger.info(f"pg has resources:{cluster_resources}")

except Exception as e:
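Outside of compaction, the same PlacementGroupResource can drive any Ray workload: .resource sizes the work and .opts pins tasks into the reserved bundles. A sketch under those assumptions; the sizes and the square task are illustrative, and an existing Ray cluster is assumed:

```python
# Sketch of consuming a PlacementGroupResource directly; sizes and the square task
# are illustrative, and an existing Ray cluster is assumed.
import ray
from deltacat.utils.placement import PlacementGroupManager

ray.init(address="auto")

@ray.remote
def square(x: int) -> int:
    return x * x

pgm = PlacementGroupManager(num_pgs=1, total_cpus_per_pg=16, cpu_per_bundle=8)
pg = pgm.pgs[0]

max_parallelism = int(pg.resource["CPU"])  # CPUs aggregated across the group's bundles
# Splatting pg.opts applies the PlacementGroupSchedulingStrategy, so these tasks (and,
# with capture_child_tasks=True, their children) stay inside the reserved placement group.
results = ray.get([square.options(**pg.opts).remote(i) for i in range(max_parallelism)])
```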
14 changes: 10 additions & 4 deletions deltacat/utils/ray_utils/concurrency.py
@@ -104,7 +104,13 @@ def round_robin_options_provider(
foo.options(**opt).remote()
```
"""
assert resource_keys, f"No resource keys given to round robin!"
resource_key_index = i % len(resource_keys)
key = resource_keys[resource_key_index]
return {"resources": {key: resource_amount_provider(resource_key_index)}}
opts = kwargs.get("pg_config")
if opts: # use pg and bundle id for fault-tolerant round-robin
bundle_key_index = i % len(opts['scheduling_strategy'].placement_group.bundle_specs)
opts['scheduling_strategy'].placement_group_bundle_index = bundle_key_index
return opts
else: # use node id for round-robin
assert resource_keys, f"No resource keys given to round robin!"
resource_key_index = i % len(resource_keys)
key = resource_keys[resource_key_index]
return {"resources": {key: resource_amount_provider(resource_key_index)}}
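A sketch of driving both branches, mirroring the functools.partial construction in compaction_session.py; the two-bundle placement group and node resource keys are illustrative and assume a running Ray cluster:

```python
# Sketch: building the same options provider two ways. Assumes ray.init() has run
# and the cluster has capacity for the illustrative two-bundle placement group.
import functools
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from deltacat.utils.ray_utils.concurrency import round_robin_options_provider

# Node-based round robin: task i is pinned to resource_keys[i % len(resource_keys)].
node_provider = functools.partial(
    round_robin_options_provider,
    resource_keys=["node:10.0.0.1", "node:10.0.0.2"],  # illustrative node resource keys
)

# Bundle-based round robin (the new fault-tolerant path): task i is pinned to bundle
# i % num_bundles of a reserved placement group via placement_group_bundle_index.
pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="SPREAD")
ray.get(pg.ready())
pg_opts = {
    "scheduling_strategy": PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_capture_child_tasks=True
    )
}
bundle_provider = functools.partial(
    round_robin_options_provider, resource_keys=None, pg_config=pg_opts
)
# Either provider is then handed to deltacat's parallel invocation helpers, which call
# it per task index to produce kwargs for foo.options(**opts).remote().
```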
