diff --git a/docs/reference/api.md b/docs/reference/api.md
index 6ae199a9301..ffa42a5a61e 100644
--- a/docs/reference/api.md
+++ b/docs/reference/api.md
@@ -40,6 +40,19 @@ _Appears in:_
| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volumemount-v1-core) array_ | Optional list of volumeMounts. This is needed for enabling TLS for the autoscaler container. | | |
+#### DeletionPolicy
+
+_Underlying type:_ _string_
+
+
+
+
+
+_Appears in:_
+- [RayJobSpec](#rayjobspec)
+
+
+
#### HeadGroupSpec
@@ -171,6 +184,7 @@ _Appears in:_
| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.
It's only working when ShutdownAfterJobFinishes set to true. | 0 | |
| `shutdownAfterJobFinishes` _boolean_ | ShutdownAfterJobFinishes will determine whether to delete the ray cluster once rayJob succeed or failed. | | |
| `suspend` _boolean_ | suspend specifies whether the RayJob controller should create a RayCluster instance
If a job is applied with the suspend field set to true,
the RayCluster will not be created and will wait for the transition to false.
If the RayCluster is already created, it will be deleted.
In case of transition to false a new RayCluster will be created. | | |
+| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completeion.
Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
index ffbe90665d9..73611c65af9 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -58,6 +58,12 @@ spec:
additionalProperties:
type: string
type: object
+ deletionPolicy:
+ type: string
+ x-kubernetes-validations:
+ - message: the deletionPolicy field value must be either 'DeleteCluster',
+ 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'
+ rule: self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']
entrypoint:
type: string
entrypointNumCpus:
diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml
index 7b2cc7601cf..354554c0b31 100644
--- a/helm-chart/kuberay-operator/values.yaml
+++ b/helm-chart/kuberay-operator/values.yaml
@@ -89,6 +89,8 @@ batchScheduler:
featureGates:
- name: RayClusterStatusConditions
enabled: false
+ - name: RayJobDeletionPolicy
+ enabled: false
# Path to the operator binary
operatorComand: /manager
diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index d975011ed8c..ae78b3fcc4f 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -62,6 +62,15 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)
+type DeletionPolicy string
+
+const (
+ DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
+ DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers"
+ DeleteSelfDeltionPolicy DeletionPolicy = "DeleteSelf"
+ DeleteNoneDeletionPolicy DeletionPolicy = "None"
+)
+
type SubmitterConfig struct {
// BackoffLimit of the submitter k8s job.
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
@@ -128,6 +137,12 @@ type RayJobSpec struct {
// If the RayCluster is already created, it will be deleted.
// In case of transition to false a new RayCluster will be created.
Suspend bool `json:"suspend,omitempty"`
+ // deletionPolicy indicates what resources of the RayJob are deleted upon job completeion.
+ // Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
+ // If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
+ // This field requires the RayJobDeletionPolicy feature gate to be enabled.
+ // +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
+ DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
}
// RayJobStatus defines the observed state of RayJob
diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
index 586c39ac5a7..c8b4a7efaf7 100644
--- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
+++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
@@ -410,6 +410,11 @@ func (in *RayJobSpec) DeepCopyInto(out *RayJobSpec) {
*out = new(string)
**out = **in
}
+ if in.DeletionPolicy != nil {
+ in, out := &in.DeletionPolicy, &out.DeletionPolicy
+ *out = new(DeletionPolicy)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RayJobSpec.
diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
index ffbe90665d9..73611c65af9 100644
--- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -58,6 +58,12 @@ spec:
additionalProperties:
type: string
type: object
+ deletionPolicy:
+ type: string
+ x-kubernetes-validations:
+ - message: the deletionPolicy field value must be either 'DeleteCluster',
+ 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'
+ rule: self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']
entrypoint:
type: string
entrypointNumCpus:
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index ccf041f06ff..6e440d46a2b 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -19,6 +19,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+ "github.com/ray-project/kuberay/ray-operator/pkg/features"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -347,10 +348,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
- if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
- ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
- nowTime := time.Now()
- shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
+ ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
+ nowTime := time.Now()
+ shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
+
+ if features.Enabled(features.RayJobDeletionPolicy) &&
+ rayJobInstance.Spec.DeletionPolicy != nil &&
+ *rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
+ len(rayJobInstance.Spec.ClusterSelector) == 0 {
+ logger.Info(
+ "RayJob deployment status",
+ "jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
+ "deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
+ "ttlSecondsAfterFinished", ttlSeconds,
+ "Status.endTime", rayJobInstance.Status.EndTime,
+ "Now", nowTime,
+ "ShutdownTime", shutdownTime)
+ if shutdownTime.After(nowTime) {
+ delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
+ logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
+ return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
+ }
+
+ switch *rayJobInstance.Spec.DeletionPolicy {
+ case rayv1.DeleteClusterDeletionPolicy:
+ logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName)
+ _, err = r.deleteClusterResources(ctx, rayJobInstance)
+ case rayv1.DeleteWorkersDeletionPolicy:
+ logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName)
+ _, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance)
+ case rayv1.DeleteSelfDeltionPolicy:
+ logger.Info("Deleting RayJob")
+ err = r.Client.Delete(ctx, rayJobInstance)
+ default:
+ }
+ if err != nil {
+ return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+ }
+ }
+
+ if rayJobInstance.Spec.DeletionPolicy == nil && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
@@ -377,6 +414,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
+
// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
default:
@@ -616,6 +654,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}
+func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
+ logger := ctrl.LoggerFrom(ctx)
+ clusterIdentifier := common.RayJobRayClusterNamespacedName(rayJobInstance)
+
+ cluster := rayv1.RayCluster{}
+ if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
+ return false, err
+ }
+
+ for i := range cluster.Spec.WorkerGroupSpecs {
+ cluster.Spec.WorkerGroupSpecs[i].Replicas = ptr.To[int32](0)
+ cluster.Spec.WorkerGroupSpecs[i].MinReplicas = ptr.To[int32](0)
+ }
+
+ if err := r.Update(ctx, &cluster); err != nil {
+ r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
+ return false, err
+ }
+
+ logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
+ r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name)
+
+ return true, nil
+}
+
// SetupWithManager sets up the controller with the Manager.
func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
return ctrl.NewControllerManagedBy(mgr).
diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go
index dbcff1e0b0c..5e273550a2c 100644
--- a/ray-operator/controllers/ray/utils/constant.go
+++ b/ray-operator/controllers/ray/utils/constant.go
@@ -261,9 +261,11 @@ const (
FailedToCreateRayJobSubmitter K8sEventType = "FailedToCreateRayJobSubmitter"
FailedToDeleteRayJobSubmitter K8sEventType = "FailedToDeleteRayJobSubmitter"
CreatedRayCluster K8sEventType = "CreatedRayCluster"
+ UpdatedRayCluster K8sEventType = "UpdatedRayCluster"
DeletedRayCluster K8sEventType = "DeletedRayCluster"
FailedToCreateRayCluster K8sEventType = "FailedToCreateRayCluster"
FailedToDeleteRayCluster K8sEventType = "FailedToDeleteRayCluster"
+ FailedToUpdateRayCluster K8sEventType = "FailedToUpdateRayCluster"
// RayService event list
InvalidRayServiceSpec K8sEventType = "InvalidRayServiceSpec"
diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
index ced013bebf7..99e3adea1d2 100644
--- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
+++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
@@ -28,6 +28,7 @@ type RayJobSpecApplyConfiguration struct {
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
ShutdownAfterJobFinishes *bool `json:"shutdownAfterJobFinishes,omitempty"`
Suspend *bool `json:"suspend,omitempty"`
+ DeletionPolicy *rayv1.DeletionPolicy `json:"deletionPolicy,omitempty"`
}
// RayJobSpecApplyConfiguration constructs an declarative configuration of the RayJobSpec type for use with
@@ -191,3 +192,11 @@ func (b *RayJobSpecApplyConfiguration) WithSuspend(value bool) *RayJobSpecApplyC
b.Suspend = &value
return b
}
+
+// WithDeletionPolicy sets the DeletionPolicy field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the DeletionPolicy field is set to the value of the last call.
+func (b *RayJobSpecApplyConfiguration) WithDeletionPolicy(value rayv1.DeletionPolicy) *RayJobSpecApplyConfiguration {
+ b.DeletionPolicy = &value
+ return b
+}
diff --git a/ray-operator/pkg/features/features.go b/ray-operator/pkg/features/features.go
index 178e215e602..3df4422d600 100644
--- a/ray-operator/pkg/features/features.go
+++ b/ray-operator/pkg/features/features.go
@@ -17,6 +17,13 @@ const (
//
// Enables new conditions in RayCluster status
RayClusterStatusConditions featuregate.Feature = "RayClusterStatusConditions"
+
+ // owner: @andrewsykim
+ // rep: N/A
+ // alpha: v1.3
+ //
+ // Enables new deletion policy API in RayJob
+ RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy"
)
func init() {
@@ -25,6 +32,7 @@ func init() {
var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
RayClusterStatusConditions: {Default: true, PreRelease: featuregate.Beta},
+ RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha},
}
// SetFeatureGateDuringTest is a helper method to override feature gates in tests.