Support Koordinator as one batch scheduler option #2572

Open · wants to merge 4 commits into master
6 changes: 5 additions & 1 deletion helm-chart/kuberay-operator/values.yaml
@@ -78,11 +78,15 @@ readinessProbe:
# batchScheduler:
# name: yunikorn
#
# 4. Use koordinator
# batchScheduler:
# name: koordinator
#
batchScheduler:
# Deprecated. This option will be removed in the future.
# Note, for backwards compatibility. When it sets to true, it enables volcano scheduler integration.
enabled: false
# Set the customized scheduler name, supported values are "volcano" or "yunikorn", do not set
# Set the customized scheduler name, supported values are "volcano", "yunikorn" or "koordinator", do not set
# "batchScheduler.enabled=true" at the same time as it will override this option.
name: ""

2 changes: 1 addition & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -44,7 +44,7 @@ type Configuration struct {
LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"`

// BatchScheduler enables the batch scheduler integration with a specific scheduler
// based on the given name, currently, supported values are volcano and yunikorn.
// based on the given name, currently, supported values are volcano, yunikorn and koordinator.
BatchScheduler string `json:"batchScheduler,omitempty"`

// HeadSidecarContainers includes specification for a sidecar container
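The same scheduler can also be selected via the operator's structured config file (the --config flag in main.go). A minimal sketch, assuming the file is a YAML rendering of this Configuration type; the apiVersion and kind shown here are assumptions, not part of this diff:

apiVersion: config.ray.io/v1alpha1
kind: Configuration
batchScheduler: koordinator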
40 changes: 40 additions & 0 deletions ray-operator/config/samples/ray-cluster.koordinator-scheduler.yaml
@@ -0,0 +1,40 @@
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: test-koordinator-0
  labels:
    ray.io/gang-scheduling-enabled: "true"
spec:
  rayVersion: "2.9.0"
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.9.0
            resources:
              limits:
                cpu: "1"
                memory: "2Gi"
              requests:
                cpu: "1"
                memory: "2Gi"
  workerGroupSpecs:
    - groupName: worker
      rayStartParams: {}
      replicas: 2
      minReplicas: 2
      maxReplicas: 2
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.9.0
              resources:
                limits:
                  cpu: "1"
                  memory: "1Gi"
                requests:
                  cpu: "1"
                  memory: "1Gi"
@@ -0,0 +1,46 @@
package koordinator

import (
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

const (
workerOffset = 1
)

func generateGangGroupName(app *rayv1.RayCluster, namespace, groupName string) string {
if namespace == "" {
namespace = app.Namespace
}
if namespace == "" {
namespace = "default"
}
return namespace + "/" + getAppPodGroupName(app, groupName)
}

type workerGroupReplicas struct {
    Replicas    int32
    MinReplicas int32
}

func analyzeGangGroupsFromApp(app *rayv1.RayCluster) ([]string, map[string]workerGroupReplicas) {
    gangGroups := make([]string, len(app.Spec.WorkerGroupSpecs)+workerOffset)
    minMemberMap := map[string]workerGroupReplicas{}

    gangGroups[0] = generateGangGroupName(app, app.Spec.HeadGroupSpec.Template.Namespace, utils.RayNodeHeadGroupLabelValue)
    minMemberMap[utils.RayNodeHeadGroupLabelValue] = workerGroupReplicas{
        Replicas:    1,
        MinReplicas: 1,
    }

    for i, workerGroupSpec := range app.Spec.WorkerGroupSpecs {
        gangGroups[i+workerOffset] = generateGangGroupName(app, workerGroupSpec.Template.Namespace, workerGroupSpec.GroupName)
        minMemberMap[workerGroupSpec.GroupName] = workerGroupReplicas{
            Replicas:    *workerGroupSpec.Replicas,
            MinReplicas: *workerGroupSpec.MinReplicas,
        }
    }

    return gangGroups, minMemberMap
}
@@ -0,0 +1,112 @@
package koordinator

import (
"context"
"encoding/json"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

const (
SchedulerName string = "koordinator"
KoordinatorGangAnnotationName string = "gang.scheduling.koordinator.sh/name"
KoordinatorGangMinAvailableAnnotationName string = "gang.scheduling.koordinator.sh/min-available"
KoordinatorGangTotalNumberAnnotationName string = "gang.scheduling.koordinator.sh/total-number"
KoordinatorGangModeAnnotationName string = "gang.scheduling.koordinator.sh/mode"
KoordinatorGangGroupsAnnotationName string = "gang.scheduling.koordinator.sh/groups"

KoordinatorGangModeStrict string = "Strict"
)

type KoordinatorScheduler struct{}

type KoordinatorSchedulerFactory struct{}

func GetPluginName() string {
return SchedulerName
}

func (y *KoordinatorScheduler) Name() string {
return GetPluginName()
}

func (y *KoordinatorScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
    // Koordinator doesn't require any resources to be created upfront,
    // so this is a no-op for this implementation.
return nil
}

// AddMetadataToPod adds essential labels and annotations to the Ray pods.
// The Koordinator scheduler needs these labels and annotations in order to schedule the pods properly.
func (y *KoordinatorScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
pod.Spec.SchedulerName = y.Name()
if !y.isGangSchedulingEnabled(app) {
return
}

logger := ctrl.LoggerFrom(ctx).WithName(SchedulerName)
// when gang scheduling is enabled, extra annotations need to be added to all pods
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}

    // Set the pod group name based on the head or worker group name.
    // The group name for the head and each worker group must be different.
    // The API is defined at https://koordinator.sh/docs/designs/gang-scheduling/#annotation-way
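    //
    // For example, for a RayCluster named "koord" in namespace "ray-ns" with one worker group
    // "workergroup1" (replicas: 4, minReplicas: 2), a pod in that worker group ends up with:
    //   gang.scheduling.koordinator.sh/name: "ray-koord-workergroup1"
    //   gang.scheduling.koordinator.sh/min-available: "2"
    //   gang.scheduling.koordinator.sh/total-number: "4"
    //   gang.scheduling.koordinator.sh/mode: "Strict"
    //   gang.scheduling.koordinator.sh/groups: '["ray-ns/ray-koord-headgroup","ray-ns/ray-koord-workergroup1"]'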

gangGroups, minMemberMap := analyzeGangGroupsFromApp(app)

pod.Annotations[KoordinatorGangAnnotationName] = getAppPodGroupName(app, groupName)
pod.Annotations[KoordinatorGangMinAvailableAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].MinReplicas))
pod.Annotations[KoordinatorGangTotalNumberAnnotationName] = strconv.Itoa(int(minMemberMap[groupName].Replicas))
pod.Annotations[KoordinatorGangModeAnnotationName] = KoordinatorGangModeStrict

gangGroupAnnotationValueBytes, err := json.Marshal(gangGroups)
if err != nil {
logger.Error(err, "failed to add gang group scheduling related annotations to pod, "+
"gang scheduling will not be enabled for this workload",
"name", pod.Name, "namespace", pod.Namespace)
return
}

gangGroupAnnotationValue := string(gangGroupAnnotationValueBytes)
logger.Info("add task groups info to pod's annotation",
"key", KoordinatorGangGroupsAnnotationName,
"value", gangGroupAnnotationValue,
"group", pod.Annotations[KoordinatorGangAnnotationName],
"min-available", pod.Annotations[KoordinatorGangMinAvailableAnnotationName])

pod.Annotations[KoordinatorGangGroupsAnnotationName] = gangGroupAnnotationValue

logger.Info("Gang Group Scheduling enabled for RayCluster")
}

func (y *KoordinatorScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
return exist
}

func (yf *KoordinatorSchedulerFactory) New(_ *rest.Config) (schedulerinterface.BatchScheduler, error) {
    return &KoordinatorScheduler{}, nil
}

func (yf *KoordinatorSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
// No extra scheme needs to be registered
}

func (yf *KoordinatorSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
return b
}

func getAppPodGroupName(app *rayv1.RayCluster, groupName string) string {
return "ray-" + app.Name + "-" + groupName
}
@@ -0,0 +1,103 @@
package koordinator

import (
"context"
"testing"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/stretchr/testify/assert"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func createRayClusterWithLabels(namespace, name string, labels map[string]string) *rayv1.RayCluster {
rayCluster := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: labels,
},
}

return rayCluster
}

func setHeadPodNamespace(app *rayv1.RayCluster, namespace string) {
app.Spec.HeadGroupSpec.Template.Namespace = namespace
}

func addWorkerPodSpec(app *rayv1.RayCluster, workerGroupName string,
namespace string, replicas, minReplicas int32) {
workerGroupSpec := rayv1.WorkerGroupSpec{
GroupName: workerGroupName,
Replicas: &replicas,
MinReplicas: &minReplicas,
}
workerGroupSpec.Template.Namespace = namespace

app.Spec.WorkerGroupSpecs = append(app.Spec.WorkerGroupSpecs, workerGroupSpec)
}

func createPod(namespace, name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: make(map[string]string),
Annotations: make(map[string]string),
},
}
}

func TestAddMetadataToPod(t *testing.T) {
    ks := &KoordinatorScheduler{}
ctx := context.Background()
// test the case when gang-scheduling is enabled
rayClusterWithGangScheduling := createRayClusterWithLabels(
"ray-ns",
"koord",
map[string]string{
utils.RayClusterGangSchedulingEnabled: "true",
},
)

setHeadPodNamespace(rayClusterWithGangScheduling, "")
addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup1", "", 4, 2)
addWorkerPodSpec(rayClusterWithGangScheduling, "workergroup2", "", 5, 3)

gangGroupValue := `["ray-ns/ray-koord-headgroup","ray-ns/ray-koord-workergroup1","ray-ns/ray-koord-workergroup2"]`

// case 1: head pod
headPod := createPod("", "head-pod")
ks.AddMetadataToPod(context.Background(), rayClusterWithGangScheduling,
utils.RayNodeHeadGroupLabelValue, headPod)
// verify the correctness of head pod
assert.Equal(t, "ray-koord-headgroup", headPod.Annotations[KoordinatorGangAnnotationName])
assert.Equal(t, "1", headPod.Annotations[KoordinatorGangMinAvailableAnnotationName])
assert.Equal(t, "1", headPod.Annotations[KoordinatorGangTotalNumberAnnotationName])
assert.Equal(t, KoordinatorGangModeStrict, headPod.Annotations[KoordinatorGangModeAnnotationName])
assert.Equal(t, gangGroupValue, headPod.Annotations[KoordinatorGangGroupsAnnotationName])

    // case 2: worker pod 1
    workerpod1 := createPod("n", "workerpod1")
    ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup1", workerpod1)
    // verify the correctness of worker pod 1
assert.Equal(t, "ray-koord-workergroup1", workerpod1.Annotations[KoordinatorGangAnnotationName])
assert.Equal(t, "2", workerpod1.Annotations[KoordinatorGangMinAvailableAnnotationName])
assert.Equal(t, "4", workerpod1.Annotations[KoordinatorGangTotalNumberAnnotationName])
assert.Equal(t, KoordinatorGangModeStrict, workerpod1.Annotations[KoordinatorGangModeAnnotationName])
assert.Equal(t, gangGroupValue, workerpod1.Annotations[KoordinatorGangGroupsAnnotationName])

    // case 3: worker pod 2
workerpod2 := createPod("", "workerpod2")
ks.AddMetadataToPod(ctx, rayClusterWithGangScheduling, "workergroup2", workerpod2)
    // verify the correctness of worker pod 2
assert.Equal(t, "ray-koord-workergroup2", workerpod2.Annotations[KoordinatorGangAnnotationName])
assert.Equal(t, "3", workerpod2.Annotations[KoordinatorGangMinAvailableAnnotationName])
assert.Equal(t, "5", workerpod2.Annotations[KoordinatorGangTotalNumberAnnotationName])
assert.Equal(t, KoordinatorGangModeStrict, workerpod2.Annotations[KoordinatorGangModeAnnotationName])
    assert.Equal(t, gangGroupValue, workerpod2.Annotations[KoordinatorGangGroupsAnnotationName])
}
@@ -8,6 +8,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/koordinator"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"

@@ -59,6 +60,8 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
factory = &volcano.VolcanoBatchSchedulerFactory{}
case yunikorn.GetPluginName():
factory = &yunikorn.YuniKornSchedulerFactory{}
case koordinator.GetPluginName():
factory = &koordinator.KoordinatorSchedulerFactory{}
default:
return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler)
}
@@ -8,6 +8,7 @@ import (

"github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/koordinator"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
)
@@ -16,6 +17,7 @@ func TestGetSchedulerFactory(t *testing.T) {
DefaultFactory := &schedulerinterface.DefaultBatchSchedulerFactory{}
VolcanoFactory := &volcano.VolcanoBatchSchedulerFactory{}
YuniKornFactory := &yunikorn.YuniKornSchedulerFactory{}
koordFactory := &koordinator.KoordinatorSchedulerFactory{}

type args struct {
rayConfigs v1alpha1.Configuration
@@ -74,6 +76,15 @@ func TestGetSchedulerFactory(t *testing.T) {
},
want: reflect.TypeOf(YuniKornFactory),
},
{
name: "enableBatchScheduler set, batchScheduler set to koordinator",
args: args{
rayConfigs: v1alpha1.Configuration{
BatchScheduler: koordinator.GetPluginName(),
},
},
want: reflect.TypeOf(koordFactory),
},
{
name: "enableBatchScheduler not set, batchScheduler set to volcano",
args: args{
2 changes: 1 addition & 1 deletion ray-operator/main.go
@@ -94,7 +94,7 @@ func main() {
flag.BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false,
"(Deprecated) Enable batch scheduler. Currently is volcano, which supports gang scheduler policy. Please use --batch-scheduler instead.")
flag.StringVar(&batchScheduler, "batch-scheduler", "",
"Batch scheduler name, supported values are volcano and yunikorn.")
"Batch scheduler name, supported values are volcano, yunikorn and koordinator.")
flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.")
flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false,
"Use Kubernetes proxy subresource when connecting to the Ray Head node.")