Skip to content

Commit

Permalink
Add rayjob yaml generation to ray job submit command
Browse files Browse the repository at this point in the history
  • Loading branch information
chiayi committed Dec 13, 2024
1 parent 8ec59e5 commit 99a45e8
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 133 deletions.
2 changes: 1 addition & 1 deletion kubectl-plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
github.com/ray-project/kuberay/ray-operator v1.2.1
github.com/ray-project/kuberay/ray-operator v1.2.2
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions kubectl-plugin/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 13 additions & 14 deletions kubectl-plugin/pkg/cmd/create/create_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,21 @@ func (options *CreateClusterOptions) Run(ctx context.Context, factory cmdutil.Fa

// Will generate yaml file
rayClusterObject := generation.RayClusterYamlObject{
Namespace: *options.configFlags.Namespace,
ClusterName: options.clusterName,
RayVersion: options.rayVersion,
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerReplicas: options.workerReplicas,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
Namespace: *options.configFlags.Namespace,
ClusterName: options.clusterName,
RayClusterSpecObject: generation.RayClusterSpecObject{
RayVersion: options.rayVersion,
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerReplicas: options.workerReplicas,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
},
}

rayClusterac, err := rayClusterObject.GenerateRayClusterApplyConfig()
if err != nil {
return err
}
rayClusterac := rayClusterObject.GenerateRayClusterApplyConfig()

// If dry run is enabled, it will call the yaml converter and print out the yaml
if options.dryRun {
Expand Down
153 changes: 115 additions & 38 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/shlex"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
"github.com/spf13/cobra"

rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
Expand Down Expand Up @@ -56,10 +57,20 @@ type SubmitJobOptions struct {
metadataJson string
logStyle string
logColor string
rayjobName string
rayVersion string
image string
headCPU string
headMemory string
workerGrpName string
workerCPU string
workerMemory string
entryPointCPU float32
entryPointGPU float32
entryPointMemory int
workerReplicas int32
noWait bool
dryRun bool
}

type RayJob struct {
Expand All @@ -71,6 +82,8 @@ var (
Submit ray job to ray cluster as one would using ray CLI e.g. 'ray job submit ENTRYPOINT'. Command supports all options that 'ray job submit' supports, except '--address'.
If RayCluster is already setup, use 'kubectl ray session' instead.
If no rayjob yaml file is specified, the command will create a default rayjob for the user.
Command will apply RayJob CR and also submit the ray job. RayJob CR is required.
`)

Expand All @@ -83,6 +96,15 @@ var (
# Submit ray job with runtime Env file assuming runtime-env has working_dir set
kubectl ray job submit -f rayjob.yaml --runtime-env path/to/runtimeEnv.yaml -- python my_script.py
# Submit generated ray job with default values and with runtime Env file and working directory
kubectl ray job submit --rayjob-name rayjob-sample --working-dir /path/to/working-dir/ --runtime-env /runtimeEnv.yaml -- python my_script.py
# Generate ray job with specifications and submit ray job with runtime Env file and working directory
kubectl ray job submit --rayjob-name rayjob-sample --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi --runtime-env path/to/runtimeEnv.yaml -- python my_script.py
# Generate ray job with specifications and print out the generated rayjob in yaml format
kubectl ray job submit --dryrun --rayjob-name rayjob-sample --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi --runtime-env path/to/runtimeEnv.yaml -- python my_script.py
`)
)

Expand Down Expand Up @@ -127,15 +149,23 @@ func NewJobSubmitCommand(streams genericclioptions.IOStreams) *cobra.Command {
cmd.Flags().StringVar(&options.entryPointResource, "entrypoint-resources", options.entryPointResource, "JSON-serialized dictionary mapping resource name to resource quantity")
cmd.Flags().StringVar(&options.metadataJson, "metadata-json", options.metadataJson, "JSON-serialized dictionary of metadata to attach to the job.")
cmd.Flags().StringVar(&options.logStyle, "log-style", options.logStyle, "Specific to 'ray job submit'. Options are 'auto | record | pretty'")
cmd.Flags().StringVar(&options.logColor, "log-clor", options.logColor, "Specifc to 'ray job submit'. Options are 'auto | false | true'")
cmd.Flags().StringVar(&options.logColor, "log-color", options.logColor, "Specific to 'ray job submit'. Options are 'auto | false | true'")
cmd.Flags().Float32Var(&options.entryPointCPU, "entrypoint-num-cpus", options.entryPointCPU, "Number of CPU reserved for the for the entrypoint command")
cmd.Flags().Float32Var(&options.entryPointGPU, "entrypoint-num-gpus", options.entryPointGPU, "Number of GPU reserved for the for the entrypoint command")
cmd.Flags().IntVar(&options.entryPointMemory, "entrypoint-memory", options.entryPointMemory, "Amount of memory reserved for the entrypoint command")
cmd.Flags().BoolVar(&options.noWait, "no-wait", options.noWait, "If present, will not stream logs and wait for job to finish")
err := cmd.MarkFlagRequired("filename")
if err != nil {
log.Fatalf("Failed to mark flag as required %v", err)
}

cmd.Flags().StringVar(&options.rayjobName, "rayjob-name", "", "Name of the ray job that will be generated")
cmd.Flags().StringVar(&options.rayVersion, "ray-version", "2.39.0", "Ray Version to use in the Ray Cluster yaml.")
cmd.Flags().StringVar(&options.image, "image", "rayproject/ray:2.39.0", "Ray image to use in the Ray Cluster yaml")
cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head")
cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head")
cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster")
cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas")
cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker")
cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker")
cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "Will not apply the generated cluster and will print out the generated yaml. Only works when yaml is generated")

options.configFlags.AddFlags(cmd.Flags())
return cmd
}
Expand All @@ -149,7 +179,9 @@ func (options *SubmitJobOptions) Complete() error {
options.runtimeEnv = filepath.Clean(options.runtimeEnv)
}

options.fileName = filepath.Clean(options.fileName)
if options.fileName != "" {
options.fileName = filepath.Clean(options.fileName)
}
return nil
}

