Fix placement group to resolve variables (#68)
* Fix placement group to resolve variables

* fix the exception in _config
raghumdani authored Feb 4, 2023
1 parent d4b307c commit 3196cef
Showing 3 changed files with 70 additions and 66 deletions.
6 changes: 3 additions & 3 deletions deltacat/compute/compactor/compaction_session.py
@@ -20,7 +20,7 @@
 from deltacat.compute.compactor.utils import round_completion_file as rcf, io, \
     primary_key_index as pki
 from deltacat.types.media import ContentType
-from deltacat.utils.placement import PlacementGroupResource
+from deltacat.utils.placement import PlacementGroupConfig
 from typing import List, Set, Optional, Tuple, Dict

 import pyarrow as pa
@@ -69,7 +69,7 @@ def compact_partition(
         compacted_file_content_type: ContentType = ContentType.PARQUET,
         delete_prev_primary_key_index: bool = False,
         read_round_completion: bool = False,
-        pg_config: Optional[PlacementGroupResource] = None,
+        pg_config: Optional[PlacementGroupConfig] = None,
         schema_on_read: Optional[pa.schema] = None,  # TODO (ricmiyam): Remove this and retrieve schema from storage API
         deltacat_storage=unimplemented_deltacat_storage):

@@ -134,7 +134,7 @@ def _execute_compaction_round(
         read_round_completion: bool,
         schema_on_read: Optional[pa.schema],
         deltacat_storage = unimplemented_deltacat_storage,
-        pg_config: Optional[PlacementGroupResource] = None) \
+        pg_config: Optional[PlacementGroupConfig] = None) \
         -> Tuple[bool, Optional[Partition], Optional[RoundCompletionInfo]]:


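Note on the rename above: the old PlacementGroupResource class (deleted from deltacat/utils/placement.py below) defines read-only properties that shadow the very attributes its __init__ assigns, so it can never be constructed. A minimal, self-contained reproduction, not part of the commit itself:

```
# Repro of the bug behind the rename: the properties shadow the attributes,
# so construction raises immediately (and the getters would recurse anyway).
class PlacementGroupResource():
    def __init__(self, opts, resource):
        self.opts = opts          # AttributeError: property 'opts' has no setter
        self.resource = resource

    @property
    def opts(self):
        return self.opts          # would recurse infinitely if ever reached

    @property
    def resource(self):
        return self.resource

try:
    PlacementGroupResource({}, {})
except AttributeError as e:
    print(f"constructor fails: {e}")
```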
118 changes: 60 additions & 58 deletions deltacat/utils/placement.py
@@ -3,6 +3,7 @@
 import time
 import yaml
 import logging
+from dataclasses import dataclass
 from typing import Optional, Union, List, Dict, Any, Callable, Tuple
 from ray.util.placement_group import (
     placement_group,
@@ -22,6 +23,12 @@
 #Must run on driver or head node bc state.api needs to query dashboard api server at 127.0.0.1.
 #Issue: https://github.com/ray-project/ray/issues/29959

+@dataclass
+class PlacementGroupConfig():
+    """Scheduling options plus the aggregate resources of one placement group."""
+    opts: Dict[str, Any]
+    resource: Dict[str, Any]
+
 class NodeGroupManager():

     def __init__(self, path: str, gname: str):
@@ -159,17 +166,6 @@ def get_group_by_name(self, gname: str) -> Optional[Dict[str, Union[str, float]]]
             logger.info(f"There is no available resources for {gname}")
             return None
         return group_res

-class PlacementGroupResource():
-    def __init__(self, opts, resource):
-        self.opts = opts
-        self.resource = resource
-    @property
-    def opts(self):
-        return self.opts
-    @property
-    def resource(self):
-        return self.resource
-
 class PlacementGroupManager():
     """Placement Group Manager
@@ -188,13 +184,18 @@ class PlacementGroupManager():
     num_pgs: number of placement groups to be created
     instance_cpus: number of cpus per instance
     """
-    def __init__(self, num_pgs: int, total_cpus_per_pg: int, cpu_per_bundle: int, strategy="SPREAD", capture_child_tasks=True, time_out: Optional[float] = None):
-        head_res_key = get_current_node_resource_key()
+    def __init__(self, num_pgs: int,
+                 total_cpus_per_pg: int,
+                 cpu_per_bundle: int,
+                 strategy="SPREAD",
+                 capture_child_tasks=True):
+        head_res_key = self.get_current_node_resource_key()
         #run the task on head and consume a fractional cpu, so that the pg can be created on a non-head node
         #if cpu_per_bundle is less than the cpus per node, the pg can still be created on head
         #current assumption is that cpu_per_bundle = cpus per node
         #TODO: figure out how to create pg on non-head explicitly
-        self._pg_configs = ray.get([_config.options(resources={head_res_key:0.01}).remote(total_cpus_per_pg, cpu_per_bundle, strategy, capture_child_tasks) for i in range(num_pgs)])
+        self._pg_configs = ray.get([_config.options(resources={head_res_key: 0.01}).remote(total_cpus_per_pg, \
+            cpu_per_bundle, strategy, capture_child_tasks) for i in range(num_pgs)])
         #TODO: handle the cases where cpu_per_bundle is larger than max cpus per node, support it on ec2/flex/manta

    @property
@@ -204,52 +205,53 @@ def pgs(self):
     def get_current_node_resource_key(self) -> str:
         #on ec2: address="172.31.34.51:6379"
         #on manta: address = "2600:1f10:4674:6815:aadb:2dc8:de61:bc8e:6379"
-       current_node_name = ray.experimental.internal_kv.global_gcs_client.address[:-5]
-       for node in ray.nodes():
-           if node["NodeName"] == current_node_name:
-               # Found the node.
-               for key in node["Resources"].keys():
-                   if key.startswith("node:"):
-                       return key
+        current_node_name = ray.experimental.internal_kv.global_gcs_client.address[:-5]
+        for node in ray.nodes():
+            if node["NodeName"] == current_node_name:
+                # Found the node.
+                for key in node["Resources"].keys():
+                    if key.startswith("node:"):
+                        return key

@ray.remote(num_cpus=0.01)
-def _config(total_cpus_per_pg: int, cpu_per_node: int, strategy="SPREAD", capture_child_tasks=True, time_out: Optional[float] = None) -> Tuple[Dict[str,Any], Dict[str,Any]]:
+def _config(total_cpus_per_pg: int,
+            cpu_per_node: int,
+            strategy="SPREAD",
+            capture_child_tasks=True,
+            time_out: Optional[float] = None) -> Optional[PlacementGroupConfig]:
-    pg_config = None
-    try:
-        opts ={}
-        cluster_resources={}
-        num_bundles = (int)(total_cpus_per_pg/cpu_per_node)
-        bundles = [{'CPU':instance_type} for i in range(num_bundles)]
-        pg = placement_group(bundles, strategy=strategy)
-        ray.get(pg.ready(), timeout=time_out)
-        if not pg:
-            return None
-        opts = {"scheduling_strategy":PlacementGroupSchedulingStrategy(
-            placement_group=pg, placement_group_capture_child_tasks=capture_child_tasks)
-        }
-        pg_id = placement_group_table(pg)['placement_group_id']
-        pg_details = get_placement_group(pg_id)
-        bundles = pg_details['bundles']
-        for bd in bundles:
-            node_ids.append(bd['node_id'])
-        #query available resources given list of node id
-        all_nodes_available_res = ray._private.state.state._available_resources_per_node()
-        pg_res = {'CPU':0,'memory':0,'object_store_memory':0}
-        for node_id in node_ids:
-            if node_id in all_nodes_available_res:
-                v = all_nodes_available_res[node_id]
-                node_detail = get_node(node_id)
-                pg_res['CPU']+=node_detail['resources_total']['CPU']
-                pg_res['memory']+=v['memory']
-                pg_res['object_store_memory']+=v['object_store_memory']
-        cluster_resources['CPU'] = int(pg_res['CPU'])
-        cluster_resources['memory'] = float(pg_res['memory'])
-        cluster_resources['object_store_memory'] = float(pg_res['object_store_memory'])
-        pg_config=PlacementGroupResource(opts,cluster_resources)
-        logger.info(f"pg has resources:{cluster_resources}")
-
-    except Exception as e:
-        logger.error(f"placement group error:{e}")
-        pass
+    opts = {}
+    cluster_resources = {}
+    num_bundles = int(total_cpus_per_pg / cpu_per_node)
+    bundles = [{'CPU': cpu_per_node} for i in range(num_bundles)]
+    pg = placement_group(bundles, strategy=strategy)
+    ray.get(pg.ready(), timeout=time_out)
+    if not pg:
+        return None
+    opts = {"scheduling_strategy": PlacementGroupSchedulingStrategy(
+        placement_group=pg, placement_group_capture_child_tasks=capture_child_tasks)
+    }
+    pg_id = placement_group_table(pg)['placement_group_id']
+    pg_details = get_placement_group(pg_id)
+    bundles = pg_details['bundles']
+    node_ids = []
+    for bd in bundles:
+        node_ids.append(bd['node_id'])
+    #query available resources given list of node id
+    all_nodes_available_res = ray._private.state.state._available_resources_per_node()
+    pg_res = {'CPU': 0, 'memory': 0, 'object_store_memory': 0}
+    for node_id in node_ids:
+        if node_id in all_nodes_available_res:
+            v = all_nodes_available_res[node_id]
+            node_detail = get_node(node_id)
+            pg_res['CPU'] += node_detail['resources_total']['CPU']
+            pg_res['memory'] += v['memory']
+            pg_res['object_store_memory'] += v['object_store_memory']
+    cluster_resources['CPU'] = int(pg_res['CPU'])
+    cluster_resources['memory'] = float(pg_res['memory'])
+    cluster_resources['object_store_memory'] = float(pg_res['object_store_memory'])
+    pg_config = PlacementGroupConfig(opts, cluster_resources)
+    logger.info(f"pg has resources:{cluster_resources}")

    return pg_config
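With the bare try/except gone, the failures it used to swallow are now visible and fixed: instance_type is replaced by the resolved cpu_per_node argument, node_ids is initialized before use, and the config object is actually constructible. A minimal usage sketch, assuming a running Ray cluster; the pool sizes are illustrative and not taken from this diff:

```
# Hedged sketch: assumes ray.init() can reach a cluster with spare capacity
# and that PlacementGroupManager.pgs returns the list of PlacementGroupConfig.
import ray
from deltacat.utils.placement import PlacementGroupManager

ray.init(address="auto")
pg_manager = PlacementGroupManager(num_pgs=2, total_cpus_per_pg=64, cpu_per_bundle=32)
for pg_config in pg_manager.pgs:
    # opts feeds task.options(**pg_config.opts) via its scheduling_strategy;
    # resource aggregates CPU/memory/object_store_memory across the pg's nodes.
    print(pg_config.opts["scheduling_strategy"].placement_group)
    print(pg_config.resource)
```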

12 changes: 7 additions & 5 deletions deltacat/utils/ray_utils/concurrency.py
@@ -4,6 +4,7 @@
 from ray.types import ObjectRef

 from deltacat.utils.ray_utils.runtime import current_node_resource_key
+import copy

 from typing import Any, Iterable, Callable, Dict, List, Tuple, Union, Optional
 import itertools
@@ -105,11 +106,12 @@ def round_robin_options_provider(
    ```
    """
    opts = kwargs.get("pg_config")
-    if opts:  # use pg and bundle id for fault-tolerant round-robin
-        bundle_key_index = i % len(opts['scheduling_strategy'].placement_group.bundle_specs)
-        opts['scheduling_strategy'].placement_group_bundle_index = bundle_key_index
-        return opts
-    else:  # use node id for round-robin
+    if opts:  # use pg and bundle id for fault-tolerant round-robin
+        new_opts = copy.deepcopy(opts)  # avoid mutating the shared options dict
+        bundle_key_index = i % len(new_opts['scheduling_strategy'].placement_group.bundle_specs)
+        new_opts['scheduling_strategy'].placement_group_bundle_index = bundle_key_index
+        return new_opts
+    else:  # use node id for round-robin
         assert resource_keys, f"No resource keys given to round robin!"
         resource_key_index = i % len(resource_keys)
         key = resource_keys[resource_key_index]
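The deepcopy in round_robin_options_provider fixes an aliasing bug: the shared pg_config options dict holds a single scheduling-strategy object, so mutating its bundle index in place retroactively changed the options every earlier task had captured. A self-contained sketch of the failure mode and the fix; Strategy is a stand-in for Ray's PlacementGroupSchedulingStrategy, not the real class:

```
import copy

class Strategy:
    """Stand-in for PlacementGroupSchedulingStrategy (assumed shape)."""
    def __init__(self):
        self.placement_group_bundle_index = -1

shared_opts = {"scheduling_strategy": Strategy()}

def old_provider(i, opts):
    opts["scheduling_strategy"].placement_group_bundle_index = i
    return opts  # every caller shares the same strategy object

def new_provider(i, opts):
    new_opts = copy.deepcopy(opts)  # the fix: isolate options per call
    new_opts["scheduling_strategy"].placement_group_bundle_index = i
    return new_opts

a = old_provider(0, shared_opts)
old_provider(1, shared_opts)
print(a["scheduling_strategy"].placement_group_bundle_index)  # 1: task 0's index was clobbered

b = new_provider(0, shared_opts)
new_provider(1, shared_opts)
print(b["scheduling_strategy"].placement_group_bundle_index)  # 0: stays isolated
```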
