From f1a129718ef0c406265609d127f7c00a1d93543e Mon Sep 17 00:00:00 2001
From: kingeasternsun
Date: Wed, 27 Nov 2024 01:10:36 +0800
Subject: [PATCH 1/4] :sparkles: Support Koordinator as a batch scheduler option

---
 .../koordinator/koordinator_scheduler.go      | 83 +++++++++++++++++++
 .../ray/batchscheduler/schedulermanager.go    |  3 +
 2 files changed, 86 insertions(+)
 create mode 100644 ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go

diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
new file mode 100644
index 0000000000..7099532be9
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
@@ -0,0 +1,83 @@
+package koordinator
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/rest"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+)
+
+const (
+	SchedulerName                             string = "koordinator"
+	KoordinatorGangMinAvailableAnnotationName string = "gang.scheduling.koordinator.sh/min-available"
+	KoordinatorGangAnnotationName             string = "gang.scheduling.koordinator.sh/name"
+)
+
+type KoordinatorScheduler struct{}
+
+type KoordinatorSchedulerFactory struct{}
+
+func GetPluginName() string {
+	return SchedulerName
+}
+
+func (y *KoordinatorScheduler) Name() string {
+	return GetPluginName()
+}
+
+func (y *KoordinatorScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
+	// koordinator doesn't require any resources to be created upfront
+	// this is a no-op for this implementation
+	return nil
+}
+
+// AddMetadataToPod adds essential labels and annotations to the Ray pods
+// the koordinator scheduler needs these labels and annotations in order to do the scheduling properly
+func (y *KoordinatorScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
+
+	// when gang scheduling is enabled, extra annotations need to be added to all pods
+	if y.isGangSchedulingEnabled(app) {
+		// set the task group name based on the head or worker group name
+		// the group name for the head and each of the worker groups should be different
+		pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app)
+		pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(getMinAvailable(app)))
+	}
+}
+
+func (y *KoordinatorScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
+	_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
+	return exist
+}
+
+func (yf *KoordinatorSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) {
+	return &KoordinatorScheduler{}, nil
+}
+
+func (yf *KoordinatorSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
+	// No extra scheme needs to be registered
+}
+
+func (yf *KoordinatorSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
+	return b
+}
+
+func getAppPodGroupName(app *rayv1.RayCluster) string {
+	return fmt.Sprintf("ray-%s-pg", app.Name)
+}
+func getMinAvailable(app *rayv1.RayCluster) int32 {
+
+	var minAvailable int32
+	for _, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
+		minAvailable += *workerGroupSpec.MinReplicas
+	}
+	return minAvailable + 1
+
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
index 8501949b7e..5976c5ae25 100644
--- a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
+++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
@@ -8,6 +8,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 
 	configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/koordinator"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 
@@ -59,6 +60,8 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
 		factory = &volcano.VolcanoBatchSchedulerFactory{}
 	case yunikorn.GetPluginName():
 		factory = &yunikorn.YuniKornSchedulerFactory{}
+	case koordinator.GetPluginName():
+		factory = &koordinator.KoordinatorSchedulerFactory{}
 	default:
 		return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler)
 	}
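Note (not part of the patches): PATCH 1 treats the whole RayCluster as a
single gang. Every pod gets the same gang name ("ray-<cluster>-pg") and the
same min-available, computed as the sum of MinReplicas over all worker groups
plus one for the head pod. As a sketch, for a hypothetical cluster named
"sample" with two worker groups whose minReplicas are 2 and 3, every pod
would carry:

    gang.scheduling.koordinator.sh/name: ray-sample-pg
    gang.scheduling.koordinator.sh/min-available: "6"   # 2 + 3 + 1 (head)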
From b3e2847890cfc5632a4574e1c0414593c4341a60 Mon Sep 17 00:00:00 2001
From: kingeasternsun
Date: Sat, 7 Dec 2024 16:25:00 +0800
Subject: [PATCH 2/4] support koordinator gang group with annotation

Signed-off-by: kingeasternsun
---
 .../koordinator/koordinator_gang_groups.go    | 30 +++++++++++
 .../koordinator/koordinator_scheduler.go      | 53 ++++++++++++++++---
 2 files changed, 76 insertions(+), 7 deletions(-)
 create mode 100644 ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go

diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
new file mode 100644
index 0000000000..40db010242
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
@@ -0,0 +1,30 @@
+package koordinator
+
+import (
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+)
+
+func generateGangGroupName(app *rayv1.RayCluster, namespace, groupName string) string {
+	if namespace == "" {
+		namespace = "default"
+	}
+	return namespace + "/" + app.Name + "-" + groupName
+}
+
+func newGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]int32) {
+	gangGroups := make([]string, 1+len(app.Spec.WorkerGroupSpecs))
+	minMemberMap := map[string]int32{}
+
+	gangGroups[0] = generateGangGroupName(app, app.Spec.HeadGroupSpec.Template.Namespace, utils.RayNodeHeadGroupLabelValue)
+	minMemberMap[utils.RayNodeHeadGroupLabelValue] = 1
+
+	for i, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
+		minWorkers := workerGroupSpec.MinReplicas
+		gangGroups[1+i] = generateGangGroupName(app, workerGroupSpec.Template.Namespace, workerGroupSpec.GroupName)
+		minMemberMap[workerGroupSpec.GroupName] = *minWorkers
+
+	}
+
+	return gangGroups, minMemberMap
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
index 7099532be9..fd420a87c7 100644
--- a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
@@ -2,12 +2,13 @@ package koordinator
 
 import (
 	"context"
-	"fmt"
+	"encoding/json"
 	"strconv"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 
 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
@@ -17,8 +18,13 @@ import (
 
 const (
 	SchedulerName                             string = "koordinator"
-	KoordinatorGangMinAvailableAnnotationName string = "gang.scheduling.koordinator.sh/min-available"
 	KoordinatorGangAnnotationName             string = "gang.scheduling.koordinator.sh/name"
+	KoordinatorGangMinAvailableAnnotationName string = "gang.scheduling.koordinator.sh/min-available"
+	KoordinatorGangTotalNumberAnnotationName  string = "gang.scheduling.koordinator.sh/total-number"
+	KoordinatorGangModeAnnotationName         string = "gang.scheduling.koordinator.sh/mode"
+	KoordinatorGangGroupsAnnotationName       string = "gang.scheduling.koordinator.sh/groups"
+
+	KoordinatorGangModeStrict string = "Strict"
 )
 
 type KoordinatorScheduler struct{}
 
 type KoordinatorSchedulerFactory struct{}
@@ -42,13 +48,44 @@ func (y *KoordinatorScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _
 // AddMetadataToPod adds essential labels and annotations to the Ray pods
 // the koordinator scheduler needs these labels and annotations in order to do the scheduling properly
 func (y *KoordinatorScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
+	logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
 
 	// when gang scheduling is enabled, extra annotations need to be added to all pods
 	if y.isGangSchedulingEnabled(app) {
-		// set the task group name based on the head or worker group name
+
+		if pod.Annotations == nil {
+			pod.Annotations = make(map[string]string)
+		}
+
+		// set the pod group name based on the head or worker group name
 		// the group name for the head and each of the worker groups should be different
-		pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app)
-		pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(getMinAvailable(app)))
+		// the API is defined here: https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way
+
+		gangGroups, minMemberMap := newGangGroupsFromApp(app)
+
+		pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app, groupName)
+		pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName]))
+		pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = pod.Annotations[KoordinatorGangMinAvailableAnnotationName]
+		pod.Annotations[KoordinatorGangModeAnnotationName] = KoordinatorGangModeStrict
+
+		gangGroupAnnotationValueBytes, err := json.Marshal(gangGroups)
+		if err != nil {
+			logger.Error(err, "failed to add gang group scheduling related annotations to pod, "+
+				"gang scheduling will not be enabled for this workload",
+				"name", pod.Name, "namespace", pod.Namespace)
+			return
+		}
+
+		gangGroupAnnotationValue := string(gangGroupAnnotationValueBytes)
+		logger.Info("add task groups info to pod's annotation",
+			"key", KoordinatorGangGroupsAnnotationName,
+			"value", gangGroupAnnotationValue,
+			"group", pod.Annotations[KoordinatorGangAnnotationName],
+			"min-available", pod.Annotations[KoordinatorGangMinAvailableAnnotationName])
+
+		pod.Annotations[KoordinatorGangGroupsAnnotationName] = gangGroupAnnotationValue
+
+		logger.Info("Gang Group Scheduling enabled for RayCluster")
 	}
 }
@@ -69,9 +106,11 @@ func (yf *KoordinatorSchedulerFactory) ConfigureReconciler(b *builder.Builder) *
 	return b
 }
 
-func getAppPodGroupName(app *rayv1.RayCluster) string {
-	return fmt.Sprintf("ray-%s-pg", app.Name)
+func getAppPodGroupName(app *rayv1.RayCluster, groupName string) string {
+
+	return app.Name + "-" + groupName
 }
+
 func getMinAvailable(app *rayv1.RayCluster) int32 {
 
 	var minAvailable int32

From d7690fa6ba937c010b3178773fc06ff0cbf16a04 Mon Sep 17 00:00:00 2001
From: kingeasternsun
Date: Sat, 7 Dec 2024 22:52:16 +0800
Subject: [PATCH 3/4] add koordinator gang group unit tests

Signed-off-by: kingeasternsun
---
 .../koordinator/koordinator_gang_groups.go    |  26 +++--
 .../koordinator/koordinator_scheduler.go      |  19 +---
 .../koordinator/koordinator_scheduler_test.go | 103 ++++++++++++++++++
 3 files changed, 126 insertions(+), 22 deletions(-)
 create mode 100644 ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go

diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
index 40db010242..ced1f4385a 100644
--- a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
@@ -6,24 +6,36 @@ import (
 )
 
 func generateGangGroupName(app *rayv1.RayCluster, namespace, groupName string) string {
+	if namespace == "" {
+		namespace = app.Namespace
+	}
 	if namespace == "" {
 		namespace = "default"
 	}
-	return namespace + "/" + app.Name + "-" + groupName
+	return namespace + "/" + getAppPodGroupName(app, groupName)
+}
+
+type workerGroupReplicas struct {
+	Replicas    int32
+	MinReplicas int32
 }
 
-func newGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]int32) {
+func analyzeGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]workerGroupReplicas) {
 	gangGroups := make([]string, 1+len(app.Spec.WorkerGroupSpecs))
-	minMemberMap := map[string]int32{}
+	minMemberMap := map[string]workerGroupReplicas{}
 
 	gangGroups[0] = generateGangGroupName(app, app.Spec.HeadGroupSpec.Template.Namespace, utils.RayNodeHeadGroupLabelValue)
-	minMemberMap[utils.RayNodeHeadGroupLabelValue] = 1
+	minMemberMap[utils.RayNodeHeadGroupLabelValue] = workerGroupReplicas{
+		Replicas:    1,
+		MinReplicas: 1,
+	}
 
 	for i, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
-		minWorkers := workerGroupSpec.MinReplicas
 		gangGroups[1+i] = generateGangGroupName(app, workerGroupSpec.Template.Namespace, workerGroupSpec.GroupName)
-		minMemberMap[workerGroupSpec.GroupName] = *minWorkers
-
+		minMemberMap[workerGroupSpec.GroupName] = workerGroupReplicas{
+			Replicas:    *(workerGroupSpec.Replicas),
+			MinReplicas: *(workerGroupSpec.MinReplicas),
+		}
 	}
 
 	return gangGroups, minMemberMap
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
index fd420a87c7..ee84d2e32a 100644
--- a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
@@ -61,11 +61,11 @@ func (y *KoordinatorScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.R
 		// the group name for the head and each of the worker groups should be different
 		// the API is defined here: https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way
 
-		gangGroups, minMemberMap := newGangGroupsFromApp(app)
+		gangGroups, minMemberMap := analyzeGangGroupsFromApp(app)
 
 		pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app, groupName)
-		pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName]))
-		pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = pod.Annotations[KoordinatorGangMinAvailableAnnotationName]
+		pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].MinReplicas))
+		pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].Replicas))
 		pod.Annotations[KoordinatorGangModeAnnotationName] = KoordinatorGangModeStrict
 
 		gangGroupAnnotationValueBytes, err := json.Marshal(gangGroups)
@@ -107,16 +107,5 @@ func (yf *KoordinatorSchedulerFactory) ConfigureReconciler(b *builder.Builder) *
 }
 
 func getAppPodGroupName(app *rayv1.RayCluster, groupName string) string {
-
-	return app.Name + "-" + groupName
-}
-
-func getMinAvailable(app *rayv1.RayCluster) int32 {
-
-	var minAvailable int32
-	for _, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
-		minAvailable += *workerGroupSpec.MinReplicas
-	}
-	return minAvailable + 1
-
+	return "ray-" + app.Name + "-" + groupName
 }
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
new file mode 100644
index 0000000000..56e1813a5e
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
@@ -0,0 +1,103 @@
+package koordinator
+
+import (
+	"context"
+	"testing"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+	"github.com/stretchr/testify/assert"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func createRayClusterWithLabels(namespace, name string, labels map[string]string) *rayv1.RayCluster {
+	rayCluster := &rayv1.RayCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+			Labels:    labels,
+		},
+	}
+
+	return rayCluster
+}
+
+func setHeadPodNamespace(app *rayv1.RayCluster, namespace string) {
+	app.Spec.HeadGroupSpec.Template.Namespace = namespace
+}
+
+func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string,
+	namespace string, replicas, minReplicas int32) {
+	workerGroupSpec := rayv1.WorkerGroupSpec{
+		GroupName:   workerGroupName,
+		Replicas:    &replicas,
+		MinReplicas: &minReplicas,
+	}
+	workerGroupSpec.Template.Namespace = namespace
+
+	app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, workerGroupSpec)
+}
+
+func createPod(namespace, name string) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        name,
+			Namespace:   namespace,
+			Labels:      make(map[string]string),
+			Annotations: make(map[string]string),
+		},
+	}
+}
+
+func TestAddMetadataToPod(t *testing.T) {
+	ks := &KoordinatorScheduler{}
+	ctx := context.Background()
+	// test the case when gang-scheduling is enabled
+	rayClusterWithGangScheduling := createRayClusterWithLabels(
+		"ray-namespace",
+		"koord",
+		map[string]string{
+			utils.RayClusterGangSchedulingEnabled: "true",
+		},
+	)
+
+	setHeadPodNamespace(rayClusterWithGangScheduling, "ns0")
+	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup1", "ns1", 4, 2)
+	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup2", "ns2", 5, 3)
+
+	gangGroupValue := `["ns0/ray-koord-headgroup","ns1/ray-koord-workergroup1","ns2/ray-koord-workergroup2"]`
+
+	// case 1: head pod
+	headPod := createPod("ns0", "head-pod")
+	ks.AddMetadataToPod(context.Background(), rayClusterWithGangScheduling,
+		utils.RayNodeHeadGroupLabelValue, headPod)
+	// verify the correctness of head pod
+	assert.Equal(t, "ray-koord-headgroup", headPod.Annotations[KoordinatorGangAnnotationName])
+	assert.Equal(t, "1", headPod.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	assert.Equal(t, "1", headPod.Annotations[KoordinatorGangTotalNumberAnnotationName])
+	assert.Equal(t, KoordinatorGangModeStrict, headPod.Annotations[KoordinatorGangModeAnnotationName])
+	assert.Equal(t, gangGroupValue, headPod.Annotations[KoordinatorGangGroupsAnnotationName])
+
+	// case 2: worker pod 1
+	workerpod1 := createPod("ns1", "workerpod1")
+	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup1", workerpod1)
+	// verify the correctness of worker pod 1
+	assert.Equal(t, "ray-koord-workergroup1", workerpod1.Annotations[KoordinatorGangAnnotationName])
+	assert.Equal(t, "2", workerpod1.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	assert.Equal(t, "4", workerpod1.Annotations[KoordinatorGangTotalNumberAnnotationName])
+	assert.Equal(t, KoordinatorGangModeStrict, workerpod1.Annotations[KoordinatorGangModeAnnotationName])
+	assert.Equal(t, gangGroupValue, workerpod1.Annotations[KoordinatorGangGroupsAnnotationName])
+
+	// case 3: worker pod 2
+	workerpod2 := createPod("ns2", "workerpod2")
+	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup2", workerpod2)
+	// verify the correctness of worker pod 2
+	assert.Equal(t, "ray-koord-workergroup2", workerpod2.Annotations[KoordinatorGangAnnotationName])
+	assert.Equal(t, "3", workerpod2.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	assert.Equal(t, "5", workerpod2.Annotations[KoordinatorGangTotalNumberAnnotationName])
+	assert.Equal(t, KoordinatorGangModeStrict, workerpod2.Annotations[KoordinatorGangModeAnnotationName])
+	assert.Equal(t, gangGroupValue, workerpod2.Annotations[KoordinatorGangGroupsAnnotationName])
+
+}
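Note (not part of the patches): after PATCH 2 and PATCH 3, each Ray group
becomes its own gang, and the per-group gangs are linked through the
gang.scheduling.koordinator.sh/groups annotation, following the annotation
API described at
https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way.
As a sketch for the same hypothetical "sample" cluster in namespace "default"
(workergroup1 with replicas 4 / minReplicas 2, workergroup2 with replicas 5 /
minReplicas 3), a workergroup1 pod would carry:

    gang.scheduling.koordinator.sh/name: ray-sample-workergroup1
    gang.scheduling.koordinator.sh/min-available: "2"
    gang.scheduling.koordinator.sh/total-number: "4"
    gang.scheduling.koordinator.sh/mode: Strict
    gang.scheduling.koordinator.sh/groups: '["default/ray-sample-headgroup","default/ray-sample-workergroup1","default/ray-sample-workergroup2"]'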
From 440879e11d3d5ca0b8fd6072cc6478ebecd101b0 Mon Sep 17 00:00:00 2001
From: kingeasternsun
Date: Sat, 21 Dec 2024 20:45:03 +0800
Subject: [PATCH 4/4] fix nits and add a koordinator sample

Signed-off-by: kingeasternsun
---
 helm-chart/kuberay-operator/values.yaml       |  6 +-
 .../config/v1alpha1/configuration_types.go    |  2 +-
 .../ray-cluster.koordinator-scheduler.yaml    | 40 ++++++++++++
 .../koordinator/koordinator_gang_groups.go    | 12 ++--
 .../koordinator/koordinator_scheduler.go      | 61 ++++++++++---------
 .../koordinator/koordinator_scheduler_test.go | 16 ++---
 .../batchscheduler/schedulermanager_test.go   | 11 ++++
 ray-operator/main.go                          |  2 +-
 8 files changed, 105 insertions(+), 45 deletions(-)
 create mode 100644 ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml

diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml
index 7b2cc7601c..68f0c8f900 100644
--- a/helm-chart/kuberay-operator/values.yaml
+++ b/helm-chart/kuberay-operator/values.yaml
@@ -78,11 +78,15 @@ readinessProbe:
 # batchScheduler:
 #   name: yunikorn
 #
+# 4. Use koordinator
+# batchScheduler:
+#   name: koordinator
+#
 batchScheduler:
   # Deprecated. This option will be removed in the future.
   # Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration.
   enabled: false
-  # Set the customized scheduler name, supported values are "volcano" or "yunikorn", do not set
+  # Set the customized scheduler name, supported values are "volcano", "yunikorn" or "koordinator", do not set
   # "batchScheduler.enabled=true" at the same time as it will override this option.
   name: ""
diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go
index 92a8f00fff..4eca45f972 100644
--- a/ray-operator/apis/config/v1alpha1/configuration_types.go
+++ b/ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -44,7 +44,7 @@ type Configuration struct {
 	LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"`
 
 	// BatchScheduler enables the batch scheduler integration with a specific scheduler
-	// based on the given name, currently, supported values are volcano and yunikorn.
+	// based on the given name, currently, supported values are volcano, yunikorn and koordinator.
 	BatchScheduler string `json:"batchScheduler,omitempty"`
 
 	// HeadSidecarContainers includes specification for a sidecar container
diff --git a/ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml b/ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml
new file mode 100644
index 0000000000..f0fc1353ba
--- /dev/null
+++ b/ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml
@@ -0,0 +1,40 @@
+apiVersion: ray.io/v1
+kind: RayCluster
+metadata:
+  name: test-koordinator-0
+  labels:
+    ray.io/gang-scheduling-enabled: "true"
+spec:
+  rayVersion: "2.9.0"
+  headGroupSpec:
+    rayStartParams: {}
+    template:
+      spec:
+        containers:
+        - name: ray-head
+          image: rayproject/ray:2.9.0
+          resources:
+            limits:
+              cpu: "1"
+              memory: "2Gi"
+            requests:
+              cpu: "1"
+              memory: "2Gi"
+  workerGroupSpecs:
+  - groupName: worker
+    rayStartParams: {}
+    replicas: 2
+    minReplicas: 2
+    maxReplicas: 2
+    template:
+      spec:
+        containers:
+        - name: ray-worker
+          image: rayproject/ray:2.9.0
+          resources:
+            limits:
+              cpu: "1"
+              memory: "1Gi"
+            requests:
+              cpu: "1"
+              memory: "1Gi"
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
index ced1f4385a..8088ad3654 100644
--- a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
@@ -5,6 +5,10 @@ import (
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 )
 
+const (
+	workerOffset = 1
+)
+
 func generateGangGroupName(app *rayv1.RayCluster, namespace, groupName string) string {
 	if namespace == "" {
 		namespace = app.Namespace
@@ -21,7 +25,7 @@ type workerGroupReplicas struct {
 }
 
 func analyzeGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]workerGroupReplicas) {
-	gangGroups := make([]string, 1+len(app.Spec.WorkerGroupSpecs))
+	gangGroups := make([]string, len(app.Spec.WorkerGroupSpecs)+workerOffset)
 	minMemberMap := map[string]workerGroupReplicas{}
 
 	gangGroups[0] = generateGangGroupName(app, app.Spec.HeadGroupSpec.Template.Namespace, utils.RayNodeHeadGroupLabelValue)
@@ -31,10 +35,10 @@ func analyzeGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]workerGroupReplicas) {
 	}
 
 	for i, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
-		gangGroups[1+i] = generateGangGroupName(app, workerGroupSpec.Template.Namespace, workerGroupSpec.GroupName)
+		gangGroups[i+workerOffset] = generateGangGroupName(app, workerGroupSpec.Template.Namespace, workerGroupSpec.GroupName)
 		minMemberMap[workerGroupSpec.GroupName] = workerGroupReplicas{
-			Replicas:    *(workerGroupSpec.Replicas),
-			MinReplicas: *(workerGroupSpec.MinReplicas),
+			Replicas:    *workerGroupSpec.Replicas,
+			MinReplicas: *workerGroupSpec.MinReplicas,
 		}
 	}
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
index ee84d2e32a..91ffc292b0 100644
--- a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
@@ -48,45 +48,46 @@ func (y *KoordinatorScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _
 // AddMetadataToPod adds essential labels and annotations to the Ray pods
 // the koordinator scheduler needs these labels and annotations in order to do the scheduling properly
 func (y *KoordinatorScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
-	logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
+	pod.Spec.SchedulerName = y.Name()
+	if !y.isGangSchedulingEnabled(app) {
+		return
+	}
 
+	logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
 	// when gang scheduling is enabled, extra annotations need to be added to all pods
-	if y.isGangSchedulingEnabled(app) {
-
-		if pod.Annotations == nil {
-			pod.Annotations = make(map[string]string)
-		}
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
 
-		// set the pod group name based on the head or worker group name
-		// the group name for the head and each of the worker groups should be different
-		// the API is defined here: https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way
+	// set the pod group name based on the head or worker group name
+	// the group name for the head and each of the worker groups should be different
+	// the API is defined here: https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way
 
-		gangGroups, minMemberMap := analyzeGangGroupsFromApp(app)
+	gangGroups, minMemberMap := analyzeGangGroupsFromApp(app)
 
-		pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app, groupName)
-		pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].MinReplicas))
-		pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].Replicas))
-		pod.Annotations[KoordinatorGangModeAnnotationName] = KoordinatorGangModeStrict
+	pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app, groupName)
+	pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].MinReplicas))
+	pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].Replicas))
+	pod.Annotations[KoordinatorGangModeAnnotationName] = KoordinatorGangModeStrict
 
-		gangGroupAnnotationValueBytes, err := json.Marshal(gangGroups)
-		if err != nil {
-			logger.Error(err, "failed to add gang group scheduling related annotations to pod, "+
-				"gang scheduling will not be enabled for this workload",
-				"name", pod.Name, "namespace", pod.Namespace)
-			return
-		}
+	gangGroupAnnotationValueBytes, err := json.Marshal(gangGroups)
+	if err != nil {
+		logger.Error(err, "failed to add gang group scheduling related annotations to pod, "+
+			"gang scheduling will not be enabled for this workload",
+			"name", pod.Name, "namespace", pod.Namespace)
+		return
+	}
 
-		gangGroupAnnotationValue := string(gangGroupAnnotationValueBytes)
-		logger.Info("add task groups info to pod's annotation",
-			"key", KoordinatorGangGroupsAnnotationName,
-			"value", gangGroupAnnotationValue,
-			"group", pod.Annotations[KoordinatorGangAnnotationName],
-			"min-available", pod.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	gangGroupAnnotationValue := string(gangGroupAnnotationValueBytes)
+	logger.Info("add task groups info to pod's annotation",
+		"key", KoordinatorGangGroupsAnnotationName,
+		"value", gangGroupAnnotationValue,
+		"group", pod.Annotations[KoordinatorGangAnnotationName],
+		"min-available", pod.Annotations[KoordinatorGangMinAvailableAnnotationName])
 
-		pod.Annotations[KoordinatorGangGroupsAnnotationName] = gangGroupAnnotationValue
+	pod.Annotations[KoordinatorGangGroupsAnnotationName] = gangGroupAnnotationValue
 
-		logger.Info("Gang Group Scheduling enabled for RayCluster")
-	}
+	logger.Info("Gang Group Scheduling enabled for RayCluster")
 }
 
 func (y *KoordinatorScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
index 56e1813a5e..cec380690f 100644
--- a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
@@ -56,21 +56,21 @@ func TestAddMetadataToPod(t *testing.T) {
 	ctx := context.Background()
 	// test the case when gang-scheduling is enabled
 	rayClusterWithGangScheduling := createRayClusterWithLabels(
-		"ray-namespace",
+		"ray-ns",
 		"koord",
 		map[string]string{
 			utils.RayClusterGangSchedulingEnabled: "true",
 		},
 	)
 
-	setHeadPodNamespace(rayClusterWithGangScheduling, "ns0")
-	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup1", "ns1", 4, 2)
-	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup2", "ns2", 5, 3)
+	setHeadPodNamespace(rayClusterWithGangScheduling, "")
+	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup1", "", 4, 2)
+	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup2", "", 5, 3)
 
-	gangGroupValue := `["ns0/ray-koord-headgroup","ns1/ray-koord-workergroup1","ns2/ray-koord-workergroup2"]`
+	gangGroupValue := `["ray-ns/ray-koord-headgroup","ray-ns/ray-koord-workergroup1","ray-ns/ray-koord-workergroup2"]`
 
 	// case 1: head pod
-	headPod := createPod("ns0", "head-pod")
+	headPod := createPod("", "head-pod")
 	ks.AddMetadataToPod(context.Background(), rayClusterWithGangScheduling,
 		utils.RayNodeHeadGroupLabelValue, headPod)
 	// verify the correctness of head pod
@@ -81,7 +81,7 @@ func TestAddMetadataToPod(t *testing.T) {
 	assert.Equal(t, gangGroupValue, headPod.Annotations[KoordinatorGangGroupsAnnotationName])
 
 	// case 2: worker pod 1
-	workerpod1 := createPod("ns1", "workerpod1")
+	workerpod1 := createPod("n", "workerpod1")
 	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup1", workerpod1)
 	// verify the correctness of worker pod 1
 	assert.Equal(t, "ray-koord-workergroup1", workerpod1.Annotations[KoordinatorGangAnnotationName])
@@ -91,7 +91,7 @@ func TestAddMetadataToPod(t *testing.T) {
 	assert.Equal(t, gangGroupValue, workerpod1.Annotations[KoordinatorGangGroupsAnnotationName])
 
 	// case 3: worker pod 2
-	workerpod2 := createPod("ns2", "workerpod2")
+	workerpod2 := createPod("", "workerpod2")
 	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup2", workerpod2)
 	// verify the correctness of worker pod 2
 	assert.Equal(t, "ray-koord-workergroup2", workerpod2.Annotations[KoordinatorGangAnnotationName])
diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
index 1eb18255f1..a58a742378 100644
--- a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
+++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
 	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/koordinator"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 )
@@ -16,6 +17,7 @@ func TestGetSchedulerFactory(t *testing.T) {
 	DefaultFactory := &schedulerinterface.DefaultBatchSchedulerFactory{}
 	VolcanoFactory := &volcano.VolcanoBatchSchedulerFactory{}
 	YuniKornFactory := &yunikorn.YuniKornSchedulerFactory{}
+	koordFactory := &koordinator.KoordinatorSchedulerFactory{}
 
 	type args struct {
 		rayConfigs v1alpha1.Configuration
@@ -74,6 +76,15 @@ func TestGetSchedulerFactory(t *testing.T) {
 			},
 			want: reflect.TypeOf(YuniKornFactory),
 		},
+		{
+			name: "enableBatchScheduler not set, batchScheduler set to koordinator",
+			args: args{
+				rayConfigs: v1alpha1.Configuration{
+					BatchScheduler: koordinator.GetPluginName(),
+				},
+			},
+			want: reflect.TypeOf(koordFactory),
+		},
 		{
 			name: "enableBatchScheduler not set, batchScheduler set to volcano",
 			args: args{
diff --git a/ray-operator/main.go b/ray-operator/main.go
index 0446ec49e6..035407686f 100644
--- a/ray-operator/main.go
+++ b/ray-operator/main.go
@@ -94,7 +94,7 @@ func main() {
 	flag.BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false,
 		"(Deprecated) Enable batch scheduler. Currently is volcano, which supports gang scheduler policy. Please use --batch-scheduler instead.")
 	flag.StringVar(&batchScheduler, "batch-scheduler", "",
-		"Batch scheduler name, supported values are volcano and yunikorn.")
+		"Batch scheduler name, supported values are volcano, yunikorn and koordinator.")
 	flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.")
 	flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false,
 		"Use Kubernetes proxy subresource when connecting to the Ray Head node.")
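Note (not part of the patches): to exercise the integration end to end with an
operator build that includes this series, select the scheduler through the
chart value added in PATCH 4 (or the equivalent --batch-scheduler=koordinator
flag), and opt a RayCluster in to gang scheduling with the label used by the
sample manifest. A minimal sketch:

    # kuberay-operator helm values override
    batchScheduler:
      name: koordinator

    # RayCluster metadata
    metadata:
      labels:
        ray.io/gang-scheduling-enabled: "true"

After PATCH 4, AddMetadataToPod also sets spec.schedulerName to "koordinator"
on every Ray pod, so pods are handed to the Koordinator scheduler even when
the gang-scheduling label is absent.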