Expand Down Expand Up @@ -182,39 +214,44 @@ func (options *SubmitJobOptions) Validate() error {
}
}

info, err := os.Stat(options.fileName)
if os.IsNotExist(err) {
return fmt.Errorf("Ray Job file does not exist. Failed with: %w", err)
} else if err != nil {
return fmt.Errorf("Error occurred when checking ray job file: %w", err)
} else if !info.Mode().IsRegular() {
return fmt.Errorf("Filename given is not a regular file. Failed with: %w", err)
}
// Take care of case where there is a filename input
if options.fileName != "" {
info, err := os.Stat(options.fileName)
if os.IsNotExist(err) {
return fmt.Errorf("Ray Job file does not exist. Failed with: %w", err)
} else if err != nil {
return fmt.Errorf("Error occurred when checking ray job file: %w", err)
} else if !info.Mode().IsRegular() {
return fmt.Errorf("Filename given is not a regular file. Failed with: %w", err)
}

options.RayJob, err = decodeRayJobYaml(options.fileName)
if err != nil {
return fmt.Errorf("Failed to decode RayJob Yaml: %w", err)
}
options.RayJob, err = decodeRayJobYaml(options.fileName)
if err != nil {
return fmt.Errorf("Failed to decode RayJob Yaml: %w", err)
}

submissionMode, ok := options.RayJob.Object["spec"].(map[string]interface{})["submissionMode"]
if !ok {
return fmt.Errorf("RayJob does not have `submissionMode` field set")
}
if submissionMode != nil {
if submissionMode != "InteractiveMode" {
return fmt.Errorf("Submission mode of the Ray Job is not supported")
submissionMode, ok := options.RayJob.Object["spec"].(map[string]interface{})["submissionMode"]
if !ok {
return fmt.Errorf("RayJob does not have `submissionMode` field set")
}
if submissionMode != nil {
if submissionMode != "InteractiveMode" {
return fmt.Errorf("Submission mode of the Ray Job is not supported")
}
} else {
return fmt.Errorf("Submission mode must be set to 'InteractiveMode'")
}
} else {
return fmt.Errorf("Submission mode must be set to 'InteractiveMode'")
}

runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string)
if ok && options.runtimeEnv == "" && options.runtimeEnvJson == "" {
runtimeJson, err := yaml.YAMLToJSON([]byte(runtimeEnvYaml))
if err != nil {
return fmt.Errorf("Failed to convert runtime env to json: %w", err)
runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string)
if ok && options.runtimeEnv == "" && options.runtimeEnvJson == "" {
runtimeJson, err := yaml.YAMLToJSON([]byte(runtimeEnvYaml))
if err != nil {
return fmt.Errorf("Failed to convert runtime env to json: %w", err)
}
options.runtimeEnvJson = string(runtimeJson)
}
options.runtimeEnvJson = string(runtimeJson)
} else if strings.TrimSpace(options.rayjobName) == "" {
return fmt.Errorf("Must set either yaml file (--filename) or set ray job name (--rayjob-name)")
}

if options.workingDir == "" {
Expand All @@ -232,10 +269,50 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
return fmt.Errorf("failed to initialize clientset: %w", err)
}

// createdRayJob, err = k8sClients.CreateRayCustomResource(ctx, util.RayJob, options.configFlags.Namespace, unstructuredRayjob)
options.RayJob, err = k8sClients.DynamicClient().Resource(util.RayJobGVR).Namespace(*options.configFlags.Namespace).Create(ctx, options.RayJob, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("Error when creating RayJob CR: %w", err)
if options.fileName == "" {
// Genarate the ray job.
rayJobObject := generation.RayJobYamlObject{
RayJobName: options.rayjobName,
Namespace: *options.configFlags.Namespace,
SubmissionMode: "InteractiveMode",
RayClusterSpecObject: generation.RayClusterSpecObject{
RayVersion: options.rayVersion,
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
WorkerReplicas: options.workerReplicas,
// This is here to match the existing rayjob sample.
WorkerLifecyclePrestopExecComand: []string{"/bin/sh", "-c", "ray stop"},
},
}
rayJobApplyConfig := rayJobObject.GenerateRayJobApplyConfig()

// Print out the yaml if it is a dry run
if options.dryRun {
resultYaml, err := generation.ConvertRayJobApplyConfigToYaml(rayJobApplyConfig)
if err != nil {
return fmt.Errorf("Failed to convert rayjob into yaml format: %w", err)
}

fmt.Printf("%s\n", resultYaml)
return nil
}

// Apply the generated yaml
rayJobApplyConfigResult, err := k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Apply(ctx, rayJobApplyConfig, v1.ApplyOptions{FieldManager: "kubectl-plugin"})
if err != nil {
return fmt.Errorf("Failed to apply generated yaml: %w", err)
}
options.RayJob = &unstructured.Unstructured{}
options.RayJob.SetName(rayJobApplyConfigResult.Name)
} else {
options.RayJob, err = k8sClients.DynamicClient().Resource(util.RayJobGVR).Namespace(*options.configFlags.Namespace).Create(ctx, options.RayJob, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("Error when creating RayJob CR: %w", err)
}
}
fmt.Printf("Submitted RayJob %s.\n", options.RayJob.GetName())

Expand Down
Loading

0 comments on commit 99a45e8

Please sign in to comment.