Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][kubectl-plugin] add create workergroup command #2673

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kubectl-plugin/pkg/cmd/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ func NewCreateCommand(streams genericclioptions.IOStreams) *cobra.Command {
}

cmd.AddCommand(NewCreateClusterCommand(streams))
cmd.AddCommand(NewCreateWorkerGroupCommand(streams))
return cmd
}
8 changes: 4 additions & 4 deletions kubectl-plugin/pkg/cmd/create/create_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type CreateClusterOptions struct {
image string
headCPU string
headMemory string
workerGrpName string
workerCPU string
workerMemory string
workerReplicas int32
Expand All @@ -35,8 +34,11 @@ var (
`)

createClusterExample = templates.Examples(`
# Create a Ray cluster using default values
kubectl ray create cluster sample-cluster

# Creates Ray Cluster from flags input
kubectl ray create cluster sample-cluster --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi
kubectl ray create cluster sample-cluster --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi
`)
)

Expand Down Expand Up @@ -72,7 +74,6 @@ func NewCreateClusterCommand(streams genericclioptions.IOStreams) *cobra.Command
cmd.Flags().StringVar(&options.image, "image", options.image, "Ray image to use in the Ray Cluster yaml")
cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head. Default to 2")
cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head. Default to 4Gi")
cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster")
cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas. Default of 1")
cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker. Default to 2")
cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker. Default to 4Gi")
Expand Down Expand Up @@ -126,7 +127,6 @@ func (options *CreateClusterOptions) Run(ctx context.Context, factory cmdutil.Fa
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerReplicas: options.workerReplicas,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
Expand Down
164 changes: 164 additions & 0 deletions kubectl-plugin/pkg/cmd/create/create_workergroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package create

import (
"context"
"fmt"

"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/templates"
)

// CreateWorkerGroupOptions holds the flag values and CLI state used to add a
// worker group to an existing RayCluster.
type CreateWorkerGroupOptions struct {
configFlags *genericclioptions.ConfigFlags // standard kubectl connection/namespace flags
ioStreams *genericclioptions.IOStreams // in/out/err streams for command output
clusterName string // name of the target RayCluster (--ray-cluster)
groupName string // worker group name, taken from the positional argument
rayVersion string // Ray version used to derive the default image (--ray-version)
image string // container image for the workers; defaults to rayproject/ray:<rayVersion>
workerCPU string // CPU request per worker, parsed with resource.MustParse (--worker-cpu)
workerMemory string // memory request/limit per worker (--worker-memory)
workerReplicas int32 // desired replica count (--worker-replicas)
workerMinReplicas int32 // autoscaler lower bound (--worker-min-replicas)
workerMaxReplicas int32 // autoscaler upper bound (--worker-max-replicas)
}

var (
	createWorkerGroupLong = templates.LongDesc(`
	Adds a worker group to an existing RayCluster.
	`)

	// Example matches the registered command name ("workergroup"), the real
	// cluster flag (--ray-cluster), and only flags this command defines.
	createWorkerGroupExample = templates.Examples(`
	# Create a worker group in an existing RayCluster
	kubectl ray create workergroup gpu-group --ray-cluster sample-cluster --image rayproject/ray:2.39.0 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi
	`)
)

// NewCreateWorkerGroupOptions constructs the option set for the
// "create workergroup" command, wiring in the standard kubectl config
// flags and the provided I/O streams.
func NewCreateWorkerGroupOptions(streams genericclioptions.IOStreams) *CreateWorkerGroupOptions {
	flags := genericclioptions.NewConfigFlags(true)
	opts := CreateWorkerGroupOptions{
		configFlags: flags,
		ioStreams:   &streams,
	}
	return &opts
}

// NewCreateWorkerGroupCommand returns the cobra command that adds a worker
// group to an existing RayCluster. It wires Complete -> Validate -> Run.
func NewCreateWorkerGroupCommand(streams genericclioptions.IOStreams) *cobra.Command {
	options := NewCreateWorkerGroupOptions(streams)
	cmdFactory := cmdutil.NewFactory(options.configFlags)

	cmd := &cobra.Command{
		Use:          "workergroup [WORKERGROUP]",
		Short:        "Create worker group in an existing RayCluster",
		Long:         createWorkerGroupLong,
		Example:      createWorkerGroupExample,
		SilenceUsage: true,
		RunE: func(cmd *cobra.Command, args []string) error {
			if err := options.Complete(cmd, args); err != nil {
				return err
			}
			if err := options.Validate(); err != nil {
				return err
			}
			return options.Run(cmd.Context(), cmdFactory)
		},
	}

	cmd.Flags().StringVar(&options.clusterName, "ray-cluster", "", "The name of the RayCluster to add a worker group.")
	cmd.Flags().StringVar(&options.rayVersion, "ray-version", "2.39.0", "Ray Version to use in the Ray Cluster yaml. Default to 2.39.0")
	cmd.Flags().StringVar(&options.image, "image", options.image, "Ray image to use in the Ray Cluster yaml")
	cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas. Default of 1")
	// Fixed: min/max flag descriptions previously both read "Number of the
	// worker group replicas" and min claimed "Default of 10" despite a
	// default of 1.
	cmd.Flags().Int32Var(&options.workerMinReplicas, "worker-min-replicas", 1, "Minimum number of the worker group replicas. Default of 1")
	cmd.Flags().Int32Var(&options.workerMaxReplicas, "worker-max-replicas", 10, "Maximum number of the worker group replicas. Default of 10")
	cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker. Default to 2")
	cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker. Default to 4Gi")

	options.configFlags.AddFlags(cmd.Flags())
	return cmd
}

// Complete fills in defaults and extracts the worker group name from the
// positional arguments. It defaults the namespace to "default" and the
// image to rayproject/ray:<rayVersion> when not provided.
func (options *CreateWorkerGroupOptions) Complete(cmd *cobra.Command, args []string) error {
	if *options.configFlags.Namespace == "" {
		*options.configFlags.Namespace = "default"
	}

	// Fixed: previously reported only cmd.Use on arity errors, which did not
	// tell the user what was wrong.
	if len(args) != 1 {
		return cmdutil.UsageErrorf(cmd, "accepts exactly 1 argument (the worker group name), received %d", len(args))
	}
	options.groupName = args[0]

	if options.image == "" {
		options.image = fmt.Sprintf("rayproject/ray:%s", options.rayVersion)
	}

	return nil
}

// Validate checks that the kubeconfig has a usable current context and that
// the required --ray-cluster flag was provided.
func (options *CreateWorkerGroupOptions) Validate() error {
	config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig()
	if err != nil {
		// Fixed: error string lowercased and wrapped with %w per Go convention.
		return fmt.Errorf("error retrieving raw config: %w", err)
	}
	if len(config.CurrentContext) == 0 {
		return fmt.Errorf("no context is currently set, use %q to select a new one", "kubectl config use-context <context>")
	}
	// Fixed: without this check Run would issue a Get with an empty cluster
	// name and surface a confusing API server error.
	if options.clusterName == "" {
		return fmt.Errorf("must specify the name of an existing RayCluster with --ray-cluster")
	}

	return nil
}

// Run fetches the target RayCluster, appends a new worker group spec built
// from the flag values, and updates the cluster via the Ray client.
func (options *CreateWorkerGroupOptions) Run(ctx context.Context, factory cmdutil.Factory) error {
	k8sClient, err := client.NewClient(factory)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}

	namespace := *options.configFlags.Namespace
	rayCluster, err := k8sClient.RayClient().RayV1().RayClusters(namespace).Get(ctx, options.clusterName, metav1.GetOptions{})
	if err != nil {
		// Fixed: wrap with %w (was %v) so callers can use errors.Is/As.
		return fmt.Errorf("error getting RayCluster %s/%s: %w", namespace, options.clusterName, err)
	}

	newRayCluster := rayCluster.DeepCopy()

	// Fixed: appending a group whose name already exists would produce an
	// invalid RayCluster spec; fail fast with a clear message instead.
	for _, spec := range newRayCluster.Spec.WorkerGroupSpecs {
		if spec.GroupName == options.groupName {
			return fmt.Errorf("worker group %s already exists in RayCluster %s/%s", options.groupName, namespace, options.clusterName)
		}
	}

	podTemplate := corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "ray-worker",
					Image: options.image,
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse(options.workerCPU),
							corev1.ResourceMemory: resource.MustParse(options.workerMemory),
						},
						// Only memory is limited; CPU is left burstable on purpose.
						Limits: corev1.ResourceList{
							corev1.ResourceMemory: resource.MustParse(options.workerMemory),
						},
					},
				},
			},
		},
	}

	workerGroup := rayv1.WorkerGroupSpec{
		GroupName:      options.groupName,
		Replicas:       &options.workerReplicas,
		MinReplicas:    &options.workerMinReplicas,
		MaxReplicas:    &options.workerMaxReplicas,
		RayStartParams: map[string]string{},
		Template:       podTemplate,
	}
	newRayCluster.Spec.WorkerGroupSpecs = append(newRayCluster.Spec.WorkerGroupSpecs, workerGroup)

	newRayCluster, err = k8sClient.RayClient().RayV1().RayClusters(namespace).Update(ctx, newRayCluster, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("error updating RayCluster with new worker group: %w", err)
	}

	// Fixed: write to the command's configured output stream rather than
	// process stdout, so output is testable and respects IOStreams.
	fmt.Fprintf(options.ioStreams.Out, "Updated RayCluster %s/%s with new worker group\n", newRayCluster.Namespace, newRayCluster.Name)
	return nil
}
2 changes: 0 additions & 2 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func NewJobSubmitCommand(streams genericclioptions.IOStreams) *cobra.Command {
cmd.Flags().StringVar(&options.image, "image", "rayproject/ray:2.39.0", "Ray image to use in the Ray Cluster yaml")
cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head")
cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head")
cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster")
cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas")
cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker")
cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker")
Expand Down Expand Up @@ -280,7 +279,6 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
WorkerReplicas: options.workerReplicas,
Expand Down
2 changes: 2 additions & 0 deletions kubectl-plugin/pkg/util/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func NewClient(factory cmdutil.Factory) (Client, error) {
if err != nil {
return nil, err
}

rayClient, err := rayclient.NewForConfig(restConfig)
if err != nil {
return nil, err
}

return &k8sClient{
kubeClient: kubeClient,
dynamicClient: dynamicClient,
Expand Down
33 changes: 8 additions & 25 deletions kubectl-plugin/pkg/util/generation/generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ import (
)

type RayClusterSpecObject struct {
RayVersion string
Image string
HeadCPU string
HeadMemory string
WorkerGrpName string
WorkerCPU string
WorkerMemory string
HeadLifecyclePrestopExecCommand []string
WorkerLifecyclePrestopExecComand []string
WorkerReplicas int32
RayVersion string
Image string
HeadCPU string
HeadMemory string
WorkerCPU string
WorkerMemory string
WorkerReplicas int32
}

type RayClusterYamlObject struct {
Expand Down Expand Up @@ -81,7 +78,7 @@ func (rayClusterSpecObject *RayClusterSpecObject) generateRayClusterSpec() *rayv
corev1ac.ContainerPort().WithContainerPort(10001).WithName("client")))))).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithRayStartParams(map[string]string{"metrics-export-port": "8080"}).
WithGroupName(rayClusterSpecObject.WorkerGrpName).
WithGroupName("default-group").
WithReplicas(rayClusterSpecObject.WorkerReplicas).
WithTemplate(corev1ac.PodTemplateSpec().
WithSpec(corev1ac.PodSpec().
Expand All @@ -97,20 +94,6 @@ func (rayClusterSpecObject *RayClusterSpecObject) generateRayClusterSpec() *rayv
corev1.ResourceCPU: resource.MustParse(rayClusterSpecObject.HeadCPU),
corev1.ResourceMemory: resource.MustParse(rayClusterSpecObject.HeadMemory),
}))))))

// Lifecycle cannot be empty, an empty lifecycle will stop pod startup so this will add lifecycle if its not empty
if len(rayClusterSpecObject.WorkerLifecyclePrestopExecComand) > 0 {
rayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Lifecycle = corev1ac.Lifecycle().
WithPreStop(corev1ac.LifecycleHandler().
WithExec(corev1ac.ExecAction().
WithCommand(rayClusterSpecObject.WorkerLifecyclePrestopExecComand...)))
}
if len(rayClusterSpecObject.HeadLifecyclePrestopExecCommand) > 0 {
rayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Lifecycle = corev1ac.Lifecycle().
WithPreStop(corev1ac.LifecycleHandler().
WithExec(corev1ac.ExecAction().
WithCommand(rayClusterSpecObject.HeadLifecyclePrestopExecCommand...)))
}
return rayClusterSpec
}

Expand Down
Loading