Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kubectl-plugin] Add rayjob yaml generation to ray job submit command #2644

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubectl-plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
github.com/ray-project/kuberay/ray-operator v1.2.1
github.com/ray-project/kuberay/ray-operator v1.2.2
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions kubectl-plugin/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 13 additions & 14 deletions kubectl-plugin/pkg/cmd/create/create_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,21 @@ func (options *CreateClusterOptions) Run(ctx context.Context, factory cmdutil.Fa

// Will generate yaml file
rayClusterObject := generation.RayClusterYamlObject{
Namespace: *options.configFlags.Namespace,
ClusterName: options.clusterName,
RayVersion: options.rayVersion,
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerReplicas: options.workerReplicas,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
Namespace: *options.configFlags.Namespace,
ClusterName: options.clusterName,
RayClusterSpecObject: generation.RayClusterSpecObject{
RayVersion: options.rayVersion,
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerReplicas: options.workerReplicas,
WorkerCPU: options.workerCPU,
chiayi marked this conversation as resolved.
Show resolved Hide resolved
WorkerMemory: options.workerMemory,
},
}

rayClusterac, err := rayClusterObject.GenerateRayClusterApplyConfig()
if err != nil {
return err
}
rayClusterac := rayClusterObject.GenerateRayClusterApplyConfig()

// If dry run is enabled, it will call the yaml converter and print out the yaml
if options.dryRun {
Expand Down
151 changes: 113 additions & 38 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/google/shlex"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/generation"
"github.com/spf13/cobra"

rayv1api "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
Expand Down Expand Up @@ -56,10 +57,20 @@ type SubmitJobOptions struct {
metadataJson string
logStyle string
logColor string
rayjobName string
rayVersion string
image string
headCPU string
headMemory string
workerGrpName string
workerCPU string
workerMemory string
entryPointCPU float32
entryPointGPU float32
entryPointMemory int
workerReplicas int32
noWait bool
dryRun bool
}

type RayJob struct {
Expand All @@ -71,6 +82,8 @@ var (
Submit ray job to ray cluster as one would using ray CLI e.g. 'ray job submit ENTRYPOINT'. Command supports all options that 'ray job submit' supports, except '--address'.
If RayCluster is already setup, use 'kubectl ray session' instead.

If no rayjob yaml file is specified, the command will create a default rayjob for the user.

Command will apply RayJob CR and also submit the ray job. RayJob CR is required.
`)

Expand All @@ -83,6 +96,15 @@ var (

# Submit ray job with runtime Env file assuming runtime-env has working_dir set
kubectl ray job submit -f rayjob.yaml --runtime-env path/to/runtimeEnv.yaml -- python my_script.py

# Submit generated ray job with default values and with runtime Env file and working directory
kubectl ray job submit --name rayjob-sample --working-dir /path/to/working-dir/ --runtime-env /runtimeEnv.yaml -- python my_script.py

# Generate ray job with specifications and submit ray job with runtime Env file and working directory
kubectl ray job submit --name rayjob-sample --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi --runtime-env path/to/runtimeEnv.yaml -- python my_script.py

# Generate ray job with specifications and print out the generated rayjob in yaml format
kubectl ray job submit --dry-run --name rayjob-sample --ray-version 2.39.0 --image rayproject/ray:2.39.0 --head-cpu 1 --head-memory 5Gi --worker-grp-name worker-group1 --worker-replicas 3 --worker-cpu 1 --worker-memory 5Gi --runtime-env path/to/runtimeEnv.yaml -- python my_script.py
`)
)

Expand Down Expand Up @@ -127,15 +149,23 @@ func NewJobSubmitCommand(streams genericclioptions.IOStreams) *cobra.Command {
cmd.Flags().StringVar(&options.entryPointResource, "entrypoint-resources", options.entryPointResource, "JSON-serialized dictionary mapping resource name to resource quantity")
cmd.Flags().StringVar(&options.metadataJson, "metadata-json", options.metadataJson, "JSON-serialized dictionary of metadata to attach to the job.")
cmd.Flags().StringVar(&options.logStyle, "log-style", options.logStyle, "Specific to 'ray job submit'. Options are 'auto | record | pretty'")
cmd.Flags().StringVar(&options.logColor, "log-clor", options.logColor, "Specifc to 'ray job submit'. Options are 'auto | false | true'")
cmd.Flags().StringVar(&options.logColor, "log-color", options.logColor, "Specific to 'ray job submit'. Options are 'auto | false | true'")
cmd.Flags().Float32Var(&options.entryPointCPU, "entrypoint-num-cpus", options.entryPointCPU, "Number of CPU reserved for the for the entrypoint command")
cmd.Flags().Float32Var(&options.entryPointGPU, "entrypoint-num-gpus", options.entryPointGPU, "Number of GPU reserved for the for the entrypoint command")
cmd.Flags().IntVar(&options.entryPointMemory, "entrypoint-memory", options.entryPointMemory, "Amount of memory reserved for the entrypoint command")
cmd.Flags().BoolVar(&options.noWait, "no-wait", options.noWait, "If present, will not stream logs and wait for job to finish")
err := cmd.MarkFlagRequired("filename")
if err != nil {
log.Fatalf("Failed to mark flag as required %v", err)
}

cmd.Flags().StringVar(&options.rayjobName, "name", "", "Name of the ray job that will be generated")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way we can consolidate these flags into a single place with the flags also used in kubectl ray create cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it's possible, I think we can try sharing the flags in some way. For example, setting the "parent" command to have these flags and this way all the "child" commands will as well.

cmd.Flags().StringVar(&options.rayVersion, "ray-version", "2.39.0", "Ray Version to use in the Ray Cluster yaml.")
cmd.Flags().StringVar(&options.image, "image", "rayproject/ray:2.39.0", "Ray image to use in the Ray Cluster yaml")
cmd.Flags().StringVar(&options.headCPU, "head-cpu", "2", "Number of CPU for the ray head")
cmd.Flags().StringVar(&options.headMemory, "head-memory", "4Gi", "Amount of memory to use for the ray head")
cmd.Flags().StringVar(&options.workerGrpName, "worker-grp-name", "default-group", "Name of the worker group for the Ray Cluster")
cmd.Flags().Int32Var(&options.workerReplicas, "worker-replicas", 1, "Number of the worker group replicas")
cmd.Flags().StringVar(&options.workerCPU, "worker-cpu", "2", "Number of CPU for the ray worker")
cmd.Flags().StringVar(&options.workerMemory, "worker-memory", "4Gi", "Amount of memory to use for the ray worker")
cmd.Flags().BoolVar(&options.dryRun, "dry-run", false, "Will not apply the generated cluster and will print out the generated yaml. Only works when filename is not provided")

options.configFlags.AddFlags(cmd.Flags())
return cmd
}
Expand All @@ -149,7 +179,9 @@ func (options *SubmitJobOptions) Complete() error {
options.runtimeEnv = filepath.Clean(options.runtimeEnv)
}

options.fileName = filepath.Clean(options.fileName)
if options.fileName != "" {
options.fileName = filepath.Clean(options.fileName)
}
return nil
}

Expand Down Expand Up @@ -182,39 +214,44 @@ func (options *SubmitJobOptions) Validate() error {
}
}

