From af91523b47d21f9938d3ac1db1f2b9235d8d72b3 Mon Sep 17 00:00:00 2001 From: Aaron Liang Date: Wed, 11 Dec 2024 16:08:42 -0800 Subject: [PATCH] Add rayjob yaml generation to ray job submit command --- kubectl-plugin/go.mod | 2 +- kubectl-plugin/go.sum | 4 +- .../pkg/cmd/create/create_cluster.go | 27 ++- kubectl-plugin/pkg/cmd/job/job_submit.go | 150 ++++++++++++---- .../pkg/util/generation/generation.go | 163 ++++++++++++------ .../pkg/util/generation/generation_test.go | 84 ++++++--- 6 files changed, 297 insertions(+), 133 deletions(-) diff --git a/kubectl-plugin/go.mod b/kubectl-plugin/go.mod index aecc34efcc..26a5256a70 100644 --- a/kubectl-plugin/go.mod +++ b/kubectl-plugin/go.mod @@ -8,7 +8,7 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.34.2 - github.com/ray-project/kuberay/ray-operator v1.2.1 + github.com/ray-project/kuberay/ray-operator v1.2.2 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 diff --git a/kubectl-plugin/go.sum b/kubectl-plugin/go.sum index 0863090a11..03e5334348 100644 --- a/kubectl-plugin/go.sum +++ b/kubectl-plugin/go.sum @@ -133,8 +133,8 @@ github.com/prometheus/common v0.59.1 h1:LXb1quJHWm1P6wq/U824uxYi4Sg0oGvNeUm1z5dJ github.com/prometheus/common v0.59.1/go.mod h1:GpWM7dewqmVYcd7SmRaiWVe9SSqjf0UrwnYnpEZNuT0= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/ray-project/kuberay/ray-operator v1.2.1 h1:H7ofodGclghsU2TxbDHs+gvqvsOp5DJ/vAPGySL1DIE= -github.com/ray-project/kuberay/ray-operator v1.2.1/go.mod h1:osTiIyaDoWi5IN1f0tOOtZ4TzVf+5kJXZor8VFvcEiI= +github.com/ray-project/kuberay/ray-operator v1.2.2 h1:wj4qe9SmJfD1ubgEaVPuAsnU/WFDvremzR8j3JslBdk= +github.com/ray-project/kuberay/ray-operator v1.2.2/go.mod h1:osTiIyaDoWi5IN1f0tOOtZ4TzVf+5kJXZor8VFvcEiI= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= diff --git a/kubectl-plugin/pkg/cmd/create/create_cluster.go b/kubectl-plugin/pkg/cmd/create/create_cluster.go index 05dba6fb29..3434e93642 100644 --- a/kubectl-plugin/pkg/cmd/create/create_cluster.go +++ b/kubectl-plugin/pkg/cmd/create/create_cluster.go @@ -119,22 +119,21 @@ func (options *CreateClusterOptions) Run(ctx context.Context, factory cmdutil.Fa // Will generate yaml file rayClusterObject := generation.RayClusterYamlObject{ - Namespace: *options.configFlags.Namespace, - ClusterName: options.clusterName, - RayVersion: options.rayVersion, - Image: options.image, - HeadCPU: options.headCPU, - HeadMemory: options.headMemory, - WorkerGrpName: options.workerGrpName, - WorkerReplicas: options.workerReplicas, - WorkerCPU: options.workerCPU, - WorkerMemory: options.workerMemory, + Namespace: *options.configFlags.Namespace, + ClusterName: options.clusterName, + RayClusterSpecObject: generation.RayClusterSpecObject{ + RayVersion: options.rayVersion, + Image: options.image, + HeadCPU: options.headCPU, + HeadMemory: options.headMemory, + WorkerGrpName: options.workerGrpName, + WorkerReplicas: options.workerReplicas, + WorkerCPU: options.workerCPU, + WorkerMemory: options.workerMemory, + }, } - rayClusterac, err := rayClusterObject.GenerateRayClusterApplyConfig() - if err != nil { - return err - } + rayClusterac := rayClusterObject.GenerateRayClusterApplyConfig() // If dry run is enabled, it will call the yaml converter and print out the yaml if options.dryRun { diff --git a/kubectl-plugin/pkg/cmd/job/job_submit.go b/kubectl-plugin/pkg/cmd/job/job_submit.go index 5073abf375..1d27b6a4e4 100644 --- a/kubectl-plugin/pkg/cmd/job/job_submit.go +++ b/kubectl-plugin/pkg/cmd/job/job_submit.go @@ -28,6 +28,7 @@ import ( "github.com/google/shlex" "github.com/ray-project/kuberay/kubectl-plugin/pkg/util" "github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client" + "github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation" "github.com/spf13/cobra" rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" @@ -56,10 +57,20 @@ type SubmitJobOptions struct { metadataJson string logStyle string logColor string + rayjobName string + rayVersion string + image string + headCPU string + headMemory string + workerGrpName string + workerCPU string + workerMemory string entryPointCPU float32 entryPointGPU float32 entryPointMemory int + workerReplicas int32 noWait bool + dryRun bool } type RayJob struct { @@ -71,6 +82,8 @@ var ( Submit ray job to ray cluster as one would using ray CLI e.g. 'ray job submit ENTRYPOINT'. Command supports all options that 'ray job submit' supports, except '--address'. If RayCluster is already setup, use 'kubectl ray session' instead. + If no ray job file is specified, one will be generated. + Command will apply RayJob CR and also submit the ray job. RayJob CR is required. `) @@ -83,6 +96,12 @@ var ( # Submit ray job with runtime Env file assuming runtime-env has working_dir set kubectl ray job submit -f rayjob.yaml --runtime-env path/to/runtimeEnv.yaml -- python my_script.py + + # Submit generated ray job with default values and with runtime Env file and working directory + kubectl ray job submit --rayjob-name rayjob-sample --working-dir /path/to/working-dir/ --runtime-env /runtimeEnv.yaml -- python my_script.py + + # Generate ray job with specifications and submit ray job with runtime Env file and working directory + kubectl ray job submit --rayjob-name rayjob-sample --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi --runtime-env path/to/runtimeEnv.yaml -- python my_script.py `) ) @@ -127,15 +146,23 @@ func NewJobSubmitCommand(streams genericclioptions.IOStreams) *cobra.Command { cmd.Flags().StringVar(&options.entryPointResource, "entrypoint-resources", options.entryPointResource, "JSON-serialized dictionary mapping resource name to resource quantity") cmd.Flags().StringVar(&options.metadataJson, "metadata-json", options.metadataJson, "JSON-serialized dictionary of metadata to attach to the job.") cmd.Flags().StringVar(&options.logStyle, "log-style", options.logStyle, "Specific to 'ray job submit'. Options are 'auto | record | pretty'") - cmd.Flags().StringVar(&options.logColor, "log-clor", options.logColor, "Specifc to 'ray job submit'. Options are 'auto | false | true'") + cmd.Flags().StringVar(&options.logColor, "log-color", options.logColor, "Specific to 'ray job submit'. Options are 'auto | false | true'") cmd.Flags().Float32Var(&options.entryPointCPU, "entrypoint-num-cpus", options.entryPointCPU, "Number of CPU reserved for the for the entrypoint command") cmd.Flags().Float32Var(&options.entryPointGPU, "entrypoint-num-gpus", options.entryPointGPU, "Number of GPU reserved for the for the entrypoint command") cmd.Flags().IntVar(&options.entryPointMemory, "entrypoint-memory", options.entryPointMemory, "Amount of memory reserved for the entrypoint command") cmd.Flags().BoolVar(&options.noWait, "no-wait", options.noWait, "If present, will not stream logs and wait for job to finish") - err := cmd.MarkFlagRequired("filename") - if err != nil { - log.Fatalf("Failed to mark flag as required %v", err) - } + + cmd.Flags().StringVar(&options.rayjobName, "rayjob-name", "", "Name of the ray job that will be generated") + cmd.Flags().StringVar(&options.rayVersion, "ray-version", "2.39.0", "Ray Version to use in the Ray Cluster yaml.") + cmd.Flags().StringVar(&options.image, "image", "rayproject/ray:2.39.0", "Ray image to use in the Ray Cluster yaml") + cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head") + cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head") + cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster") + cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas") + cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker") + cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker") + cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "Will not apply the generated cluster and will print out the generated yaml. Only works when yaml is generated") + options.configFlags.AddFlags(cmd.Flags()) return cmd } @@ -149,7 +176,9 @@ func (options *SubmitJobOptions) Complete() error { options.runtimeEnv = filepath.Clean(options.runtimeEnv) } - options.fileName = filepath.Clean(options.fileName) + if options.fileName != "" { + options.fileName = filepath.Clean(options.fileName) + } return nil } @@ -182,39 +211,44 @@ func (options *SubmitJobOptions) Validate() error { } } - info, err := os.Stat(options.fileName) - if os.IsNotExist(err) { - return fmt.Errorf("Ray Job file does not exist. Failed with: %w", err) - } else if err != nil { - return fmt.Errorf("Error occurred when checking ray job file: %w", err) - } else if !info.Mode().IsRegular() { - return fmt.Errorf("Filename given is not a regular file. Failed with: %w", err) - } + // Take care of case where no file name + if options.fileName != "" { + info, err := os.Stat(options.fileName) + if os.IsNotExist(err) { + return fmt.Errorf("Ray Job file does not exist. Failed with: %w", err) + } else if err != nil { + return fmt.Errorf("Error occurred when checking ray job file: %w", err) + } else if !info.Mode().IsRegular() { + return fmt.Errorf("Filename given is not a regular file. Failed with: %w", err) + } - options.RayJob, err = decodeRayJobYaml(options.fileName) - if err != nil { - return fmt.Errorf("Failed to decode RayJob Yaml: %w", err) - } + options.RayJob, err = decodeRayJobYaml(options.fileName) + if err != nil { + return fmt.Errorf("Failed to decode RayJob Yaml: %w", err) + } - submissionMode, ok := options.RayJob.Object["spec"].(map[string]interface{})["submissionMode"] - if !ok { - return fmt.Errorf("RayJob does not have `submissionMode` field set") - } - if submissionMode != nil { - if submissionMode != "InteractiveMode" { - return fmt.Errorf("Submission mode of the Ray Job is not supported") + submissionMode, ok := options.RayJob.Object["spec"].(map[string]interface{})["submissionMode"] + if !ok { + return fmt.Errorf("RayJob does not have `submissionMode` field set") + } + if submissionMode != nil { + if submissionMode != "InteractiveMode" { + return fmt.Errorf("Submission mode of the Ray Job is not supported") + } + } else { + return fmt.Errorf("Submission mode must be set to 'InteractiveMode'") } - } else { - return fmt.Errorf("Submission mode must be set to 'InteractiveMode'") - } - runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string) - if ok && options.runtimeEnv == "" && options.runtimeEnvJson == "" { - runtimeJson, err := yaml.YAMLToJSON([]byte(runtimeEnvYaml)) - if err != nil { - return fmt.Errorf("Failed to convert runtime env to json: %w", err) + runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string) + if ok && options.runtimeEnv == "" && options.runtimeEnvJson == "" { + runtimeJson, err := yaml.YAMLToJSON([]byte(runtimeEnvYaml)) + if err != nil { + return fmt.Errorf("Failed to convert runtime env to json: %w", err) + } + options.runtimeEnvJson = string(runtimeJson) } - options.runtimeEnvJson = string(runtimeJson) + } else if strings.TrimSpace(options.rayjobName) == "" { + return fmt.Errorf("Must set either yaml file (--filename) or set ray job name (--rayjob-name)") } if options.workingDir == "" { @@ -232,10 +266,50 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor return fmt.Errorf("failed to initialize clientset: %w", err) } - // createdRayJob, err = k8sClients.CreateRayCustomResource(ctx, util.RayJob, options.configFlags.Namespace, unstructuredRayjob) - options.RayJob, err = k8sClients.DynamicClient().Resource(util.RayJobGVR).Namespace(*options.configFlags.Namespace).Create(ctx, options.RayJob, v1.CreateOptions{}) - if err != nil { - return fmt.Errorf("Error when creating RayJob CR: %w", err) + if options.fileName == "" { + // Genarate the ray job. + rayJobObject := generation.RayJobYamlObject{ + RayJobName: options.rayjobName, + Namespace: *options.configFlags.Namespace, + SubmissionMode: "InteractiveMode", + RayClusterSpecObject: generation.RayClusterSpecObject{ + RayVersion: options.rayVersion, + Image: options.image, + HeadCPU: options.headCPU, + HeadMemory: options.headMemory, + WorkerGrpName: options.workerGrpName, + WorkerCPU: options.workerCPU, + WorkerMemory: options.workerMemory, + WorkerReplicas: options.workerReplicas, + // This is here to match the existing rayjob sample. + WorkerLifecyclePrestopExecComand: []string{"/bin/sh", "-c", "ray stop"}, + }, + } + rayJobApplyConfig := rayJobObject.GenerateRayJobApplyConfig() + + // Print out the yaml if it is a dry run + if options.dryRun { + resultYaml, err := generation.ConvertRayJobApplyConfigToYaml(rayJobApplyConfig) + if err != nil { + return fmt.Errorf("Failed to convert rayjob into yaml format: %w", err) + } + + fmt.Printf("%s\n", resultYaml) + return nil + } + + // Apply the generated yaml + rayJobApplyConfigResult, err := k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Apply(ctx, rayJobApplyConfig, v1.ApplyOptions{FieldManager: "kubectl-plugin"}) + if err != nil { + return fmt.Errorf("Failed to apply generated yaml: %w", err) + } + options.RayJob = &unstructured.Unstructured{} + options.RayJob.SetName(rayJobApplyConfigResult.Name) + } else { + options.RayJob, err = k8sClients.DynamicClient().Resource(util.RayJobGVR).Namespace(*options.configFlags.Namespace).Create(ctx, options.RayJob, v1.CreateOptions{}) + if err != nil { + return fmt.Errorf("Error when creating RayJob CR: %w", err) + } } fmt.Printf("Submitted RayJob %s.\n", options.RayJob.GetName()) diff --git a/kubectl-plugin/pkg/util/generation/generation.go b/kubectl-plugin/pkg/util/generation/generation.go index d3ec61c969..61d714d698 100644 --- a/kubectl-plugin/pkg/util/generation/generation.go +++ b/kubectl-plugin/pkg/util/generation/generation.go @@ -8,74 +8,114 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" ) +type RayClusterSpecObject struct { + RayVersion string + Image string + HeadCPU string + HeadMemory string + WorkerGrpName string + WorkerCPU string + WorkerMemory string + HeadLifecyclePrestopExecCommand []string + WorkerLifecyclePrestopExecComand []string + WorkerReplicas int32 +} + type RayClusterYamlObject struct { - ClusterName string + ClusterName string + Namespace string + RayClusterSpecObject +} + +type RayJobYamlObject struct { + RayJobName string Namespace string - RayVersion string - Image string - HeadCPU string - HeadMemory string - WorkerGrpName string - WorkerCPU string - WorkerMemory string - WorkerReplicas int32 + SubmissionMode string + RayClusterSpecObject +} + +func (rayClusterObject *RayClusterYamlObject) GenerateRayClusterApplyConfig() *rayv1ac.RayClusterApplyConfiguration { + rayClusterApplyConfig := rayv1ac.RayCluster(rayClusterObject.ClusterName, rayClusterObject.Namespace). + WithSpec(rayClusterObject.generateRayClusterSpec()) + + return rayClusterApplyConfig +} + +func (rayJobObject *RayJobYamlObject) GenerateRayJobApplyConfig() *rayv1ac.RayJobApplyConfiguration { + rayJobApplyConfig := rayv1ac.RayJob(rayJobObject.RayJobName, rayJobObject.Namespace). + WithSpec(rayv1ac.RayJobSpec(). + WithSubmissionMode(rayv1.JobSubmissionMode(rayJobObject.SubmissionMode)). + WithRayClusterSpec(rayJobObject.generateRayClusterSpec())) + + return rayJobApplyConfig } -func (rayClusterObject *RayClusterYamlObject) GenerateRayClusterApplyConfig() (*rayv1ac.RayClusterApplyConfiguration, error) { +func (rayClusterSpecObject *RayClusterSpecObject) generateRayClusterSpec() *rayv1ac.RayClusterSpecApplyConfiguration { // TODO: Look for better workaround/fixes for RayStartParams. Currently using `WithRayStartParams()` requires // a non-empty map with valid key value pairs and will not populate the field with empty/nil values. This // isn't ideal as it forces the generated RayCluster yamls to use those parameters. - rayClusterApplyConfig := rayv1ac.RayCluster(rayClusterObject.ClusterName, rayClusterObject.Namespace). - WithName(rayClusterObject.ClusterName). - WithSpec(rayv1ac.RayClusterSpec(). - WithRayVersion(rayClusterObject.RayVersion). - WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). - WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). - WithTemplate(corev1ac.PodTemplateSpec(). - WithSpec(corev1ac.PodSpec(). - WithContainers(corev1ac.Container(). - WithName("ray-head"). - WithImage(rayClusterObject.Image). - WithResources(corev1ac.ResourceRequirements(). - WithRequests(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse(rayClusterObject.HeadCPU), - corev1.ResourceMemory: resource.MustParse(rayClusterObject.HeadMemory), - }). - WithLimits(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse(rayClusterObject.HeadCPU), - corev1.ResourceMemory: resource.MustParse(rayClusterObject.HeadMemory), - })). - WithPorts(corev1ac.ContainerPort().WithContainerPort(6379).WithName("gcs-server"), - corev1ac.ContainerPort().WithContainerPort(8265).WithName("dashboard"), - corev1ac.ContainerPort().WithContainerPort(10001).WithName("client")))))). - WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). - WithRayStartParams(map[string]string{"metrics-export-port": "8080"}). - WithGroupName(rayClusterObject.WorkerGrpName). - WithReplicas(rayClusterObject.WorkerReplicas). - WithTemplate(corev1ac.PodTemplateSpec(). - WithSpec(corev1ac.PodSpec(). - WithContainers(corev1ac.Container(). - WithName("ray-worker"). - WithImage(rayClusterObject.Image). - WithResources(corev1ac.ResourceRequirements(). - WithRequests(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse(rayClusterObject.HeadCPU), - corev1.ResourceMemory: resource.MustParse(rayClusterObject.HeadMemory), - }). - WithLimits(corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse(rayClusterObject.HeadCPU), - corev1.ResourceMemory: resource.MustParse(rayClusterObject.HeadMemory), - }))))))) - - return rayClusterApplyConfig, nil + rayClusterSpec := rayv1ac.RayClusterSpec(). + WithRayVersion(rayClusterSpecObject.RayVersion). + WithHeadGroupSpec(rayv1ac.HeadGroupSpec(). + WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}). + WithTemplate(corev1ac.PodTemplateSpec(). + WithSpec(corev1ac.PodSpec(). + WithContainers(corev1ac.Container(). + WithName("ray-head"). + WithImage(rayClusterSpecObject.Image). + WithResources(corev1ac.ResourceRequirements(). + WithRequests(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(rayClusterSpecObject.HeadCPU), + corev1.ResourceMemory: resource.MustParse(rayClusterSpecObject.HeadMemory), + }). + WithLimits(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(rayClusterSpecObject.HeadCPU), + corev1.ResourceMemory: resource.MustParse(rayClusterSpecObject.HeadMemory), + })). + WithPorts(corev1ac.ContainerPort().WithContainerPort(6379).WithName("gcs-server"), + corev1ac.ContainerPort().WithContainerPort(8265).WithName("dashboard"), + corev1ac.ContainerPort().WithContainerPort(10001).WithName("client")))))). + WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec(). + WithRayStartParams(map[string]string{"metrics-export-port": "8080"}). + WithGroupName(rayClusterSpecObject.WorkerGrpName). + WithReplicas(rayClusterSpecObject.WorkerReplicas). + WithTemplate(corev1ac.PodTemplateSpec(). + WithSpec(corev1ac.PodSpec(). + WithContainers(corev1ac.Container(). + WithName("ray-worker"). + WithImage(rayClusterSpecObject.Image). + WithResources(corev1ac.ResourceRequirements(). + WithRequests(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(rayClusterSpecObject.HeadCPU), + corev1.ResourceMemory: resource.MustParse(rayClusterSpecObject.HeadMemory), + }). + WithLimits(corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(rayClusterSpecObject.HeadCPU), + corev1.ResourceMemory: resource.MustParse(rayClusterSpecObject.HeadMemory), + })))))) + + // Lifecycle cannot be empty, an empty lifecycle will stop pod startup so this will add lifecycle if its not empty + if len(rayClusterSpecObject.WorkerLifecyclePrestopExecComand) > 0 { + rayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Lifecycle = corev1ac.Lifecycle(). + WithPreStop(corev1ac.LifecycleHandler(). + WithExec(corev1ac.ExecAction(). + WithCommand(rayClusterSpecObject.WorkerLifecyclePrestopExecComand...))) + } + if len(rayClusterSpecObject.HeadLifecyclePrestopExecCommand) > 0 { + rayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Lifecycle = corev1ac.Lifecycle(). + WithPreStop(corev1ac.LifecycleHandler(). + WithExec(corev1ac.ExecAction(). + WithCommand(rayClusterSpecObject.HeadLifecyclePrestopExecCommand...))) + } + return rayClusterSpec } // Converts RayClusterApplyConfiguration object into a yaml string func ConvertRayClusterApplyConfigToYaml(rayClusterac *rayv1ac.RayClusterApplyConfiguration) (string, error) { - var resource map[string]interface{} resource, err := runtime.DefaultUnstructuredConverter.ToUnstructured(rayClusterac) if err != nil { return "", err @@ -88,3 +128,18 @@ func ConvertRayClusterApplyConfigToYaml(rayClusterac *rayv1ac.RayClusterApplyCon return string(podByte), nil } + +// Converts RayJobApplyConfiguration object into a yaml string +func ConvertRayJobApplyConfigToYaml(rayJobac *rayv1ac.RayJobApplyConfiguration) (string, error) { + resource, err := runtime.DefaultUnstructuredConverter.ToUnstructured(rayJobac) + if err != nil { + return "", err + } + + podByte, err := yaml.Marshal(resource) + if err != nil { + return "", err + } + + return string(podByte), nil +} diff --git a/kubectl-plugin/pkg/util/generation/generation_test.go b/kubectl-plugin/pkg/util/generation/generation_test.go index f493580153..09238a5246 100644 --- a/kubectl-plugin/pkg/util/generation/generation_test.go +++ b/kubectl-plugin/pkg/util/generation/generation_test.go @@ -6,24 +6,27 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/resource" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" ) func TestGenerateRayCluterApplyConfig(t *testing.T) { testRayClusterYamlObject := RayClusterYamlObject{ - ClusterName: "test-ray-cluster", - Namespace: "default", - RayVersion: "2.39.0", - Image: "rayproject/ray:2.39.0", - HeadCPU: "1", - HeadMemory: "5Gi", - WorkerGrpName: "worker-group1", - WorkerReplicas: 3, - WorkerCPU: "1", - WorkerMemory: "5Gi", + ClusterName: "test-ray-cluster", + Namespace: "default", + RayClusterSpecObject: RayClusterSpecObject{ + RayVersion: "2.39.0", + Image: "rayproject/ray:2.39.0", + HeadCPU: "1", + HeadMemory: "5Gi", + WorkerGrpName: "worker-group1", + WorkerReplicas: 3, + WorkerCPU: "1", + WorkerMemory: "5Gi", + }, } - result, err := testRayClusterYamlObject.GenerateRayClusterApplyConfig() - assert.Nil(t, err) + result := testRayClusterYamlObject.GenerateRayClusterApplyConfig() assert.Equal(t, testRayClusterYamlObject.ClusterName, *result.Name) assert.Equal(t, testRayClusterYamlObject.Namespace, *result.Namespace) @@ -37,22 +40,55 @@ func TestGenerateRayCluterApplyConfig(t *testing.T) { assert.Equal(t, resource.MustParse(testRayClusterYamlObject.WorkerMemory), *result.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests.Memory()) } +func TestGenerateRayJobApplyConfig(t *testing.T) { + testRayJobYamlObject := RayJobYamlObject{ + RayJobName: "test-ray-job", + Namespace: "default", + SubmissionMode: "InteractiveMode", + RayClusterSpecObject: RayClusterSpecObject{ + RayVersion: "2.39.0", + Image: "rayproject/ray:2.39.0", + HeadCPU: "1", + HeadMemory: "5Gi", + WorkerGrpName: "worker-group1", + WorkerReplicas: 3, + WorkerCPU: "1", + WorkerMemory: "5Gi", + }, + } + + result := testRayJobYamlObject.GenerateRayJobApplyConfig() + + assert.Equal(t, testRayJobYamlObject.RayJobName, *result.Name) + assert.Equal(t, testRayJobYamlObject.Namespace, *result.Namespace) + assert.Equal(t, rayv1.JobSubmissionMode(testRayJobYamlObject.SubmissionMode), *result.Spec.SubmissionMode) + assert.Equal(t, testRayJobYamlObject.RayVersion, *result.Spec.RayClusterSpec.RayVersion) + assert.Equal(t, testRayJobYamlObject.Image, *result.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image) + assert.Equal(t, resource.MustParse(testRayJobYamlObject.HeadCPU), *result.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Resources.Requests.Cpu()) + assert.Equal(t, resource.MustParse(testRayJobYamlObject.HeadMemory), *result.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Resources.Requests.Memory()) + assert.Equal(t, testRayJobYamlObject.WorkerGrpName, *result.Spec.RayClusterSpec.WorkerGroupSpecs[0].GroupName) + assert.Equal(t, testRayJobYamlObject.WorkerReplicas, *result.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas) + assert.Equal(t, resource.MustParse(testRayJobYamlObject.WorkerCPU), *result.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests.Cpu()) + assert.Equal(t, resource.MustParse(testRayJobYamlObject.WorkerMemory), *result.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests.Memory()) +} + func TestConvertRayClusterApplyConfigToYaml(t *testing.T) { testRayClusterYamlObject := RayClusterYamlObject{ - ClusterName: "test-ray-cluster", - Namespace: "default", - RayVersion: "2.39.0", - Image: "rayproject/ray:2.39.0", - HeadCPU: "1", - HeadMemory: "5Gi", - WorkerGrpName: "worker-group1", - WorkerReplicas: 3, - WorkerCPU: "1", - WorkerMemory: "5Gi", + ClusterName: "test-ray-cluster", + Namespace: "default", + RayClusterSpecObject: RayClusterSpecObject{ + RayVersion: "2.39.0", + Image: "rayproject/ray:2.39.0", + HeadCPU: "1", + HeadMemory: "5Gi", + WorkerGrpName: "worker-group1", + WorkerReplicas: 3, + WorkerCPU: "1", + WorkerMemory: "5Gi", + }, } - result, err := testRayClusterYamlObject.GenerateRayClusterApplyConfig() - assert.Nil(t, err) + result := testRayClusterYamlObject.GenerateRayClusterApplyConfig() resultString, err := ConvertRayClusterApplyConfigToYaml(result) assert.Nil(t, err)