Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayJob] implement deletion policy API #2643

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ _Appears in:_
| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volumemount-v1-core) array_ | Optional list of volumeMounts. This is needed for enabling TLS for the autoscaler container. | | |


#### DeletionPolicy

_Underlying type:_ _string_





_Appears in:_
- [RayJobSpec](#rayjobspec)





#### GcsFaultToleranceOptions
Expand Down Expand Up @@ -181,6 +194,7 @@ _Appears in:_
| `clusterSelector` _object (keys:string, values:string)_ | clusterSelector is used to select running rayclusters by labels | | |
| `submitterConfig` _[SubmitterConfig](#submitterconfig)_ | Configurations of submitter k8s job. | | |
| `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayJob.<br />The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.<br />The kuberay-operator reconciles a RayJob which doesn't have this field at all or<br />the field value is the reserved string 'ray.io/kuberay-operator',<br />but delegates reconciling the RayJob with 'kueue.x-k8s.io/multikueue' to the Kueue.<br />The field is immutable. | | |
| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | DeletionPolicy indicates what resources of the RayJob are deleted upon job completion.<br />Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.<br />If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.<br />This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
| `entrypoint` _string_ | INSERT ADDITIONAL SPEC FIELDS - desired state of cluster<br />Important: Run "make" to regenerate code after modifying this file | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration<br />provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
Expand Down
6 changes: 6 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ batchScheduler:
featureGates:
- name: RayClusterStatusConditions
enabled: true
- name: RayJobDeletionPolicy
enabled: false

# Path to the operator binary
operatorComand: /manager
Expand Down
15 changes: 15 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)

type DeletionPolicy string

const (
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster" // Deletion policy to delete the entire Ray cluster on job completion.
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers" // Deletion policy to delete only the workers on job completion.
DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf" // Deletion policy to delete the RayJob resource (and all associated resources) on job completion.
DeleteNoneDeletionPolicy DeletionPolicy = "DeleteNone" // Deletion policy to delete no resources on job completion.
)

type SubmitterConfig struct {
// BackoffLimit of the submitter k8s job.
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
Expand Down Expand Up @@ -95,6 +104,12 @@ type RayJobSpec struct {
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="the managedBy field is immutable"
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
ManagedBy *string `json:"managedBy,omitempty"`
// DeletionPolicy indicates what resources of the RayJob are deleted upon job completion.
// Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
// If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
// This field requires the RayJobDeletionPolicy feature gate to be enabled.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Entrypoint string `json:"entrypoint,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 76 additions & 5 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -346,11 +347,53 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name,
"ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes,
"ClusterSelector", rayJobInstance.Spec.ClusterSelector,
"ttlSecondsAfterFinished", ttlSeconds,
"Status.endTime", rayJobInstance.Status.EndTime,
"Now", nowTime,
"ShutdownTime", shutdownTime)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe move the check before if features.Enabled(features.RayJobDeletionPolicy) ... to simplify the code?

	if shutdownTime.After(nowTime) {
		delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
		logger.Info("shutdownTime not reached; requeue this RayJob.", "requeue seconds", delta)
		return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
	}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could add a requeue for cases where deletion is not required

if features.Enabled(features.RayJobDeletionPolicy) &&
andrewsykim marked this conversation as resolved.
Show resolved Hide resolved
rayJobInstance.Spec.DeletionPolicy != nil &&
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move

			logger.Info(
				"RayJob deployment status",
				"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
				"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
				"ttlSecondsAfterFinished", ttlSeconds,
				"Status.endTime", rayJobInstance.Status.EndTime,
				"Now", nowTime,
				"ShutdownTime", shutdownTime)
			if shutdownTime.After(nowTime) {
				delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
				logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
				return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
			}

to the above of if features.Enabled(features.RayJobDeletionPolicy) && and remove the similar logics from L391 to L403.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conditions are split here because the keys passed into logger.Info are different (shutdownAfterJobFinishes vs deletePolicy)

"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
"ttlSecondsAfterFinished", ttlSeconds,
"Status.endTime", rayJobInstance.Status.EndTime,
"Now", nowTime,
"ShutdownTime", shutdownTime)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}

switch *rayJobInstance.Spec.DeletionPolicy {
case rayv1.DeleteClusterDeletionPolicy:
logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.deleteClusterResources(ctx, rayJobInstance)
case rayv1.DeleteWorkersDeletionPolicy:
logger.Info("Suspending all worker groups", "RayCluster", rayJobInstance.Status.RayClusterName)
err = r.suspendWorkerGroups(ctx, rayJobInstance)
case rayv1.DeleteSelfDeletionPolicy:
logger.Info("Deleting RayJob")
err = r.Client.Delete(ctx, rayJobInstance)
default:
}
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}

if (!features.Enabled(features.RayJobDeletionPolicy) || rayJobInstance.Spec.DeletionPolicy == nil) && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
Expand All @@ -377,6 +420,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}

// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
default:
Expand Down Expand Up @@ -617,6 +661,30 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}

func (r *RayJobReconciler) suspendWorkerGroups(ctx context.Context, rayJobInstance *rayv1.RayJob) error {
logger := ctrl.LoggerFrom(ctx)
clusterIdentifier := common.RayJobRayClusterNamespacedName(rayJobInstance)

cluster := rayv1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
return err
}

for i := range cluster.Spec.WorkerGroupSpecs {
cluster.Spec.WorkerGroupSpecs[i].Suspend = ptr.To[bool](true)
}

if err := r.Update(ctx, &cluster); err != nil {
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
return err
}

logger.Info("All worker groups for RayCluster have had `suspend` set to true", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Set the `suspend` field to true for all worker groups in cluster %s/%s", cluster.Namespace, cluster.Name)

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -845,6 +913,9 @@ func validateRayJobSpec(rayJob *rayv1.RayJob) error {
if rayJob.Spec.BackoffLimit != nil && *rayJob.Spec.BackoffLimit < 0 {
return fmt.Errorf("backoffLimit must be a positive integer")
}
if rayJob.Spec.ShutdownAfterJobFinishes && rayJob.Spec.DeletionPolicy != nil && *rayJob.Spec.DeletionPolicy == rayv1.DeleteNoneDeletionPolicy {
return fmt.Errorf("shutdownAfterJobFinshes is set to 'true' while deletion policy is 'DeleteNone'")
}
return nil
}

Expand Down
Loading
Loading