Skip to content

Commit

Permalink
[RayService][refactor] Remove updateState (#2705)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin85421 authored Jan 2, 2025
1 parent a9beafb commit 39d1456
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 42 deletions.
10 changes: 3 additions & 7 deletions ray-operator/apis/ray/v1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@ import (
type ServiceStatus string

const (
FailedToGetOrCreateRayCluster ServiceStatus = "FailedToGetOrCreateRayCluster"
WaitForServeDeploymentReady ServiceStatus = "WaitForServeDeploymentReady"
FailedToGetServeDeploymentStatus ServiceStatus = "FailedToGetServeDeploymentStatus"
Running ServiceStatus = "Running"
Restarting ServiceStatus = "Restarting"
FailedToUpdateServingPodLabel ServiceStatus = "FailedToUpdateServingPodLabel"
FailedToUpdateService ServiceStatus = "FailedToUpdateService"
WaitForServeDeploymentReady ServiceStatus = "WaitForServeDeploymentReady"
Running ServiceStatus = "Running"
Restarting ServiceStatus = "Restarting"
)

type RayServiceUpgradeStrategy string
Expand Down
11 changes: 3 additions & 8 deletions ray-operator/apis/ray/v1alpha1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,9 @@ import (
type ServiceStatus string

const (
FailedToGetOrCreateRayCluster ServiceStatus = "FailedToGetOrCreateRayCluster"
WaitForServeDeploymentReady ServiceStatus = "WaitForServeDeploymentReady"
FailedToGetServeDeploymentStatus ServiceStatus = "FailedToGetServeDeploymentStatus"
Running ServiceStatus = "Running"
Restarting ServiceStatus = "Restarting"
FailedToUpdateIngress ServiceStatus = "FailedToUpdateIngress"
FailedToUpdateServingPodLabel ServiceStatus = "FailedToUpdateServingPodLabel"
FailedToUpdateService ServiceStatus = "FailedToUpdateService"
WaitForServeDeploymentReady ServiceStatus = "WaitForServeDeploymentReady"
Running ServiceStatus = "Running"
Restarting ServiceStatus = "Restarting"
)

// These statuses should match Ray Serve's application statuses
Expand Down
37 changes: 10 additions & 27 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
cmap "github.com/orcaman/concurrent-map/v2"

"github.com/go-logr/logr"
fmtErrors "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -142,7 +141,6 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
var activeRayClusterInstance *rayv1.RayCluster
var pendingRayClusterInstance *rayv1.RayCluster
if activeRayClusterInstance, pendingRayClusterInstance, err = r.reconcileRayCluster(ctx, rayServiceInstance); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1.FailedToGetOrCreateRayCluster, err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, client.IgnoreNotFound(err)
}

Expand Down Expand Up @@ -215,15 +213,12 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque

if rayClusterInstance != nil {
if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, utils.HeadService); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1.FailedToUpdateService, err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.labelHeadPodForServeStatus(ctx, rayClusterInstance, rayServiceInstance.Spec.ExcludeHeadPodFromServeSvc); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1.FailedToUpdateServingPodLabel, err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, utils.ServingService); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1.FailedToUpdateService, err)
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}
Expand Down Expand Up @@ -395,15 +390,6 @@ func (r *RayServiceReconciler) getRayServiceInstance(ctx context.Context, reques
return rayServiceInstance, nil
}

func (r *RayServiceReconciler) updateState(ctx context.Context, rayServiceInstance *rayv1.RayService, status rayv1.ServiceStatus, err error) error {
rayServiceInstance.Status.ServiceStatus = status
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
return fmtErrors.Errorf("combined error: %v %v", err, errStatus)
}
r.Recorder.Event(rayServiceInstance, "Normal", string(status), err.Error())
return err
}

// reconcileRayCluster checks the active and pending ray cluster instances. It includes 3 parts.
// 1. It will decide whether to generate a pending cluster name.
// 2. It will delete the old pending ray cluster instance.
Expand Down Expand Up @@ -521,14 +507,13 @@ func (r *RayServiceReconciler) cleanUpRayClusterInstance(ctx context.Context, ra
}

func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context, clusterKey client.ObjectKey) (*rayv1.RayCluster, error) {
if clusterKey.Name == "" {
return nil, nil
}

rayCluster := &rayv1.RayCluster{}
if clusterKey.Name != "" {
// Ignore not found since in that case we should return RayCluster as nil.
if err := r.Get(ctx, clusterKey, rayCluster); client.IgnoreNotFound(err) != nil {
return nil, err
}
} else {
rayCluster = nil
if err := r.Get(ctx, clusterKey, rayCluster); client.IgnoreNotFound(err) != nil {
return nil, err
}

return rayCluster, nil
Expand All @@ -545,14 +530,14 @@ func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayS
if !exist {
return
}
serveConfigs := cacheValue.(cmap.ConcurrentMap[string, string])
clusterNameToServeConfig := cacheValue.(cmap.ConcurrentMap[string, string])

for key := range serveConfigs.Items() {
for key := range clusterNameToServeConfig.Items() {
if key == activeRayClusterName || key == pendingRayClusterName {
continue
}
logger.Info("cleanUpServeConfigCache", "activeRayClusterName", activeRayClusterName, "pendingRayClusterName", pendingRayClusterName, "remove key", key)
serveConfigs.Remove(key)
logger.Info("Remove stale serve application config", "remove key", key, "activeRayClusterName", activeRayClusterName, "pendingRayClusterName", pendingRayClusterName)
clusterNameToServeConfig.Remove(key)
}
}

Expand Down Expand Up @@ -1162,14 +1147,12 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
shouldUpdate := r.checkIfNeedSubmitServeDeployment(ctx, rayServiceInstance, rayClusterInstance, rayServiceStatus)
if shouldUpdate {
if err = r.updateServeDeployment(ctx, rayServiceInstance, rayDashboardClient, rayClusterInstance.Name); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1.WaitForServeDeploymentReady, err)
return false, err
}
}

var isReady bool
if isReady, err = r.getAndCheckServeStatus(ctx, rayDashboardClient, rayServiceStatus); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1.FailedToGetServeDeploymentStatus, err)
return false, err
}

Expand Down

0 comments on commit 39d1456

Please sign in to comment.