diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml
index 7b2cc7601c..68f0c8f900 100644
--- a/helm-chart/kuberay-operator/values.yaml
+++ b/helm-chart/kuberay-operator/values.yaml
@@ -78,11 +78,15 @@ readinessProbe:
 # batchScheduler:
 #   name: yunikorn
 #
+# 4. Use koordinator
+# batchScheduler:
+#   name: koordinator
+#
 
 batchScheduler:
   # Deprecated. This option will be removed in the future.
   # Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration.
   enabled: false
-  # Set the customized scheduler name, supported values are "volcano" or "yunikorn", do not set
+  # Set the customized scheduler name, supported values are "volcano", "yunikorn" or "koordinator", do not set
   # "batchScheduler.enabled=true" at the same time as it will override this option.
   name: ""
diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go
index 92a8f00fff..4eca45f972 100644
--- a/ray-operator/apis/config/v1alpha1/configuration_types.go
+++ b/ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -44,7 +44,7 @@ type Configuration struct {
 	LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"`
 
 	// BatchScheduler enables the batch scheduler integration with a specific scheduler
-	// based on the given name, currently, supported values are volcano and yunikorn.
+	// based on the given name; currently supported values are volcano, yunikorn and koordinator.
 	BatchScheduler string `json:"batchScheduler,omitempty"`
 
 	// HeadSidecarContainers includes specification for a sidecar container
diff --git a/ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml b/ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml
new file mode 100644
index 0000000000..f0fc1353ba
--- /dev/null
+++ b/ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml
@@ -0,0 +1,40 @@
+apiVersion: ray.io/v1
+kind: RayCluster
+metadata:
+  name: test-koordinator-0
+  labels:
+    ray.io/gang-scheduling-enabled: "true"
+spec:
+  rayVersion: "2.9.0"
+  headGroupSpec:
+    rayStartParams: {}
+    template:
+      spec:
+        containers:
+          - name: ray-head
+            image: rayproject/ray:2.9.0
+            resources:
+              limits:
+                cpu: "1"
+                memory: "2Gi"
+              requests:
+                cpu: "1"
+                memory: "2Gi"
+  workerGroupSpecs:
+    - groupName: worker
+      rayStartParams: {}
+      replicas: 2
+      minReplicas: 2
+      maxReplicas: 2
+      template:
+        spec:
+          containers:
+            - name: ray-worker
+              image: rayproject/ray:2.9.0
+              resources:
+                limits:
+                  cpu: "1"
+                  memory: "1Gi"
+                requests:
+                  cpu: "1"
+                  memory: "1Gi"
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
new file mode 100644
index 0000000000..8088ad3654
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_gang_groups.go
@@ -0,0 +1,46 @@
+package koordinator
+
+import (
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+)
+
+const (
+	workerOffset = 1
+)
+
+func generateGangGroupName(app *rayv1.RayCluster, namespace, groupName string) string {
+	if namespace == "" {
+		namespace = app.Namespace
+	}
+	if namespace == "" {
+		namespace = "default"
+	}
+	return namespace + "/" + getAppPodGroupName(app, groupName)
+}
+
+type workerGroupReplicas struct {
+	Replicas    int32
+	MinReplicas int32
+}
+
+func analyzeGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]workerGroupReplicas) {
+	gangGroups := make([]string, len(app.Spec.WorkerGroupSpecs)+workerOffset)
+	minMemberMap := map[string]workerGroupReplicas{}
+
+	gangGroups[0] = generateGangGroupName(app, app.Spec.HeadGroupSpec.Template.Namespace, utils.RayNodeHeadGroupLabelValue)
+	minMemberMap[utils.RayNodeHeadGroupLabelValue] = workerGroupReplicas{
+		Replicas:    1,
+		MinReplicas: 1,
+	}
+
+	for i, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
+		gangGroups[i+workerOffset] = generateGangGroupName(app, workerGroupSpec.Template.Namespace, workerGroupSpec.GroupName)
+		minMemberMap[workerGroupSpec.GroupName] = workerGroupReplicas{
+			Replicas:    *workerGroupSpec.Replicas,
+			MinReplicas: *workerGroupSpec.MinReplicas,
+		}
+	}
+
+	return gangGroups, minMemberMap
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
new file mode 100644
index 0000000000..91ffc292b0
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler.go
@@ -0,0 +1,112 @@
+package koordinator
+
+import (
+	"context"
+	"encoding/json"
+	"strconv"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+)
+
+const (
+	SchedulerName                             string = "koordinator"
+	KoordinatorGangAnnotationName             string = "gang.scheduling.koordinator.sh/name"
+	KoordinatorGangMinAvailableAnnotationName string = "gang.scheduling.koordinator.sh/min-available"
+	KoordinatorGangTotalNumberAnnotationName  string = "gang.scheduling.koordinator.sh/total-number"
+	KoordinatorGangModeAnnotationName         string = "gang.scheduling.koordinator.sh/mode"
+	KoordinatorGangGroupsAnnotationName       string = "gang.scheduling.koordinator.sh/groups"
+
+	KoordinatorGangModeStrict string = "Strict"
+)
+
+type KoordinatorScheduler struct{}
+
+type KoordinatorSchedulerFactory struct{}
+
+func GetPluginName() string {
+	return SchedulerName
+}
+
+func (k *KoordinatorScheduler) Name() string {
+	return GetPluginName()
+}
+
+func (k *KoordinatorScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
+	// koordinator doesn't require any resources to be created upfront,
+	// so this is a no-op for this implementation
+	return nil
+}
+
+// AddMetadataToPod sets the scheduler name and adds the gang-scheduling annotations to the Ray pods;
+// the koordinator scheduler needs these annotations in order to schedule the pods properly.
+func (k *KoordinatorScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
+	pod.Spec.SchedulerName = k.Name()
+	if !k.isGangSchedulingEnabled(app) {
+		return
+	}
+
+	logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
+	// when gang scheduling is enabled, extra annotations need to be added to all pods
+	if pod.Annotations == nil {
+		pod.Annotations = make(map[string]string)
+	}
+
+	// set the pod group name based on the head or worker group name;
+	// the group name for the head and each of the worker groups should be different.
+	// The API is defined here: https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way
+
+	gangGroups, minMemberMap := analyzeGangGroupsFromApp(app)
+
+	pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app, groupName)
+	pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].MinReplicas))
+	pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].Replicas))
+	pod.Annotations[KoordinatorGangModeAnnotationName] = KoordinatorGangModeStrict
+
+	gangGroupAnnotationValueBytes, err := json.Marshal(gangGroups)
+	if err != nil {
+		logger.Error(err, "failed to add gang group scheduling related annotations to pod, "+
+			"gang scheduling will not be enabled for this workload",
+			"name", pod.Name, "namespace", pod.Namespace)
+		return
+	}
+
+	gangGroupAnnotationValue := string(gangGroupAnnotationValueBytes)
+	logger.Info("add gang group info to the pod's annotations",
+		"key", KoordinatorGangGroupsAnnotationName,
+		"value", gangGroupAnnotationValue,
+		"group", pod.Annotations[KoordinatorGangAnnotationName],
+		"min-available", pod.Annotations[KoordinatorGangMinAvailableAnnotationName])
+
+	pod.Annotations[KoordinatorGangGroupsAnnotationName] = gangGroupAnnotationValue
+
+	logger.Info("Gang Group Scheduling enabled for RayCluster")
+}
+
+func (k *KoordinatorScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
+	_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
+	return exist
+}
+
+func (kf *KoordinatorSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) {
+	return &KoordinatorScheduler{}, nil
+}
+
+func (kf *KoordinatorSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
+	// No extra scheme needs to be registered
+}
+
+func (kf *KoordinatorSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
+	return b
+}
+
+func getAppPodGroupName(app *rayv1.RayCluster, groupName string) string {
+	return "ray-" + app.Name + "-" + groupName
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
new file mode 100644
index 0000000000..cec380690f
--- /dev/null
+++ b/ray-operator/controllers/ray/batchscheduler/koordinator/koordinator_scheduler_test.go
@@ -0,0 +1,103 @@
+package koordinator
+
+import (
+	"context"
+	"testing"
+
+	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+	"github.com/stretchr/testify/assert"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func createRayClusterWithLabels(namespace, name string, labels map[string]string) *rayv1.RayCluster {
+	rayCluster := &rayv1.RayCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+			Labels:    labels,
+		},
+	}
+
+	return rayCluster
+}
+
+func setHeadPodNamespace(app *rayv1.RayCluster, namespace string) {
+	app.Spec.HeadGroupSpec.Template.Namespace = namespace
+}
+
+func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string,
+	namespace string, replicas, minReplicas int32) {
+	workerGroupSpec := rayv1.WorkerGroupSpec{
+		GroupName:   workerGroupName,
+		Replicas:    &replicas,
+		MinReplicas: &minReplicas,
+	}
+	workerGroupSpec.Template.Namespace = namespace
+
+	app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, workerGroupSpec)
+}
+
+func createPod(namespace, name string) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        name,
+			Namespace:   namespace,
+			Labels:      make(map[string]string),
+			Annotations: make(map[string]string),
+		},
+	}
+}
+
+func TestAddMetadataToPod(t *testing.T) {
+	ks := &KoordinatorScheduler{}
+	ctx := context.Background()
+	// test the case when gang-scheduling is enabled
+	rayClusterWithGangScheduling := createRayClusterWithLabels(
+		"ray-ns",
+		"koord",
+		map[string]string{
+			utils.RayClusterGangSchedulingEnabled: "true",
+		},
+	)
+
+	setHeadPodNamespace(rayClusterWithGangScheduling, "")
+	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup1", "", 4, 2)
+	addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup2", "", 5, 3)
+
+	gangGroupValue := `["ray-ns/ray-koord-headgroup","ray-ns/ray-koord-workergroup1","ray-ns/ray-koord-workergroup2"]`
+
+	// case 1: head pod
+	headPod := createPod("", "head-pod")
+	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling,
+		utils.RayNodeHeadGroupLabelValue, headPod)
+	// verify the correctness of the head pod
+	assert.Equal(t, "ray-koord-headgroup", headPod.Annotations[KoordinatorGangAnnotationName])
+	assert.Equal(t, "1", headPod.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	assert.Equal(t, "1", headPod.Annotations[KoordinatorGangTotalNumberAnnotationName])
+	assert.Equal(t, KoordinatorGangModeStrict, headPod.Annotations[KoordinatorGangModeAnnotationName])
+	assert.Equal(t, gangGroupValue, headPod.Annotations[KoordinatorGangGroupsAnnotationName])
+
+	// case 2: worker pod 1
+	workerpod1 := createPod("n", "workerpod1")
+	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup1", workerpod1)
+	// verify the correctness of worker pod 1
+	assert.Equal(t, "ray-koord-workergroup1", workerpod1.Annotations[KoordinatorGangAnnotationName])
+	assert.Equal(t, "2", workerpod1.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	assert.Equal(t, "4", workerpod1.Annotations[KoordinatorGangTotalNumberAnnotationName])
+	assert.Equal(t, KoordinatorGangModeStrict, workerpod1.Annotations[KoordinatorGangModeAnnotationName])
+	assert.Equal(t, gangGroupValue, workerpod1.Annotations[KoordinatorGangGroupsAnnotationName])
+
+	// case 3: worker pod 2
+	workerpod2 := createPod("", "workerpod2")
+	ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup2", workerpod2)
+	// verify the correctness of worker pod 2
+	assert.Equal(t, "ray-koord-workergroup2", workerpod2.Annotations[KoordinatorGangAnnotationName])
+	assert.Equal(t, "3", workerpod2.Annotations[KoordinatorGangMinAvailableAnnotationName])
+	assert.Equal(t, "5", workerpod2.Annotations[KoordinatorGangTotalNumberAnnotationName])
+	assert.Equal(t, KoordinatorGangModeStrict, workerpod2.Annotations[KoordinatorGangModeAnnotationName])
+	assert.Equal(t, gangGroupValue, workerpod2.Annotations[KoordinatorGangGroupsAnnotationName])
+
+}
diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
index 8501949b7e..5976c5ae25 100644
--- a/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
+++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager.go
@@ -8,6 +8,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 
 	configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/koordinator"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 
@@ -59,6 +60,8 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
 		factory = &volcano.VolcanoBatchSchedulerFactory{}
 	case yunikorn.GetPluginName():
 		factory = &yunikorn.YuniKornSchedulerFactory{}
+	case koordinator.GetPluginName():
+		factory = &koordinator.KoordinatorSchedulerFactory{}
 	default:
 		return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler)
 	}
diff --git a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
index 1eb18255f1..a58a742378 100644
--- a/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
+++ b/ray-operator/controllers/ray/batchscheduler/schedulermanager_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
 	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/koordinator"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
 )
@@ -16,6 +17,7 @@ func TestGetSchedulerFactory(t *testing.T) {
 	DefaultFactory := &schedulerinterface.DefaultBatchSchedulerFactory{}
 	VolcanoFactory := &volcano.VolcanoBatchSchedulerFactory{}
 	YuniKornFactory := &yunikorn.YuniKornSchedulerFactory{}
+	KoordinatorFactory := &koordinator.KoordinatorSchedulerFactory{}
 	type args struct {
 		rayConfigs v1alpha1.Configuration
 	}
@@ -74,6 +76,15 @@
 			},
 			want: reflect.TypeOf(YuniKornFactory),
 		},
+		{
+			name: "enableBatchScheduler not set, batchScheduler set to koordinator",
+			args: args{
+				rayConfigs: v1alpha1.Configuration{
+					BatchScheduler: koordinator.GetPluginName(),
+				},
+			},
+			want: reflect.TypeOf(KoordinatorFactory),
+		},
 		{
 			name: "enableBatchScheduler not set, batchScheduler set to volcano",
 			args: args{
diff --git a/ray-operator/main.go b/ray-operator/main.go
index 0446ec49e6..035407686f 100644
--- a/ray-operator/main.go
+++ b/ray-operator/main.go
@@ -94,7 +94,7 @@ func main() {
 	flag.BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false,
 		"(Deprecated) Enable batch scheduler. Currently is volcano, which supports gang scheduler policy. Please use --batch-scheduler instead.")
 	flag.StringVar(&batchScheduler, "batch-scheduler", "",
-		"Batch scheduler name, supported values are volcano and yunikorn.")
+		"Batch scheduler name, supported values are volcano, yunikorn and koordinator.")
 	flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.")
 	flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false,
 		"Use Kubernetes proxy subresource when connecting to the Ray Head node.")
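Usage note (not part of the patch above; a minimal sketch based only on the settings this change introduces): the koordinator integration is selected the same way as volcano and yunikorn, either via the operator flag --batch-scheduler=koordinator or via the Helm chart value shown below. The values file name is illustrative.

    # koordinator-values.yaml -- override for the kuberay-operator chart
    batchScheduler:
      name: koordinator

A RayCluster then opts into gang scheduling by carrying the ray.io/gang-scheduling-enabled: "true" label, as the new sample at ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml demonstrates.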