From 3196ceff82346db3b4ca8b4cfb88b86c25f9a77d Mon Sep 17 00:00:00 2001
From: Raghavendra M Dani
Date: Sat, 4 Feb 2023 12:10:47 -0800
Subject: [PATCH] Fix placement group to resolve variables (#68)

* Fix placement group to resolve variables

* fix the exception in _config
---
 .../compute/compactor/compaction_session.py |   6 +-
 deltacat/utils/placement.py                 | 118 +++++++++---------
 deltacat/utils/ray_utils/concurrency.py     |  12 +-
 3 files changed, 70 insertions(+), 66 deletions(-)

diff --git a/deltacat/compute/compactor/compaction_session.py b/deltacat/compute/compactor/compaction_session.py
index 00b25730..b90f3d17 100644
--- a/deltacat/compute/compactor/compaction_session.py
+++ b/deltacat/compute/compactor/compaction_session.py
@@ -20,7 +20,7 @@ from deltacat.compute.compactor.utils import round_completion_file as rcf, io, \
     primary_key_index as pki
 from deltacat.types.media import ContentType
-from deltacat.utils.placement import PlacementGroupResource
+from deltacat.utils.placement import PlacementGroupConfig
 from typing import List, Set, Optional, Tuple, Dict
 
 import pyarrow as pa
 
@@ -69,7 +69,7 @@ def compact_partition(
     compacted_file_content_type: ContentType = ContentType.PARQUET,
     delete_prev_primary_key_index: bool = False,
     read_round_completion: bool = False,
-    pg_config: Optional[PlacementGroupResource] = None,
+    pg_config: Optional[PlacementGroupConfig] = None,
     schema_on_read: Optional[pa.schema] = None,  # TODO (ricmiyam): Remove this and retrieve schema from storage API
     deltacat_storage=unimplemented_deltacat_storage):
 
@@ -134,7 +134,7 @@ def _execute_compaction_round(
     read_round_completion: bool,
     schema_on_read: Optional[pa.schema],
     deltacat_storage = unimplemented_deltacat_storage,
-    pg_config: Optional[PlacementGroupResource] = None) \
+    pg_config: Optional[PlacementGroupConfig] = None) \
         -> Tuple[bool, Optional[Partition], Optional[RoundCompletionInfo]]:
 
 
diff --git a/deltacat/utils/placement.py b/deltacat/utils/placement.py
index 53c65cce..c4585fb0 100644
--- a/deltacat/utils/placement.py
+++ b/deltacat/utils/placement.py
@@ -3,6 +3,7 @@
 import time
 import yaml
 import logging
+from dataclasses import dataclass
 from typing import Optional, Union, List, Dict, Any, Callable, Tuple
 from ray.util.placement_group import (
     placement_group,
@@ -22,6 +23,12 @@
 #Must run on driver or head node bc state.api needs to query dashboard api server at 127.0.0.1.
 #Issue: https://github.com/ray-project/ray/issues/29959
+@dataclass
+class PlacementGroupConfig():
+    def __init__(self, opts, resource):
+        self.opts = opts
+        self.resource = resource
+
 
 class NodeGroupManager():
 
     def __init__(self,path: str, gname: str):
@@ -159,17 +166,6 @@ def get_group_by_name(self, gname: str) -> Optional[Dict[str, Union[str, float]]
             logger.info(f"There is no available resources for {gname}")
             return None
         return group_res
-
-class PlacementGroupResource():
-    def __init__(self, opts, resource):
-        self.opts = opts
-        self.resource = resource
-    @property
-    def opts(self):
-        return self.opts
-    @property
-    def resource(self):
-        return self.resource
 
 class PlacementGroupManager():
     """Placement Group Manager
@@ -188,13 +184,18 @@ class PlacementGroupManager():
         num_pgs: number of placement groups to be created
         instance_cpus: number of cpus per instance
     """
-    def __init__(self, num_pgs: int, total_cpus_per_pg: int, cpu_per_bundle: int, strategy="SPREAD",capture_child_tasks=True, time_out: Optional[float] = None):
-        head_res_key = get_current_node_resource_key()
+    def __init__(self, num_pgs: int,
+                 total_cpus_per_pg: int,
+                 cpu_per_bundle: int,
+                 strategy="SPREAD",
+                 capture_child_tasks=True):
+        head_res_key = self.get_current_node_resource_key()
         #run the task on head and consume a fractional cpu, so that pg can be created on non-head node
         #if cpu_per_bundle is less than the cpus per node, the pg can still be created on head
         #curent assumption is that the cpu_per_bundle = cpus per node
         #TODO: figure out how to create pg on non-head explicitly
-        self._pg_configs = ray.get([_config.options(resources={head_res_key:0.01}).remote(total_cpus_per_pg, cpu_per_bundle,strategy,capture_child_tasks) for i in range(num_pgs)])
+        self._pg_configs = ray.get([_config.options(resources={head_res_key:0.01}).remote(total_cpus_per_pg, \
+            cpu_per_bundle, strategy, capture_child_tasks) for i in range(num_pgs)])
         #TODO: handle the cases where cpu_per_bundle is larger than max cpus per node, support it on ec2/flex/manta
 
     @property
@@ -204,52 +205,53 @@ def pgs(self):
         return self._pg_configs
 
     def get_current_node_resource_key(self) -> str:
         #on ec2: address="172.31.34.51:6379"
         #on manta: address = "2600:1f10:4674:6815:aadb:2dc8:de61:bc8e:6379"
-        current_node_name = ray.experimental.internal_kv.global_gcs_client.address[:-5]
-        for node in ray.nodes():
-            if node["NodeName"] == current_node_name:
-                # Found the node.
-                for key in node["Resources"].keys():
-                    if key.startswith("node:"):
-                        return key
+        current_node_name = ray.experimental.internal_kv.global_gcs_client.address[:-5]
+        for node in ray.nodes():
+            if node["NodeName"] == current_node_name:
+                # Found the node.
+                for key in node["Resources"].keys():
+                    if key.startswith("node:"):
+                        return key
 
 @ray.remote(num_cpus=0.01)
-def _config(total_cpus_per_pg: int, cpu_per_node: int, strategy="SPREAD",capture_child_tasks=True,time_out: Optional[float] = None) -> Tuple[Dict[str,Any], Dict[str,Any]]:
+def _config(total_cpus_per_pg: int,
+            cpu_per_node: int,
+            strategy="SPREAD",
+            capture_child_tasks=True,
+            time_out: Optional[float] = None) -> Tuple[Dict[str,Any], Dict[str,Any]]:
     pg_config = None
-    try:
-        opts ={}
-        cluster_resources={}
-        num_bundles = (int)(total_cpus_per_pg/cpu_per_node)
-        bundles = [{'CPU':instance_type} for i in range(num_bundles)]
-        pg = placement_group(bundles, strategy=strategy)
-        ray.get(pg.ready(), timeout=time_out)
-        if not pg:
-            return None
-        opts = {"scheduling_strategy":PlacementGroupSchedulingStrategy(
-            placement_group=pg, placement_group_capture_child_tasks=capture_child_tasks)
-        }
-        pg_id = placement_group_table(pg)['placement_group_id']
-        pg_details = get_placement_group(pg_id)
-        bundles = pg_details['bundles']
-        for bd in bundles:
-            node_ids.append(bd['node_id'])
-        #query available resources given list of node id
-        all_nodes_available_res = ray._private.state.state._available_resources_per_node()
-        pg_res = {'CPU':0,'memory':0,'object_store_memory':0}
-        for node_id in node_ids:
-            if node_id in all_nodes_available_res:
-                v = all_nodes_available_res[node_id]
-                node_detail = get_node(node_id)
-                pg_res['CPU']+=node_detail['resources_total']['CPU']
-                pg_res['memory']+=v['memory']
-                pg_res['object_store_memory']+=v['object_store_memory']
-        cluster_resources['CPU'] = int(pg_res['CPU'])
-        cluster_resources['memory'] = float(pg_res['memory'])
-        cluster_resources['object_store_memory'] = float(pg_res['object_store_memory'])
-        pg_config=PlacementGroupResource(opts,cluster_resources)
-        logger.info(f"pg has resources:{cluster_resources}")
-
-    except Exception as e:
-        logger.error(f"placement group error:{e}")
-        pass
+    opts ={}
+    cluster_resources={}
+    num_bundles = (int)(total_cpus_per_pg/cpu_per_node)
+    bundles = [{'CPU':cpu_per_node} for i in range(num_bundles)]
+    pg = placement_group(bundles, strategy=strategy)
+    ray.get(pg.ready(), timeout=time_out)
+    if not pg:
+        return None
+    opts = {"scheduling_strategy":PlacementGroupSchedulingStrategy(
+        placement_group=pg, placement_group_capture_child_tasks=capture_child_tasks)
+    }
+    pg_id = placement_group_table(pg)['placement_group_id']
+    pg_details = get_placement_group(pg_id)
+    bundles = pg_details['bundles']
+    node_ids = []
+    for bd in bundles:
+        node_ids.append(bd['node_id'])
+    #query available resources given list of node id
+    all_nodes_available_res = ray._private.state.state._available_resources_per_node()
+    pg_res = {'CPU':0,'memory':0,'object_store_memory':0}
+    for node_id in node_ids:
+        if node_id in all_nodes_available_res:
+            v = all_nodes_available_res[node_id]
+            node_detail = get_node(node_id)
+            pg_res['CPU']+=node_detail['resources_total']['CPU']
+            pg_res['memory']+=v['memory']
+            pg_res['object_store_memory']+=v['object_store_memory']
+    cluster_resources['CPU'] = int(pg_res['CPU'])
+    cluster_resources['memory'] = float(pg_res['memory'])
+    cluster_resources['object_store_memory'] = float(pg_res['object_store_memory'])
+    pg_config=PlacementGroupConfig(opts,cluster_resources)
+    logger.info(f"pg has resources:{cluster_resources}")
+    return pg_config
diff --git a/deltacat/utils/ray_utils/concurrency.py b/deltacat/utils/ray_utils/concurrency.py
index cf3b8b3b..47fcea90 100644
--- a/deltacat/utils/ray_utils/concurrency.py
+++ b/deltacat/utils/ray_utils/concurrency.py
@@ -4,6 +4,7 @@
 from ray.types import ObjectRef
 from deltacat.utils.ray_utils.runtime import current_node_resource_key
+import copy
 from typing import Any, Iterable, Callable, Dict, List, Tuple, Union, Optional
 
 import itertools
@@ -105,11 +106,12 @@ def round_robin_options_provider(
     ```
     """
     opts = kwargs.get("pg_config")
-    if opts: # use pg and bundle id for fault-tolerant round-robin
-        bundle_key_index = i % len(opts['scheduling_strategy'].placement_group.bundle_specs)
-        opts['scheduling_strategy'].placement_group_bundle_index = bundle_key_index
-        return opts
-    else: # use node id for round-robin
+    if opts:
+        new_opts = copy.deepcopy(opts)
+        bundle_key_index = i % len(new_opts['scheduling_strategy'].placement_group.bundle_specs)
+        new_opts['scheduling_strategy'].placement_group_bundle_index = bundle_key_index
+        return new_opts
+    else:
         assert resource_keys, f"No resource keys given to round robin!"
         resource_key_index = i % len(resource_keys)
         key = resource_keys[resource_key_index]
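
Note (not part of the patch): the sketch below shows one way the renamed PlacementGroupConfig could be consumed after this change. It assumes a running Ray cluster and that it executes on the driver/head node, as the comment in placement.py requires; the cluster shape (two 32-vCPU bundles), the ray.init address, and the probe task are illustrative assumptions, not deltacat code.

# Illustrative usage sketch under the assumptions stated above.
import socket

import ray

from deltacat.utils.placement import PlacementGroupManager

ray.init(address="auto")

# One placement group spread across two 32-CPU bundles (64 CPUs total).
pgm = PlacementGroupManager(num_pgs=1, total_cpus_per_pg=64, cpu_per_bundle=32)
pg_config = pgm.pgs[0]  # a PlacementGroupConfig built by the _config() task

# pg_config.resource aggregates the CPU / memory / object_store_memory of the
# nodes backing the group; pg_config.opts carries the Ray scheduling strategy.
print(pg_config.resource)

@ray.remote
def probe() -> str:
    # Hypothetical task used only to show where work lands.
    return socket.gethostname()

# Tasks launched with these options run inside the placement group, and their
# children follow it because placement_group_capture_child_tasks=True.
print(ray.get(probe.options(**pg_config.opts).remote()))

# compact_partition(..., pg_config=pg_config, ...) forwards the same object
# into the compaction session (its other arguments are elided here).

The copy.deepcopy added to round_robin_options_provider matters for that same opts dict: previously every call set placement_group_bundle_index on the shared scheduling strategy, so concurrent callers could overwrite each other's bundle index; copying the options keeps each task's bundle assignment independent.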