info, err := os.Stat(options.fileName)
if os.IsNotExist(err) {
return fmt.Errorf("Ray Job file does not exist. Failed with: %w", err)
} else if err != nil {
return fmt.Errorf("Error occurred when checking ray job file: %w", err)
} else if !info.Mode().IsRegular() {
return fmt.Errorf("Filename given is not a regular file. Failed with: %w", err)
}
// Take care of case where there is a filename input
if options.fileName != "" {
info, err := os.Stat(options.fileName)
if os.IsNotExist(err) {
return fmt.Errorf("Ray Job file does not exist. Failed with: %w", err)
} else if err != nil {
return fmt.Errorf("Error occurred when checking ray job file: %w", err)
} else if !info.Mode().IsRegular() {
return fmt.Errorf("Filename given is not a regular file. Failed with: %w", err)
}

options.RayJob, err = decodeRayJobYaml(options.fileName)
if err != nil {
return fmt.Errorf("Failed to decode RayJob Yaml: %w", err)
}
options.RayJob, err = decodeRayJobYaml(options.fileName)
if err != nil {
return fmt.Errorf("Failed to decode RayJob Yaml: %w", err)
}

submissionMode, ok := options.RayJob.Object["spec"].(map[string]interface{})["submissionMode"]
if !ok {
return fmt.Errorf("RayJob does not have `submissionMode` field set")
}
if submissionMode != nil {
if submissionMode != "InteractiveMode" {
return fmt.Errorf("Submission mode of the Ray Job is not supported")
submissionMode, ok := options.RayJob.Object["spec"].(map[string]interface{})["submissionMode"]
if !ok {
return fmt.Errorf("RayJob does not have `submissionMode` field set")
}
if submissionMode != nil {
if submissionMode != "InteractiveMode" {
return fmt.Errorf("Submission mode of the Ray Job is not supported")
}
} else {
return fmt.Errorf("Submission mode must be set to 'InteractiveMode'")
}
} else {
return fmt.Errorf("Submission mode must be set to 'InteractiveMode'")
}

runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string)
if ok && options.runtimeEnv == "" && options.runtimeEnvJson == "" {
runtimeJson, err := yaml.YAMLToJSON([]byte(runtimeEnvYaml))
if err != nil {
return fmt.Errorf("Failed to convert runtime env to json: %w", err)
runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string)
if ok && options.runtimeEnv == "" && options.runtimeEnvJson == "" {
runtimeJson, err := yaml.YAMLToJSON([]byte(runtimeEnvYaml))
if err != nil {
return fmt.Errorf("Failed to convert runtime env to json: %w", err)
}
options.runtimeEnvJson = string(runtimeJson)
}
options.runtimeEnvJson = string(runtimeJson)
} else if strings.TrimSpace(options.rayjobName) == "" {
return fmt.Errorf("Must set either yaml file (--filename) or set ray job name (--name)")
}

if options.workingDir == "" {
Expand All @@ -232,10 +269,48 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
return fmt.Errorf("failed to initialize clientset: %w", err)
}

// createdRayJob, err = k8sClients.CreateRayCustomResource(ctx, util.RayJob, options.configFlags.Namespace, unstructuredRayjob)
options.RayJob, err = k8sClients.DynamicClient().Resource(util.RayJobGVR).Namespace(*options.configFlags.Namespace).Create(ctx, options.RayJob, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("Error when creating RayJob CR: %w", err)
if options.fileName == "" {
// Genarate the ray job.
rayJobObject := generation.RayJobYamlObject{
RayJobName: options.rayjobName,
Namespace: *options.configFlags.Namespace,
SubmissionMode: "InteractiveMode",
RayClusterSpecObject: generation.RayClusterSpecObject{
RayVersion: options.rayVersion,
Image: options.image,
HeadCPU: options.headCPU,
HeadMemory: options.headMemory,
WorkerGrpName: options.workerGrpName,
WorkerCPU: options.workerCPU,
WorkerMemory: options.workerMemory,
WorkerReplicas: options.workerReplicas,
},
}
rayJobApplyConfig := rayJobObject.GenerateRayJobApplyConfig()

// Print out the yaml if it is a dry run
if options.dryRun {
resultYaml, err := generation.ConvertRayJobApplyConfigToYaml(rayJobApplyConfig)
if err != nil {
return fmt.Errorf("Failed to convert rayjob into yaml format: %w", err)
}

fmt.Printf("%s\n", resultYaml)
return nil
}

// Apply the generated yaml
rayJobApplyConfigResult, err := k8sClients.RayClient().RayV1().RayJobs(*options.configFlags.Namespace).Apply(ctx, rayJobApplyConfig, v1.ApplyOptions{FieldManager: "ray-kubectl-plugin"})
if err != nil {
return fmt.Errorf("Failed to apply generated yaml: %w", err)
}
options.RayJob = &unstructured.Unstructured{}
options.RayJob.SetName(rayJobApplyConfigResult.Name)
} else {
options.RayJob, err = k8sClients.DynamicClient().Resource(util.RayJobGVR).Namespace(*options.configFlags.Namespace).Create(ctx, options.RayJob, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("Error when creating RayJob CR: %w", err)
}
}
fmt.Printf("Submitted RayJob %s.\n", options.RayJob.GetName())

Expand Down
Loading
Loading