From 848f3d77091c790925057a05349686e98f9a6466 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Fri, 20 Dec 2024 00:26:15 +0000 Subject: [PATCH] [kubectl-plugin] add create workergroup command Signed-off-by: Andrew Sy Kim --- kubectl-plugin/pkg/cmd/create/create.go | 1 + .../pkg/cmd/create/create_cluster.go | 8 +- .../pkg/cmd/create/create_workergroup.go | 164 ++++++++++++++++++ kubectl-plugin/pkg/cmd/job/job_submit.go | 2 - kubectl-plugin/pkg/util/client/client.go | 2 + .../pkg/util/generation/generation.go | 33 +--- 6 files changed, 179 insertions(+), 31 deletions(-) create mode 100644 kubectl-plugin/pkg/cmd/create/create_workergroup.go diff --git a/kubectl-plugin/pkg/cmd/create/create.go b/kubectl-plugin/pkg/cmd/create/create.go index a38bb626af..c3812d24c5 100644 --- a/kubectl-plugin/pkg/cmd/create/create.go +++ b/kubectl-plugin/pkg/cmd/create/create.go @@ -22,5 +22,6 @@ func NewCreateCommand(streams genericclioptions.IOStreams) *cobra.Command { } cmd.AddCommand(NewCreateClusterCommand(streams)) + cmd.AddCommand(NewCreateWorkerGroupCommand(streams)) return cmd } diff --git a/kubectl-plugin/pkg/cmd/create/create_cluster.go b/kubectl-plugin/pkg/cmd/create/create_cluster.go index 3434e93642..3de9695421 100644 --- a/kubectl-plugin/pkg/cmd/create/create_cluster.go +++ b/kubectl-plugin/pkg/cmd/create/create_cluster.go @@ -22,7 +22,6 @@ type CreateClusterOptions struct { image string headCPU string headMemory string - workerGrpName string workerCPU string workerMemory string workerReplicas int32 @@ -35,8 +34,11 @@ var ( `) createClusterExample = templates.Examples(` + # Create a Ray cluster using default values + kubectl ray create cluster sample-cluster + # Creates Ray Cluster from flags input - kubectl ray create cluster sample-cluster --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi + kubectl ray create cluster sample-cluster --ray-version 2.39.0 
--image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi `) ) @@ -72,7 +74,6 @@ func NewCreateClusterCommand(streams genericclioptions.IOStreams) *cobra.Command cmd.Flags().StringVar(&options.image, "image", options.image, "Ray image to use in the Ray Cluster yaml") cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head. Default to 2") cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head. Default to 4Gi") - cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster") cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas. Default of 1") cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker. Default to 2") cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker. 
Default to 4Gi") @@ -126,7 +127,6 @@ func (options *CreateClusterOptions) Run(ctx context.Context, factory cmdutil.Fa Image: options.image, HeadCPU: options.headCPU, HeadMemory: options.headMemory, - WorkerGrpName: options.workerGrpName, WorkerReplicas: options.workerReplicas, WorkerCPU: options.workerCPU, WorkerMemory: options.workerMemory, diff --git a/kubectl-plugin/pkg/cmd/create/create_workergroup.go b/kubectl-plugin/pkg/cmd/create/create_workergroup.go new file mode 100644 index 0000000000..6147fd736e --- /dev/null +++ b/kubectl-plugin/pkg/cmd/create/create_workergroup.go @@ -0,0 +1,164 @@ +package create + +import ( + "context" + "fmt" + + "github.com/spf13/cobra" + "k8s.io/cli-runtime/pkg/genericclioptions" + + "github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/kubectl/pkg/util/templates" +) + +type CreateWorkerGroupOptions struct { + configFlags *genericclioptions.ConfigFlags + ioStreams *genericclioptions.IOStreams + clusterName string + groupName string + rayVersion string + image string + workerCPU string + workerMemory string + workerReplicas int32 + workerMinReplicas int32 + workerMaxReplicas int32 +} + +var ( + createWorkerGroupLong = templates.LongDesc(` + Adds a worker group to an existing RayCluster. 
+ `) + + createWorkerGroupExample = templates.Examples(` + # Create a worker group in an existing RayCluster + kubectl ray create workergroup gpu-group --ray-cluster sample-cluster --image rayproject/ray:2.39.0 --worker-cpu 2 --worker-memory 5Gi + `) +) + +func NewCreateWorkerGroupOptions(streams genericclioptions.IOStreams) *CreateWorkerGroupOptions { + return &CreateWorkerGroupOptions{ + configFlags: genericclioptions.NewConfigFlags(true), + ioStreams: &streams, + } +} + +func NewCreateWorkerGroupCommand(streams genericclioptions.IOStreams) *cobra.Command { + options := NewCreateWorkerGroupOptions(streams) + cmdFactory := cmdutil.NewFactory(options.configFlags) + + cmd := &cobra.Command{ + Use: "workergroup [WORKERGROUP]", + Short: "Create worker group in an existing RayCluster", + Long: createWorkerGroupLong, + Example: createWorkerGroupExample, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + if err := options.Complete(cmd, args); err != nil { + return err + } + if err := options.Validate(); err != nil { + return err + } + return options.Run(cmd.Context(), cmdFactory) + }, + } + + cmd.Flags().StringVar(&options.clusterName, "ray-cluster", "", "The name of the RayCluster to add a worker group.") + cmd.Flags().StringVar(&options.rayVersion, "ray-version", "2.39.0", "Ray Version to use in the Ray Cluster yaml. Default to 2.39.0") + cmd.Flags().StringVar(&options.image, "image", options.image, "Ray image to use in the Ray Cluster yaml") + cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas. Default of 1") + cmd.Flags().Int32Var(&options.workerMinReplicas, "worker-min-replicas", 1, "Minimum number of the worker group replicas. Default of 1") + cmd.Flags().Int32Var(&options.workerMaxReplicas, "worker-max-replicas", 10, "Maximum number of the worker group replicas. Default of 10") + cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker. 
Default to 2") + cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker. Default to 4Gi") + + options.configFlags.AddFlags(cmd.Flags()) + return cmd +} + +func (options *CreateWorkerGroupOptions) Complete(cmd *cobra.Command, args []string) error { + if *options.configFlags.Namespace == "" { + *options.configFlags.Namespace = "default" + } + + if len(args) != 1 { + return cmdutil.UsageErrorf(cmd, "%s", cmd.Use) + } + options.groupName = args[0] + + if options.image == "" { + options.image = fmt.Sprintf("rayproject/ray:%s", options.rayVersion) + } + + return nil +} + +func (options *CreateWorkerGroupOptions) Validate() error { + config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig() + if err != nil { + return fmt.Errorf("error retrieving raw config: %w", err) + } + if len(config.CurrentContext) == 0 { + return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>") + } + + return nil +} + +func (options *CreateWorkerGroupOptions) Run(ctx context.Context, factory cmdutil.Factory) error { + k8sClient, err := client.NewClient(factory) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + rayCluster, err := k8sClient.RayClient().RayV1().RayClusters(*options.configFlags.Namespace).Get(ctx, options.clusterName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error getting RayCluster: %v", err) + } + + newRayCluster := rayCluster.DeepCopy() + podTemplate := corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-worker", + Image: options.image, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(options.workerCPU), + corev1.ResourceMemory: resource.MustParse(options.workerMemory), + }, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse(options.workerMemory), + }, + }, + }, + }, + }, + } + 
+ workerGroup := rayv1.WorkerGroupSpec{ + GroupName: options.groupName, + Replicas: &options.workerReplicas, + MinReplicas: &options.workerMinReplicas, + MaxReplicas: &options.workerMaxReplicas, + RayStartParams: map[string]string{}, + Template: podTemplate, + } + newRayCluster.Spec.WorkerGroupSpecs = append(newRayCluster.Spec.WorkerGroupSpecs, workerGroup) + + newRayCluster, err = k8sClient.RayClient().RayV1().RayClusters(*options.configFlags.Namespace).Update(ctx, newRayCluster, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("error updating RayCluster with new worker group: %v", err) + } + + fmt.Printf("Updated RayCluster %s/%s with new worker group\n", newRayCluster.Namespace, newRayCluster.Name) + return nil +} diff --git a/kubectl-plugin/pkg/cmd/job/job_submit.go b/kubectl-plugin/pkg/cmd/job/job_submit.go index 6fc461ce0c..82d300c927 100644 --- a/kubectl-plugin/pkg/cmd/job/job_submit.go +++ b/kubectl-plugin/pkg/cmd/job/job_submit.go @@ -160,7 +160,6 @@ func NewJobSubmitCommand(streams genericclioptions.IOStreams) *cobra.Command { cmd.Flags().StringVar(&options.image, "image", "rayproject/ray:2.39.0", "Ray image to use in the Ray Cluster yaml") cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head") cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head") - cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster") cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas") cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker") cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker") @@ -280,7 +279,6 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor Image: options.image, HeadCPU: options.headCPU, HeadMemory: 
options.headMemory, - WorkerGrpName: options.workerGrpName, WorkerCPU: options.workerCPU, WorkerMemory: options.workerMemory, WorkerReplicas: options.workerReplicas, diff --git a/kubectl-plugin/pkg/util/client/client.go b/kubectl-plugin/pkg/util/client/client.go index 26dcfd1a14..22b21b15f4 100644 --- a/kubectl-plugin/pkg/util/client/client.go +++ b/kubectl-plugin/pkg/util/client/client.go @@ -43,10 +43,12 @@ func NewClient(factory cmdutil.Factory) (Client, error) { if err != nil { return nil, err } + rayClient, err := rayclient.NewForConfig(restConfig) if err != nil { return nil, err } + return &k8sClient{ kubeClient: kubeClient, dynamicClient: dynamicClient, diff --git a/kubectl-plugin/pkg/util/generation/generation.go b/kubectl-plugin/pkg/util/generation/generation.go index 61d714d698..adfd42075c 100644 --- a/kubectl-plugin/pkg/util/generation/generation.go +++ b/kubectl-plugin/pkg/util/generation/generation.go @@ -13,16 +13,13 @@ import ( ) type RayClusterSpecObject struct { - RayVersion string - Image string - HeadCPU string - HeadMemory string - WorkerGrpName string - WorkerCPU string - WorkerMemory string - HeadLifecyclePrestopExecCommand []string - WorkerLifecyclePrestopExecComand []string - WorkerReplicas int32 + RayVersion string + Image string + HeadCPU string + HeadMemory string + WorkerCPU string + WorkerMemory string + WorkerReplicas int32 } type RayClusterYamlObject struct { @@ -81,7 +78,7 @@ func (rayClusterSpecObject *RayClusterSpecObject) generateRayClusterSpec() *rayv corev1ac.ContainerPort().WithContainerPort(10001).WithName("client")))))). WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). WithRayStartParams(map[string]string{"metrics-export-port": "8080"}). - WithGroupName(rayClusterSpecObject.WorkerGrpName). + WithGroupName("default-group"). WithReplicas(rayClusterSpecObject.WorkerReplicas). WithTemplate(corev1ac.PodTemplateSpec(). WithSpec(corev1ac.PodSpec(). 
@@ -97,20 +94,6 @@ func (rayClusterSpecObject *RayClusterSpecObject) generateRayClusterSpec() *rayv corev1.ResourceCPU: resource.MustParse(rayClusterSpecObject.HeadCPU), corev1.ResourceMemory: resource.MustParse(rayClusterSpecObject.HeadMemory), })))))) - - // Lifecycle cannot be empty, an empty lifecycle will stop pod startup so this will add lifecycle if its not empty - if len(rayClusterSpecObject.WorkerLifecyclePrestopExecComand) > 0 { - rayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Lifecycle = corev1ac.Lifecycle(). - WithPreStop(corev1ac.LifecycleHandler(). - WithExec(corev1ac.ExecAction(). - WithCommand(rayClusterSpecObject.WorkerLifecyclePrestopExecComand...))) - } - if len(rayClusterSpecObject.HeadLifecyclePrestopExecCommand) > 0 { - rayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Lifecycle = corev1ac.Lifecycle(). - WithPreStop(corev1ac.LifecycleHandler(). - WithExec(corev1ac.ExecAction(). - WithCommand(rayClusterSpecObject.HeadLifecyclePrestopExecCommand...))) - } return rayClusterSpec }