[RayJob] implement deletion policy API
Signed-off-by: Andrew Sy Kim <[email protected]>
andrewsykim committed Dec 15, 2024
1 parent e4a9645 commit 08bdbfb
Showing 10 changed files with 134 additions and 4 deletions.
14 changes: 14 additions & 0 deletions docs/reference/api.md
@@ -40,6 +40,19 @@ _Appears in:_
| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volumemount-v1-core) array_ | Optional list of volumeMounts. This is needed for enabling TLS for the autoscaler container. | | |


#### DeletionPolicy

_Underlying type:_ _string_





_Appears in:_
- [RayJobSpec](#rayjobspec)





#### HeadGroupSpec
@@ -171,6 +184,7 @@ _Appears in:_
| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.<br />It's only working when ShutdownAfterJobFinishes set to true. | 0 | |
| `shutdownAfterJobFinishes` _boolean_ | ShutdownAfterJobFinishes will determine whether to delete the ray cluster once rayJob succeed or failed. | | |
| `suspend` _boolean_ | suspend specifies whether the RayJob controller should create a RayCluster instance<br />If a job is applied with the suspend field set to true,<br />the RayCluster will not be created and will wait for the transition to false.<br />If the RayCluster is already created, it will be deleted.<br />In case of transition to false a new RayCluster will be created. | | |
| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.<br />Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.<br />If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.<br />This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
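
As a rough illustration of the new field (not part of this commit): a RayJob that keeps its cluster but drops the workers after the job finishes might be constructed through the Go types like this. Only `deletionPolicy`, `ttlSecondsAfterFinished`, and the `DeleteWorkers` value come from this change; `Entrypoint` and the object metadata are assumed for the example.

```go
// Sketch: a RayJob that scales its worker groups to zero after completion
// instead of deleting the whole cluster. Entrypoint and metadata are
// illustrative assumptions, not taken from this commit.
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func sampleRayJob() *rayv1.RayJob {
	policy := rayv1.DeleteWorkersDeletionPolicy // keep the head pod, drop the workers
	return &rayv1.RayJob{
		ObjectMeta: metav1.ObjectMeta{Name: "sample", Namespace: "default"},
		Spec: rayv1.RayJobSpec{
			Entrypoint:              "python /home/ray/samples/sample_code.py", // assumed field value
			TTLSecondsAfterFinished: 60,                                        // wait 60s after the job ends
			DeletionPolicy:          &policy,                                   // needs the RayJobDeletionPolicy gate
		},
	}
}
```

When `deletionPolicy` is set, the documented behavior is that it takes precedence; when it is unset, the legacy `shutdownAfterJobFinishes`/TTL path shown later in the controller diff still applies.
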



6 changes: 6 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Generated file; diff not rendered.

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
@@ -89,6 +89,8 @@ batchScheduler:
featureGates:
- name: RayClusterStatusConditions
enabled: false
- name: RayJobDeletionPolicy
enabled: false

# Path to the operator binary
operatorComand: /manager
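
For context (not part of this diff): the chart-level `featureGates` list presumably reaches the operator as a `--feature-gates` style flag and is parsed with the upstream `k8s.io/component-base/featuregate` machinery that `pkg/features` builds on — the flag wiring is an assumption here. A minimal sketch of that parsing, with the gate name and Alpha default taken from this commit:

```go
// Sketch of how a "RayJobDeletionPolicy=true" style value toggles a gate via
// the upstream featuregate package. The gate name and default come from this
// commit; exposing it as a CLI flag is an assumption about the chart/operator.
package main

import (
	"fmt"

	"k8s.io/component-base/featuregate"
)

const RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy"

func main() {
	gate := featuregate.NewFeatureGate()
	if err := gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha},
	}); err != nil {
		panic(err)
	}
	// Equivalent of passing --feature-gates=RayJobDeletionPolicy=true.
	if err := gate.Set("RayJobDeletionPolicy=true"); err != nil {
		panic(err)
	}
	fmt.Println(gate.Enabled(RayJobDeletionPolicy)) // true
}
```

The corresponding default registered in `pkg/features` appears at the bottom of this commit.
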
15 changes: 15 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -62,6 +62,15 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)

type DeletionPolicy string

const (
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers"
DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf"
DeleteNoneDeletionPolicy DeletionPolicy = "None"
)

type SubmitterConfig struct {
// BackoffLimit of the submitter k8s job.
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
@@ -128,6 +137,12 @@ type RayJobSpec struct {
// If the RayCluster is already created, it will be deleted.
// In case of transition to false a new RayCluster will be created.
Suspend bool `json:"suspend,omitempty"`
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
// Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
// If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
// This field requires the RayJobDeletionPolicy feature gate to be enabled.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'None']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'None'"
DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
}

// RayJobStatus defines the observed state of RayJob
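
Because the new field is an optional pointer, callers need a nil-safe way to resolve the effective behavior. A hypothetical helper (not in this commit) that follows the documented fallback to `shutdownAfterJobFinishes`:

```go
// Hypothetical helper, not part of this commit: an explicit deletionPolicy
// wins, otherwise fall back to the legacy shutdownAfterJobFinishes semantics.
package example

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func effectiveDeletionPolicy(spec rayv1.RayJobSpec) rayv1.DeletionPolicy {
	if spec.DeletionPolicy != nil {
		return *spec.DeletionPolicy
	}
	if spec.ShutdownAfterJobFinishes {
		// Legacy behavior: the RayCluster is deleted once the job finishes.
		return rayv1.DeleteClusterDeletionPolicy
	}
	return rayv1.DeleteNoneDeletionPolicy
}
```
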
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Generated file; diff not rendered.

6 changes: 6 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Generated file; diff not rendered.

71 changes: 67 additions & 4 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -19,6 +19,7 @@ import (

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -347,10 +348,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)

if features.Enabled(features.RayJobDeletionPolicy) &&
rayJobInstance.Spec.DeletionPolicy != nil &&
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
"ttlSecondsAfterFinished", ttlSeconds,
"Status.endTime", rayJobInstance.Status.EndTime,
"Now", nowTime,
"ShutdownTime", shutdownTime)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}

switch *rayJobInstance.Spec.DeletionPolicy {
case rayv1.DeleteClusterDeletionPolicy:
logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.deleteClusterResources(ctx, rayJobInstance)
case rayv1.DeleteWorkersDeletionPolicy:
logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance)
case rayv1.DeleteSelfDeletionPolicy:
logger.Info("Deleting RayJob")
err = r.Client.Delete(ctx, rayJobInstance)
default:
}
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}

if rayJobInstance.Spec.DeletionPolicy == nil && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
@@ -377,6 +414,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}

// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
default:
@@ -616,6 +654,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}

func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
logger := ctrl.LoggerFrom(ctx)
clusterIdentifier := common.RayJobRayClusterNamespacedName(rayJobInstance)

cluster := rayv1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
return false, err
}

for i := range cluster.Spec.WorkerGroupSpecs {
cluster.Spec.WorkerGroupSpecs[i].Replicas = ptr.To[int32](0)
cluster.Spec.WorkerGroupSpecs[i].MinReplicas = ptr.To[int32](0)
}

if err := r.Update(ctx, &cluster); err != nil {
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
return false, err
}

logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name)

return true, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
return ctrl.NewControllerManagedBy(mgr).
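
A possible unit-test sketch for the new `scaleWorkerReplicasToZero` helper (not included in this commit), using the controller-runtime fake client. It assumes the reconciler only needs its embedded `Client` and the `Recorder` on this path, and that `common.RayJobRayClusterNamespacedName` resolves the cluster from `Status.RayClusterName` and the RayJob namespace — both inferred from the diff above, not verified.

```go
package ray

import (
	"context"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// Sketch only: verifies that every worker group is scaled to zero replicas.
func TestScaleWorkerReplicasToZero(t *testing.T) {
	scheme := runtime.NewScheme()
	utilruntime.Must(rayv1.AddToScheme(scheme))

	cluster := &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "sample-raycluster", Namespace: "default"},
		Spec: rayv1.RayClusterSpec{
			WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
				{GroupName: "workers", Replicas: ptr.To[int32](3), MinReplicas: ptr.To[int32](1)},
			},
		},
	}
	rayJob := &rayv1.RayJob{
		ObjectMeta: metav1.ObjectMeta{Name: "sample", Namespace: "default"},
		Status:     rayv1.RayJobStatus{RayClusterName: cluster.Name},
	}

	r := &RayJobReconciler{
		Client:   fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster, rayJob).Build(),
		Recorder: record.NewFakeRecorder(8),
	}

	done, err := r.scaleWorkerReplicasToZero(context.Background(), rayJob)
	if err != nil || !done {
		t.Fatalf("scaleWorkerReplicasToZero() = %v, %v", done, err)
	}

	got := rayv1.RayCluster{}
	if err := r.Get(context.Background(), client.ObjectKeyFromObject(cluster), &got); err != nil {
		t.Fatal(err)
	}
	if *got.Spec.WorkerGroupSpecs[0].Replicas != 0 {
		t.Fatalf("expected 0 replicas, got %d", *got.Spec.WorkerGroupSpecs[0].Replicas)
	}
}
```
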
2 changes: 2 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
@@ -261,9 +261,11 @@ const (
FailedToCreateRayJobSubmitter K8sEventType = "FailedToCreateRayJobSubmitter"
FailedToDeleteRayJobSubmitter K8sEventType = "FailedToDeleteRayJobSubmitter"
CreatedRayCluster K8sEventType = "CreatedRayCluster"
UpdatedRayCluster K8sEventType = "UpdatedRayCluster"
DeletedRayCluster K8sEventType = "DeletedRayCluster"
FailedToCreateRayCluster K8sEventType = "FailedToCreateRayCluster"
FailedToDeleteRayCluster K8sEventType = "FailedToDeleteRayCluster"
FailedToUpdateRayCluster K8sEventType = "FailedToUpdateRayCluster"

// RayService event list
InvalidRayServiceSpec K8sEventType = "InvalidRayServiceSpec"

Generated file; diff not rendered.

8 changes: 8 additions & 0 deletions ray-operator/pkg/features/features.go
@@ -17,6 +17,13 @@ const (
//
// Enables new conditions in RayCluster status
RayClusterStatusConditions featuregate.Feature = "RayClusterStatusConditions"

// owner: @andrewsykim
// rep: N/A
// alpha: v1.3
//
// Enables new deletion policy API in RayJob
RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy"
)

func init() {
@@ -25,6 +32,7 @@ func init() {

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
RayClusterStatusConditions: {Default: true, PreRelease: featuregate.Beta},
RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha},
}

// SetFeatureGateDuringTest is a helper method to override feature gates in tests.
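
Since the gate defaults to off, tests that exercise the deletion-policy path have to enable it explicitly. A sketch using the `SetFeatureGateDuringTest` helper referenced above; its exact signature is assumed here to mirror the upstream `featuregatetesting` pattern of `(t, feature, value)` with restoration handled via `t.Cleanup` — verify against the actual helper before relying on it.

```go
package ray

import (
	"testing"

	"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

// Sketch only: flips the alpha gate on for the duration of a test.
func TestWithDeletionPolicyEnabled(t *testing.T) {
	features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true) // assumed signature

	if !features.Enabled(features.RayJobDeletionPolicy) {
		t.Fatal("expected RayJobDeletionPolicy to be enabled for this test")
	}
	// ... exercise reconcile logic that depends on the gate ...
}
```
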
