diff --git a/docs/reference/api.md b/docs/reference/api.md
index 5d0a2ed062..fd4145e713 100644
--- a/docs/reference/api.md
+++ b/docs/reference/api.md
@@ -40,6 +40,19 @@ _Appears in:_
| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volumemount-v1-core) array_ | Optional list of volumeMounts. This is needed for enabling TLS for the autoscaler container. | | |
+#### DeletionPolicy
+
+_Underlying type:_ _string_
+
+
+
+
+
+_Appears in:_
+- [RayJobSpec](#rayjobspec)
+
+
+
#### HeadGroupSpec
@@ -161,6 +174,7 @@ _Appears in:_
| `clusterSelector` _object (keys:string, values:string)_ | clusterSelector is used to select running rayclusters by labels | | |
| `submitterConfig` _[SubmitterConfig](#submitterconfig)_ | Configurations of submitter k8s job. | | |
| `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayJob.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayJob which doesn't have this field at all or
the field value is the reserved string 'ray.io/kuberay-operator',
but delegates reconciling the RayJob with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable. | | |
+| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
| `entrypoint` _string_ | INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
Important: Run "make" to regenerate code after modifying this file | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration
provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
index ffbe90665d..73611c65af 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -58,6 +58,12 @@ spec:
additionalProperties:
type: string
type: object
+ deletionPolicy:
+ type: string
+ x-kubernetes-validations:
+ - message: the deletionPolicy field value must be either 'DeleteCluster',
+ 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'
+ rule: self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']
entrypoint:
type: string
entrypointNumCpus:
diff --git a/helm-chart/kuberay-operator/values.yaml b/helm-chart/kuberay-operator/values.yaml
index d9b4912c59..803a7aeda6 100644
--- a/helm-chart/kuberay-operator/values.yaml
+++ b/helm-chart/kuberay-operator/values.yaml
@@ -89,6 +89,8 @@ batchScheduler:
featureGates:
- name: RayClusterStatusConditions
enabled: true
+ - name: RayJobDeletionPolicy
+ enabled: false
# Path to the operator binary
operatorComand: /manager
diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index d975011ed8..3ec550c83e 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -62,6 +62,15 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)
+type DeletionPolicy string
+
+const (
+ DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
+ DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers"
+ DeleteSelfDeltionPolicy DeletionPolicy = "DeleteSelf"
+ DeleteNoneDeletionPolicy DeletionPolicy = "None"
+)
+
type SubmitterConfig struct {
// BackoffLimit of the submitter k8s job.
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
@@ -95,6 +104,12 @@ type RayJobSpec struct {
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="the managedBy field is immutable"
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
ManagedBy *string `json:"managedBy,omitempty"`
+ // deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
+ // Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
+ // If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
+ // This field requires the RayJobDeletionPolicy feature gate to be enabled.
+ // +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
+ DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Entrypoint string `json:"entrypoint,omitempty"`
diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
index 586c39ac5a..c8b4a7efaf 100644
--- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
+++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
@@ -410,6 +410,11 @@ func (in *RayJobSpec) DeepCopyInto(out *RayJobSpec) {
*out = new(string)
**out = **in
}
+ if in.DeletionPolicy != nil {
+ in, out := &in.DeletionPolicy, &out.DeletionPolicy
+ *out = new(DeletionPolicy)
+ **out = **in
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RayJobSpec.
diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
index ffbe90665d..73611c65af 100644
--- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -58,6 +58,12 @@ spec:
additionalProperties:
type: string
type: object
+ deletionPolicy:
+ type: string
+ x-kubernetes-validations:
+ - message: the deletionPolicy field value must be either 'DeleteCluster',
+ 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'
+ rule: self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']
entrypoint:
type: string
entrypointNumCpus:
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 0cb3ebd4e9..a1f4c9a5b4 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -19,6 +19,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+ "github.com/ray-project/kuberay/ray-operator/pkg/features"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -347,10 +348,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
- if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
- ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
- nowTime := time.Now()
- shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
+ ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
+ nowTime := time.Now()
+ shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
+
+ if features.Enabled(features.RayJobDeletionPolicy) &&
+ rayJobInstance.Spec.DeletionPolicy != nil &&
+ *rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
+ len(rayJobInstance.Spec.ClusterSelector) == 0 {
+ logger.Info(
+ "RayJob deployment status",
+ "jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
+ "deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
+ "ttlSecondsAfterFinished", ttlSeconds,
+ "Status.endTime", rayJobInstance.Status.EndTime,
+ "Now", nowTime,
+ "ShutdownTime", shutdownTime)
+ if shutdownTime.After(nowTime) {
+ delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
+ logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
+ return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
+ }
+
+ switch *rayJobInstance.Spec.DeletionPolicy {
+ case rayv1.DeleteClusterDeletionPolicy:
+ logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName)
+ _, err = r.deleteClusterResources(ctx, rayJobInstance)
+ case rayv1.DeleteWorkersDeletionPolicy:
+ logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName)
+ _, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance)
+ case rayv1.DeleteSelfDeltionPolicy:
+ logger.Info("Deleting RayJob")
+ err = r.Client.Delete(ctx, rayJobInstance)
+ default:
+ }
+ if err != nil {
+ return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+ }
+ }
+
+ if (!features.Enabled(features.RayJobDeletionPolicy) || rayJobInstance.Spec.DeletionPolicy == nil) && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
@@ -377,6 +414,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
+
// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
default:
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}
+func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
+ logger := ctrl.LoggerFrom(ctx)
+ clusterIdentifier := common.RayJobRayClusterNamespacedName(rayJobInstance)
+
+ cluster := rayv1.RayCluster{}
+ if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
+ return false, err
+ }
+
+ for i := range cluster.Spec.WorkerGroupSpecs {
+ cluster.Spec.WorkerGroupSpecs[i].Replicas = ptr.To[int32](0)
+ cluster.Spec.WorkerGroupSpecs[i].MinReplicas = ptr.To[int32](0)
+ }
+
+ if err := r.Update(ctx, &cluster); err != nil {
+ r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
+ return false, err
+ }
+
+ logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
+ r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name)
+
+ return true, nil
+}
+
// SetupWithManager sets up the controller with the Manager.
func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
return ctrl.NewControllerManagedBy(mgr).
diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go
index dbcff1e0b0..5e273550a2 100644
--- a/ray-operator/controllers/ray/utils/constant.go
+++ b/ray-operator/controllers/ray/utils/constant.go
@@ -261,9 +261,11 @@ const (
FailedToCreateRayJobSubmitter K8sEventType = "FailedToCreateRayJobSubmitter"
FailedToDeleteRayJobSubmitter K8sEventType = "FailedToDeleteRayJobSubmitter"
CreatedRayCluster K8sEventType = "CreatedRayCluster"
+ UpdatedRayCluster K8sEventType = "UpdatedRayCluster"
DeletedRayCluster K8sEventType = "DeletedRayCluster"
FailedToCreateRayCluster K8sEventType = "FailedToCreateRayCluster"
FailedToDeleteRayCluster K8sEventType = "FailedToDeleteRayCluster"
+ FailedToUpdateRayCluster K8sEventType = "FailedToUpdateRayCluster"
// RayService event list
InvalidRayServiceSpec K8sEventType = "InvalidRayServiceSpec"
diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
index ced013bebf..f453176236 100644
--- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
+++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
@@ -18,6 +18,7 @@ type RayJobSpecApplyConfiguration struct {
ClusterSelector map[string]string `json:"clusterSelector,omitempty"`
SubmitterConfig *SubmitterConfigApplyConfiguration `json:"submitterConfig,omitempty"`
ManagedBy *string `json:"managedBy,omitempty"`
+ DeletionPolicy *rayv1.DeletionPolicy `json:"deletionPolicy,omitempty"`
Entrypoint *string `json:"entrypoint,omitempty"`
RuntimeEnvYAML *string `json:"runtimeEnvYAML,omitempty"`
JobId *string `json:"jobId,omitempty"`
@@ -112,6 +113,14 @@ func (b *RayJobSpecApplyConfiguration) WithManagedBy(value string) *RayJobSpecAp
return b
}
+// WithDeletionPolicy sets the DeletionPolicy field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the DeletionPolicy field is set to the value of the last call.
+func (b *RayJobSpecApplyConfiguration) WithDeletionPolicy(value rayv1.DeletionPolicy) *RayJobSpecApplyConfiguration {
+ b.DeletionPolicy = &value
+ return b
+}
+
// WithEntrypoint sets the Entrypoint field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the Entrypoint field is set to the value of the last call.
diff --git a/ray-operator/pkg/features/features.go b/ray-operator/pkg/features/features.go
index 178e215e60..3df4422d60 100644
--- a/ray-operator/pkg/features/features.go
+++ b/ray-operator/pkg/features/features.go
@@ -17,6 +17,13 @@ const (
//
// Enables new conditions in RayCluster status
RayClusterStatusConditions featuregate.Feature = "RayClusterStatusConditions"
+
+ // owner: @andrewsykim
+ // rep: N/A
+ // alpha: v1.3
+ //
+ // Enables new deletion policy API in RayJob
+ RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy"
)
func init() {
@@ -25,6 +32,7 @@ func init() {
var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
RayClusterStatusConditions: {Default: true, PreRelease: featuregate.Beta},
+ RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha},
}
// SetFeatureGateDuringTest is a helper method to override feature gates in tests.