From bea4e54c38ea9321f2c6e12b7de825d9b98b7d01 Mon Sep 17 00:00:00 2001 From: Seth Saperstein <99828679+sethsaperstein-lyft@users.noreply.github.com> Date: Wed, 17 May 2023 14:09:12 -0700 Subject: [PATCH] =?UTF-8?q?[STRMHELP-315]=20Rollback=20on=20Failed=20Job?= =?UTF-8?q?=20Monitoring=20=F0=9F=90=9B=20=20(#291)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## overview In the [job monitoring PR](https://github.com/lyft/flinkk8soperator/pull/284) we introduced a bug such that when the job monitoring fails due to timeout or a failed vertex, the state DeployFailed is reached instead of attempting to rollback. This simplifies the logic of submitting job and job monitoring as well as results in the job attempting to roll back ## additional info Errors returned by a state in the state machine are added to the status as the last error. The shouldRollback at the beginning of these states checks to see if it is retryable and moves to rolling back if not. Thus, the change made is to return an error if monitoring results in a failed vertex or vertex timeout --- .../flinkapplication/flink_state_machine.go | 94 ++++++++----------- .../flink_state_machine_test.go | 26 +++-- 2 files changed, 59 insertions(+), 61 deletions(-) diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index ae288bf7..d7f23555 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -19,6 +19,7 @@ import ( "github.com/lyft/flinkk8soperator/pkg/controller/flink" "github.com/lyft/flinkk8soperator/pkg/controller/flink/client" "github.com/lyft/flinkk8soperator/pkg/controller/k8" + config2 "github.com/lyft/flytestdlib/config" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/promutils/labeled" @@ -767,47 +768,38 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta if job == nil { return statusUnchanged, errors.Errorf("Could not find job %s", s.flinkController.GetLatestJobID(ctx, app)) } - - jobStartTime := getJobStartTimeInUTC(job.StartTime) - now := time.Now().UTC() cfg := config.GetConfig() - logger.Info(ctx, "Job vertex timeout config is ", cfg.FlinkJobVertexTimeout) flinkJobVertexTimeout := cfg.FlinkJobVertexTimeout - if now.Before(jobStartTime.Add(flinkJobVertexTimeout.Duration)) { - allVerticesRunning, hasFailure, failedVertexIndex := monitorAllVerticesState(job) - - // fail fast - if hasFailure { - s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed", - fmt.Sprintf( - "Vertex %d with name [%s] state is Failed", failedVertexIndex, job.Vertices[failedVertexIndex].Name)) - return s.deployFailed(app) - } - return updateJobAndReturn(ctx, job, s, allVerticesRunning, app, hash) + logger.Info(ctx, "Monitoring job vertices with timeout ", flinkJobVertexTimeout) + jobStarted, err := monitorJobSubmission(job, flinkJobVertexTimeout) + if err != nil { + logger.Info(ctx, "Job monitoring failed with error: %v", err) + s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobMonitoringFailed", err.Error()) + return statusUnchanged, err } - - s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed", - fmt.Sprintf( - "Not all vertice of the Flink job state is Running before timeout %f minutes", cfg.FlinkJobVertexTimeout.Minutes())) - return s.deployFailed(app) - + if jobStarted { + return updateJobAndReturn(ctx, s, app, hash) + } + return statusUnchanged, err } -func monitorAllVerticesState(job *client.FlinkJobOverview) (bool, bool, int) { - allVerticesRunning := true - // wait until all vertices have been scheduled and running - hasFailure := false - failedVertexIndex := -1 +func monitorJobSubmission(job *client.FlinkJobOverview, timeout config2.Duration) (bool, error) { + jobStartTime := getJobStartTimeInUTC(job.StartTime) + now := time.Now().UTC() + shouldContinueMonitoring := now.Before(jobStartTime.Add(timeout.Duration)) + if !shouldContinueMonitoring { + return false, errors.Errorf("not all vertices of the Flink job state is Running before timeout %f minutes", timeout.Minutes()) + } + for index, v := range job.Vertices { if v.Status == client.Failed { - failedVertexIndex = index - hasFailure = true - allVerticesRunning = false - break + return false, errors.Errorf("vertex %d with name [%s] state is Failed", index, job.Vertices[index].Name) + } + if v.Status != client.Running { + return false, nil } - allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running } - return allVerticesRunning, hasFailure, failedVertexIndex + return true, nil } func getJobStartTimeInUTC(startTime int64) time.Time { @@ -816,30 +808,26 @@ func getJobStartTimeInUTC(startTime int64) time.Time { return time.Unix(jobStartTimeSec, jobStartTimeNSec).UTC() } -func updateJobAndReturn(ctx context.Context, job *client.FlinkJobOverview, s *FlinkStateMachine, allVerticesRunning bool, +func updateJobAndReturn(ctx context.Context, s *FlinkStateMachine, app *v1beta1.FlinkApplication, hash string) (bool, error) { - if job.State == client.Running && allVerticesRunning { - // Update job status - logger.Info(ctx, "Job and all vertices states are RUNNING.") - jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) - jobStatus.JarName = app.Spec.JarName - jobStatus.Parallelism = app.Spec.Parallelism - jobStatus.EntryClass = app.Spec.EntryClass - jobStatus.ProgramArgs = app.Spec.ProgramArgs - jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState - s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus) - // Update the application status with the running job info - app.Status.SavepointPath = "" - app.Status.SavepointTriggerID = "" - if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" { - s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning) - return statusChanged, nil - } - app.Status.DeployHash = hash - s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) + // Update job status + jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) + jobStatus.JarName = app.Spec.JarName + jobStatus.Parallelism = app.Spec.Parallelism + jobStatus.EntryClass = app.Spec.EntryClass + jobStatus.ProgramArgs = app.Spec.ProgramArgs + jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState + s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus) + // Update the application status with the running job info + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" { + s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning) return statusChanged, nil } - return statusUnchanged, nil + app.Status.DeployHash = hash + s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) + return statusChanged, nil } // Something has gone wrong during the update, post job-cancellation (and cluster tear-down in single mode). We need diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 500609be..3912f631 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -756,12 +756,15 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { statusUpdateCount := 0 mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { - if statusUpdateCount == 0 { + if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) - } else if statusUpdateCount == 1 { + } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + } else if statusUpdateCount == 3 { + application := object.(*v1beta1.FlinkApplication) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } statusUpdateCount++ return nil @@ -770,11 +773,13 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { err := stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) err = stateMachineForTest.Handle(context.Background(), &app) + assert.Error(t, err, "vertex 1 with name [Vertex 2] state is Failed") + err = stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) assert.Equal(t, 1, startCount) assert.Equal(t, 3, updateCount) - assert.Equal(t, 2, statusUpdateCount) + assert.Equal(t, 3, statusUpdateCount) } func TestSubmittingVertexStartTimeout(t *testing.T) { @@ -917,12 +922,15 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { statusUpdateCount := 0 mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { - if statusUpdateCount == 0 { + if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) - } else if statusUpdateCount == 1 { + } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + } else if statusUpdateCount == 3 { + application := object.(*v1beta1.FlinkApplication) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } statusUpdateCount++ return nil @@ -931,11 +939,13 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { err := stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) err = stateMachineForTest.Handle(context.Background(), &app) + assert.Error(t, err, "Expected nil, but got: not all vertices of the Flink job state is Running before timeout 3.000000 minutes") + err = stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) assert.Equal(t, 1, startCount) assert.Equal(t, 3, updateCount) - assert.Equal(t, 2, statusUpdateCount) + assert.Equal(t, 3, statusUpdateCount) } func TestHandleNilDeployments(t *testing.T) {