Skip to content

Commit

Permalink
[RayJob] UserMode -> InteractiveMode and check rayjob.spec.jobId instead of annotation (#2446)
Browse files Browse the repository at this point in the history

Signed-off-by: Andrew Sy Kim <[email protected]>
  • Loading branch information
andrewsykim authored Oct 16, 2024
1 parent 714aea6 commit 55a6688
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 31 deletions.
2 changes: 1 addition & 1 deletion docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ _Appears in:_
| `entrypoint` _string_ | INSERT ADDITIONAL SPEC FIELDS - desired state of cluster<br />Important: Run "make" to regenerate code after modifying this file | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration<br />provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job. | K8sJobMode | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
Expand Down
5 changes: 2 additions & 3 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,11 @@ func (options *SubmitJobOptions) Validate() error {
return fmt.Errorf("RayJob does not have `submissionMode` field set")
}
if submissionMode != nil {
// Currently using string since latest release does not have `rayv1api.UserMode` yet
if submissionMode != "UserMode" {
if submissionMode != "InteractiveMode" {
return fmt.Errorf("Submission mode of the Ray Job is not supported")
}
} else {
return fmt.Errorf("Submission mode must be set to rayv1api.UserMode or `UserMode`")
return fmt.Errorf("Submission mode must be set to 'InteractiveMode'")
}

runtimeEnvYaml, ok := options.RayJob.Object["spec"].(map[string]interface{})["runtimeEnvYAML"].(string)
Expand Down
6 changes: 3 additions & 3 deletions kubectl-plugin/pkg/cmd/job/job_submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ kind: RayJob
metadata:
name: rayjob-sample
spec:
submissionMode: 'UserMode'`
submissionMode: 'InteractiveMode'`

rayJobYamlPath := filepath.Join(fakeDir, "rayjob-temp-*.yaml")

Expand Down Expand Up @@ -132,7 +132,7 @@ kind: RayJob
metadata:
name: rayjob-sample
spec:
submissionMode: 'UserMode'`
submissionMode: 'InteractiveMode'`
_, err = rayjobtmpfile.Write([]byte(rayYaml))
assert.Nil(t, err)

Expand All @@ -145,7 +145,7 @@ spec:

submissionMode, ok := rayJobYamlActual.Object["spec"].(map[string]interface{})["submissionMode"]
assert.True(t, ok)
assert.Equal(t, "UserMode", submissionMode)
assert.Equal(t, "InteractiveMode", submissionMode)
}

func TestRuntimeEnvHasWorkingDir(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions ray-operator/apis/ray/v1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ const (
type JobSubmissionMode string

const (
K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
UserMode JobSubmissionMode = "UserMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID
K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)

type SubmitterConfig struct {
Expand Down Expand Up @@ -97,6 +97,7 @@ type RayJobSpec struct {
// SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.
// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
// +kubebuilder:default:=K8sJobMode
SubmissionMode JobSubmissionMode `json:"submissionMode,omitempty"`
// EntrypointResources specifies the custom resources and quantities to reserve for the
Expand Down
21 changes: 10 additions & 11 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
rayJobInstance.Status.DashboardURL = clientURL
}

if rayJobInstance.Spec.SubmissionMode == rayv1.UserMode {
logger.Info("SubmissionMode is UserMode and the RayCluster is created. Transition the status from `Initializing` to `Waiting`.")
if rayJobInstance.Spec.SubmissionMode == rayv1.InteractiveMode {
logger.Info("SubmissionMode is InteractiveMode and the RayCluster is created. Transition the status from `Initializing` to `Waiting`.")
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusWaiting
break
}
Expand All @@ -202,13 +202,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
"RayJob", rayJobInstance.Name, "RayCluster", rayJobInstance.Status.RayClusterName)
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
case rayv1.JobDeploymentStatusWaiting:
// Try to get the Ray job id from the Ray job annotations.
rayJobId, found := rayJobInstance.ObjectMeta.Annotations[utils.RayJobSubmissionIdLabelKey]
logger.Info("Get Ray job id from the Ray job annotations", "RayJobId", rayJobId, "Found", found)
if !found {
// Try to get the Ray job id from rayJob.Spec.JobId
if rayJobInstance.Spec.JobId == "" {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
rayJobInstance.Status.JobId = rayJobId

rayJobInstance.Status.JobId = rayJobInstance.Spec.JobId
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusRunning
case rayv1.JobDeploymentStatusRunning:
if shouldUpdate := r.updateStatusToSuspendingIfNeeded(ctx, rayJobInstance); shouldUpdate {
Expand Down Expand Up @@ -634,7 +633,7 @@ func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurren
// This function is the sole place where `JobDeploymentStatusInitializing` is defined. It initializes `Status.JobId` and `Status.RayClusterName`
// prior to job submissions and RayCluster creations. This is used to avoid duplicate job submissions and cluster creations. In addition, this
// function also sets `Status.StartTime` to support `ActiveDeadlineSeconds`.
// This function will set or generate JobId if SubmissionMode is not UserMode.
// This function will set or generate JobId if SubmissionMode is not InteractiveMode.
func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *rayv1.RayJob) error {
logger := ctrl.LoggerFrom(ctx)
shouldUpdateStatus := rayJob.Status.JobId == "" || rayJob.Status.RayClusterName == "" || rayJob.Status.JobStatus == ""
Expand All @@ -644,7 +643,7 @@ func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *r
return nil
}

if rayJob.Spec.SubmissionMode != rayv1.UserMode && rayJob.Status.JobId == "" {
if rayJob.Spec.SubmissionMode != rayv1.InteractiveMode && rayJob.Status.JobId == "" {
if rayJob.Spec.JobId != "" {
rayJob.Status.JobId = rayJob.Spec.JobId
} else {
Expand Down Expand Up @@ -843,8 +842,8 @@ func validateRayJobSpec(rayJob *rayv1.RayJob) error {
}

func validateRayJobStatus(rayJob *rayv1.RayJob) error {
if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting && rayJob.Spec.SubmissionMode != rayv1.UserMode {
return fmt.Errorf("invalid RayJob State: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not UserMode")
if rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusWaiting && rayJob.Spec.SubmissionMode != rayv1.InteractiveMode {
return fmt.Errorf("invalid RayJob State: JobDeploymentStatus cannot be `Waiting` when SubmissionMode is not InteractiveMode")
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,17 +752,17 @@ var _ = Context("RayJob with different submission modes", func() {
})
})

Context("RayJob in UserMode", func() {
Context("RayJob in InteractiveMode", func() {
Describe("Successful RayJob", Ordered, func() {
ctx := context.Background()
namespace := "default"
rayJob := rayJobTemplate("rayjob-test-none-mode", namespace)
rayJob.Spec.SubmissionMode = rayv1.UserMode
rayJob.Spec.SubmissionMode = rayv1.InteractiveMode
rayCluster := &rayv1.RayCluster{}
testRayJobId := "fake-id"

It("Verify RayJob spec", func() {
Expect(rayJob.Spec.SubmissionMode).To(Equal(rayv1.UserMode))
Expect(rayJob.Spec.SubmissionMode).To(Equal(rayv1.InteractiveMode))
Expect(rayJob.Spec.ShutdownAfterJobFinishes).To(BeTrue())
Expect(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs).To(HaveLen(1))
})
Expand Down Expand Up @@ -818,8 +818,8 @@ var _ = Context("RayJob with different submission modes", func() {
time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusWaiting), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
})

It("sets annotation to RayJob", func() {
err := setAnnotationOnRayJob(ctx, rayJob, utils.RayJobSubmissionIdLabelKey, testRayJobId)
It("sets jobId in RayJob", func() {
err := setJobIdOnRayJob(ctx, rayJob, testRayJobId)
Expect(err).NotTo(HaveOccurred())
})

Expand Down
6 changes: 2 additions & 4 deletions ray-operator/controllers/ray/suite_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"

"github.com/onsi/gomega"
Expand Down Expand Up @@ -306,13 +304,13 @@ func updateRayJobSuspendField(ctx context.Context, rayJob *rayv1.RayJob, suspend
})
}

func setAnnotationOnRayJob(ctx context.Context, rayJob *rayv1.RayJob, key, value string) error {
func setJobIdOnRayJob(ctx context.Context, rayJob *rayv1.RayJob, jobId string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: rayJob.Name}, rayJob)
if err != nil {
return err
}
metav1.SetMetaDataAnnotation(&rayJob.ObjectMeta, key, value)
rayJob.Spec.JobId = jobId
return k8sClient.Update(ctx, rayJob)
})
}
1 change: 0 additions & 1 deletion ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const (
HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete"
NumWorkerGroupsKey = "ray.io/num-worker-groups"
KubeRayVersion = "ray.io/kuberay-version"
RayJobSubmissionIdLabelKey = "ray.io/ray-job-submission-id"

// In KubeRay, the Ray container must be the first application container in a head or worker Pod.
RayContainerIndex = 0
Expand Down

0 comments on commit 55a6688

Please sign in to comment.