diff --git a/docs/reference/api.md b/docs/reference/api.md
index fd4145e713..57c83118b2 100644
--- a/docs/reference/api.md
+++ b/docs/reference/api.md
@@ -174,7 +174,7 @@ _Appears in:_
| `clusterSelector` _object (keys:string, values:string)_ | clusterSelector is used to select running rayclusters by labels | | |
| `submitterConfig` _[SubmitterConfig](#submitterconfig)_ | Configurations of submitter k8s job. | | |
| `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayJob.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayJob which doesn't have this field at all or
the field value is the reserved string 'ray.io/kuberay-operator',
but delegates reconciling the RayJob with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable. | | |
-| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
+| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
| `entrypoint` _string_ | INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
Important: Run "make" to regenerate code after modifying this file | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration
provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index 3ec550c83e..dbdf1aae89 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -67,8 +67,8 @@ type DeletionPolicy string
const (
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers"
- DeleteSelfDeltionPolicy DeletionPolicy = "DeleteSelf"
- DeleteNoneDeletionPolicy DeletionPolicy = "None"
+ DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf"
+ DeleteNoneDeletionPolicy DeletionPolicy = "DeleteNone"
)
type SubmitterConfig struct {
@@ -105,7 +105,7 @@ type RayJobSpec struct {
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
ManagedBy *string `json:"managedBy,omitempty"`
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
- // Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
+ // Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
// If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
// This field requires the RayJobDeletionPolicy feature gate to be enabled.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
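
For reference, a minimal sketch (not part of this patch) of populating the renamed field from Go; the entrypoint value is a placeholder, and the RayJobDeletionPolicy feature gate must be enabled on the operator for the field to take effect.

```go
package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func main() {
	// Illustrative only: keep the RayCluster and the submitter Job after the job finishes.
	policy := rayv1.DeleteNoneDeletionPolicy
	spec := rayv1.RayJobSpec{
		Entrypoint:               "python /home/ray/samples/sample_code.py", // placeholder entrypoint
		ShutdownAfterJobFinishes: false,                                     // deletionPolicy governs cleanup instead
		DeletionPolicy:           &policy,
	}
	fmt.Println(*spec.DeletionPolicy) // prints "DeleteNone"
}
```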
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index a1f4c9a5b4..db5adef490 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -377,7 +377,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
case rayv1.DeleteWorkersDeletionPolicy:
logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance)
- case rayv1.DeleteSelfDeltionPolicy:
+ case rayv1.DeleteSelfDeletionPolicy:
logger.Info("Deleting RayJob")
err = r.Client.Delete(ctx, rayJobInstance)
default:
diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go
index 856d8e8a7f..2cd28ef3ef 100644
--- a/ray-operator/controllers/ray/rayjob_controller_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_test.go
@@ -30,6 +30,7 @@ import (
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+ "github.com/ray-project/kuberay/ray-operator/pkg/features"
"github.com/ray-project/kuberay/ray-operator/test/support"
batchv1 "k8s.io/api/batch/v1"
@@ -835,4 +836,514 @@ var _ = Context("RayJob with different submission modes", func() {
})
})
})
+
+ Describe("RayJob with DeletionPolicy=DeleteCluster", Ordered, func() {
+ features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)
+
+ ctx := context.Background()
+ namespace := "default"
+ rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletecluster", namespace)
+ deletionPolicy := rayv1.DeleteClusterDeletionPolicy
+ rayJob.Spec.DeletionPolicy = &deletionPolicy
+ rayJob.Spec.ShutdownAfterJobFinishes = false
+ rayCluster := &rayv1.RayCluster{}
+
+ It("Verify RayJob spec", func() {
+ Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteClusterDeletionPolicy))
+ })
+
+ It("Create a RayJob custom resource", func() {
+ err := k8sClient.Create(ctx, rayJob)
+ Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name)
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from New to Initializing.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set.
+ Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty())
+ Expect(rayJob.Status.JobId).NotTo(BeEmpty())
+ Expect(rayJob.Status.StartTime).NotTo(BeNil())
+ })
+
+ It("In Initializing state, the RayCluster should eventually be created.", func() {
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
+
+ // Check whether RayCluster is consistent with RayJob's RayClusterSpec.
+ Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas))
+ Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion))
+
+ // TODO (kevin85421): Check the RayCluster labels and annotations.
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name))
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)))
+ })
+
+		It("Update RayCluster.Status.State to rayv1.Ready", func() {
+ // The RayCluster is not 'Ready' yet because Pods are not running and ready.
+ Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+
+ updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+ updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+
+ // The RayCluster.Status.State should be Ready.
+ Eventually(
+ getClusterState(ctx, namespace, rayCluster.Name),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Initializing to Running.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Running state, the RayJob's Status.DashboardURL must be set.
+ Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty())
+
+ // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Running to Complete.", func() {
+ // Update fake dashboard client to return job info with "Succeeded" status.
+ getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
+ return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
+ }
+ fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
+ defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
+
+ // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed.
+ Consistently(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+			// Fetch the submitter Kubernetes Job.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+
+ // Update the submitter Kubernetes Job to Complete.
+ conditions := []batchv1.JobCondition{
+ {Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
+ }
+ job.Status.Conditions = conditions
+ Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
+
+ // RayJob transitions to Complete.
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+ })
+
+		It("If DeletionPolicy=DeleteCluster, the RayCluster should be deleted, but not the submitter Job.", func() {
+ Eventually(
+ func() bool {
+ return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster)())
+ },
+ time.Second*3, time.Millisecond*500).Should(BeTrue())
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ Consistently(
+ getResourceFunc(ctx, namespacedName, job),
+ time.Second*3, time.Millisecond*500).Should(BeNil())
+ })
+ })
+
+ Describe("RayJob with DeletionPolicy=DeleteWorkers", Ordered, func() {
+ features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)
+
+ ctx := context.Background()
+ namespace := "default"
+ rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deleteworkers", namespace)
+ deletionPolicy := rayv1.DeleteWorkersDeletionPolicy
+ rayJob.Spec.DeletionPolicy = &deletionPolicy
+ rayJob.Spec.ShutdownAfterJobFinishes = false
+ rayCluster := &rayv1.RayCluster{}
+
+ It("Verify RayJob spec", func() {
+ Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteWorkersDeletionPolicy))
+ })
+
+ It("Create a RayJob custom resource", func() {
+ err := k8sClient.Create(ctx, rayJob)
+ Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name)
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from New to Initializing.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set.
+ Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty())
+ Expect(rayJob.Status.JobId).NotTo(BeEmpty())
+ Expect(rayJob.Status.StartTime).NotTo(BeNil())
+ })
+
+ It("In Initializing state, the RayCluster should eventually be created.", func() {
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
+
+ // Check whether RayCluster is consistent with RayJob's RayClusterSpec.
+ Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas))
+ Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion))
+
+ // TODO (kevin85421): Check the RayCluster labels and annotations.
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name))
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)))
+ })
+
+		It("Update RayCluster.Status.State to rayv1.Ready", func() {
+ // The RayCluster is not 'Ready' yet because Pods are not running and ready.
+ Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+
+ updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+ updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+
+ // The RayCluster.Status.State should be Ready.
+ Eventually(
+ getClusterState(ctx, namespace, rayCluster.Name),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Initializing to Running.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Running state, the RayJob's Status.DashboardURL must be set.
+ Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty())
+
+ // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Running to Complete.", func() {
+ // Update fake dashboard client to return job info with "Succeeded" status.
+ getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
+ return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
+ }
+ fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
+ defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
+
+ // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed.
+ Consistently(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+			// Fetch the submitter Kubernetes Job.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+
+ // Update the submitter Kubernetes Job to Complete.
+ conditions := []batchv1.JobCondition{
+ {Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
+ }
+ job.Status.Conditions = conditions
+ Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
+
+ // RayJob transitions to Complete.
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+ })
+
+		It("If DeletionPolicy=DeleteWorkers, all worker Pods should be deleted, but not the head Pod or the submitter Job", func() {
+ // RayCluster exists
+ Consistently(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
+
+ // Worker replicas set to 0
+ Expect(*rayCluster.Spec.WorkerGroupSpecs[0].MinReplicas).To(Equal(int32(0)))
+ Expect(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(int32(0)))
+
+ // 0 worker Pods exist
+ workerPods := corev1.PodList{}
+ workerLabels := common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToListOptions()
+ Eventually(
+ listResourceFunc(ctx, &workerPods, workerLabels...),
+ time.Second*3, time.Millisecond*500).Should(Equal(0), "expected 0 workers")
+
+ // Head Pod is still running
+ headPods := corev1.PodList{}
+ headLabels := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
+ Consistently(
+ listResourceFunc(ctx, &headPods, headLabels...),
+ time.Second*3, time.Millisecond*500).Should(Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items)
+
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ Consistently(
+ getResourceFunc(ctx, namespacedName, job),
+ time.Second*3, time.Millisecond*500).Should(BeNil())
+ })
+ })
+
+ Describe("RayJob with DeletionPolicy=DeleteSelf", Ordered, func() {
+ ctx := context.Background()
+ namespace := "default"
+ rayJob := rayJobTemplate("rayjob-test-deleteself", namespace)
+ deletionPolicy := rayv1.DeleteSelfDeletionPolicy
+ rayJob.Spec.DeletionPolicy = &deletionPolicy
+ rayJob.Spec.ShutdownAfterJobFinishes = false
+ rayCluster := &rayv1.RayCluster{}
+
+ It("Create a RayJob custom resource", func() {
+ err := k8sClient.Create(ctx, rayJob)
+ Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name)
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from New to Initializing.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set.
+ Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty())
+ Expect(rayJob.Status.JobId).NotTo(BeEmpty())
+ Expect(rayJob.Status.StartTime).NotTo(BeNil())
+ })
+
+ It("In Initializing state, the RayCluster should eventually be created.", func() {
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
+
+ // Check whether RayCluster is consistent with RayJob's RayClusterSpec.
+ Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas))
+ Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion))
+
+ // TODO (kevin85421): Check the RayCluster labels and annotations.
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name))
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)))
+ })
+
+		It("Update RayCluster.Status.State to rayv1.Ready", func() {
+ // The RayCluster is not 'Ready' yet because Pods are not running and ready.
+ Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+
+ updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+ updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+
+ // The RayCluster.Status.State should be Ready.
+ Eventually(
+ getClusterState(ctx, namespace, rayCluster.Name),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Initializing to Running.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Running state, the RayJob's Status.DashboardURL must be set.
+ Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty())
+
+ // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Running to Complete.", func() {
+ // Update fake dashboard client to return job info with "Succeeded" status.
+ getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
+ return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
+ }
+ fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
+
+ // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed.
+ Consistently(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+			// Fetch the submitter Kubernetes Job.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+
+ // Update the submitter Kubernetes Job to Complete.
+ conditions := []batchv1.JobCondition{
+ {Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
+ }
+ job.Status.Conditions = conditions
+ Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
+ })
+
+ It("If DeletionPolicy=DeleteSelf, the RayJob is deleted", func() {
+ Eventually(
+ func() bool {
+ return apierrors.IsNotFound(k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob))
+ }, time.Second*5, time.Millisecond*500).Should(BeTrue())
+ })
+ })
+
+ Describe("RayJob with DeletionPolicy=DeleteNone", Ordered, func() {
+ features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true)
+
+ ctx := context.Background()
+ namespace := "default"
+ rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletenone", namespace)
+ deletionPolicy := rayv1.DeleteNoneDeletionPolicy
+ rayJob.Spec.DeletionPolicy = &deletionPolicy
+ rayJob.Spec.ShutdownAfterJobFinishes = false
+ rayCluster := &rayv1.RayCluster{}
+
+ It("Verify RayJob spec", func() {
+ Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteNoneDeletionPolicy))
+ })
+
+ It("Create a RayJob custom resource", func() {
+ err := k8sClient.Create(ctx, rayJob)
+ Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob")
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name)
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from New to Initializing.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set.
+ Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty())
+ Expect(rayJob.Status.JobId).NotTo(BeEmpty())
+ Expect(rayJob.Status.StartTime).NotTo(BeNil())
+ })
+
+ It("In Initializing state, the RayCluster should eventually be created.", func() {
+ Eventually(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
+
+ // Check whether RayCluster is consistent with RayJob's RayClusterSpec.
+ Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas))
+ Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion))
+
+ // TODO (kevin85421): Check the RayCluster labels and annotations.
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name))
+ Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)))
+ })
+
+		It("Update RayCluster.Status.State to rayv1.Ready", func() {
+ // The RayCluster is not 'Ready' yet because Pods are not running and ready.
+ Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+
+ updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+ updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+
+ // The RayCluster.Status.State should be Ready.
+ Eventually(
+ getClusterState(ctx, namespace, rayCluster.Name),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready))
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Initializing to Running.", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+ // In Running state, the RayJob's Status.DashboardURL must be set.
+ Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty())
+
+ // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+ })
+
+		It("RayJob's JobDeploymentStatus transitions from Running to Complete.", func() {
+ // Update fake dashboard client to return job info with "Succeeded" status.
+ getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
+ return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil
+ }
+ fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
+ defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
+
+ // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed.
+ Consistently(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+
+			// Fetch the submitter Kubernetes Job.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+
+ // Update the submitter Kubernetes Job to Complete.
+ conditions := []batchv1.JobCondition{
+ {Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
+ }
+ job.Status.Conditions = conditions
+ Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
+
+ // RayJob transitions to Complete.
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+ })
+
+ It("If DeletionPolicy=DeleteNone, no resources are deleted", func() {
+ // RayJob exists
+ Consistently(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob),
+				time.Second*3, time.Millisecond*500).Should(BeNil(), "RayJob %v not found", rayJob.Name)
+
+ // RayCluster exists
+ Consistently(
+ getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster),
+ time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName)
+
+ // Worker replicas set to 3
+ Expect(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(int32(3)))
+
+ // 3 worker Pods exist
+ workerPods := corev1.PodList{}
+ workerLabels := common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToListOptions()
+ Consistently(
+ listResourceFunc(ctx, &workerPods, workerLabels...),
+ time.Second*3, time.Millisecond*500).Should(Equal(3), "expected 3 workers")
+
+ // Head Pod is still running
+ headPods := corev1.PodList{}
+ headLabels := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
+ Consistently(
+ listResourceFunc(ctx, &headPods, headLabels...),
+ time.Second*3, time.Millisecond*500).Should(Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items)
+
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ Consistently(
+ getResourceFunc(ctx, namespacedName, job),
+ time.Second*3, time.Millisecond*500).Should(BeNil())
+ })
+ })
})