diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index ae288bf7..d7f23555 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -19,6 +19,7 @@ import ( "github.com/lyft/flinkk8soperator/pkg/controller/flink" "github.com/lyft/flinkk8soperator/pkg/controller/flink/client" "github.com/lyft/flinkk8soperator/pkg/controller/k8" + config2 "github.com/lyft/flytestdlib/config" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/promutils/labeled" @@ -767,47 +768,38 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta if job == nil { return statusUnchanged, errors.Errorf("Could not find job %s", s.flinkController.GetLatestJobID(ctx, app)) } - - jobStartTime := getJobStartTimeInUTC(job.StartTime) - now := time.Now().UTC() cfg := config.GetConfig() - logger.Info(ctx, "Job vertex timeout config is ", cfg.FlinkJobVertexTimeout) flinkJobVertexTimeout := cfg.FlinkJobVertexTimeout - if now.Before(jobStartTime.Add(flinkJobVertexTimeout.Duration)) { - allVerticesRunning, hasFailure, failedVertexIndex := monitorAllVerticesState(job) - - // fail fast - if hasFailure { - s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed", - fmt.Sprintf( - "Vertex %d with name [%s] state is Failed", failedVertexIndex, job.Vertices[failedVertexIndex].Name)) - return s.deployFailed(app) - } - return updateJobAndReturn(ctx, job, s, allVerticesRunning, app, hash) + logger.Info(ctx, "Monitoring job vertices with timeout ", flinkJobVertexTimeout) + jobStarted, err := monitorJobSubmission(job, flinkJobVertexTimeout) + if err != nil { + logger.Info(ctx, "Job monitoring failed with error: %v", err) + s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobMonitoringFailed", err.Error()) + return statusUnchanged, err } - - s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed", - fmt.Sprintf( - "Not all vertice of the Flink job state is Running before timeout %f minutes", cfg.FlinkJobVertexTimeout.Minutes())) - return s.deployFailed(app) - + if jobStarted { + return updateJobAndReturn(ctx, s, app, hash) + } + return statusUnchanged, err } -func monitorAllVerticesState(job *client.FlinkJobOverview) (bool, bool, int) { - allVerticesRunning := true - // wait until all vertices have been scheduled and running - hasFailure := false - failedVertexIndex := -1 +func monitorJobSubmission(job *client.FlinkJobOverview, timeout config2.Duration) (bool, error) { + jobStartTime := getJobStartTimeInUTC(job.StartTime) + now := time.Now().UTC() + shouldContinueMonitoring := now.Before(jobStartTime.Add(timeout.Duration)) + if !shouldContinueMonitoring { + return false, errors.Errorf("not all vertices of the Flink job state is Running before timeout %f minutes", timeout.Minutes()) + } + for index, v := range job.Vertices { if v.Status == client.Failed { - failedVertexIndex = index - hasFailure = true - allVerticesRunning = false - break + return false, errors.Errorf("vertex %d with name [%s] state is Failed", index, job.Vertices[index].Name) + } + if v.Status != client.Running { + return false, nil } - allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running } - return allVerticesRunning, hasFailure, failedVertexIndex + return true, nil } func getJobStartTimeInUTC(startTime int64) time.Time { @@ -816,30 +808,26 @@ func getJobStartTimeInUTC(startTime int64) time.Time { return time.Unix(jobStartTimeSec, jobStartTimeNSec).UTC() } -func updateJobAndReturn(ctx context.Context, job *client.FlinkJobOverview, s *FlinkStateMachine, allVerticesRunning bool, +func updateJobAndReturn(ctx context.Context, s *FlinkStateMachine, app *v1beta1.FlinkApplication, hash string) (bool, error) { - if job.State == client.Running && allVerticesRunning { - // Update job status - logger.Info(ctx, "Job and all vertices states are RUNNING.") - jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) - jobStatus.JarName = app.Spec.JarName - jobStatus.Parallelism = app.Spec.Parallelism - jobStatus.EntryClass = app.Spec.EntryClass - jobStatus.ProgramArgs = app.Spec.ProgramArgs - jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState - s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus) - // Update the application status with the running job info - app.Status.SavepointPath = "" - app.Status.SavepointTriggerID = "" - if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" { - s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning) - return statusChanged, nil - } - app.Status.DeployHash = hash - s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) + // Update job status + jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) + jobStatus.JarName = app.Spec.JarName + jobStatus.Parallelism = app.Spec.Parallelism + jobStatus.EntryClass = app.Spec.EntryClass + jobStatus.ProgramArgs = app.Spec.ProgramArgs + jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState + s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus) + // Update the application status with the running job info + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" + if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" { + s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning) return statusChanged, nil } - return statusUnchanged, nil + app.Status.DeployHash = hash + s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) + return statusChanged, nil } // Something has gone wrong during the update, post job-cancellation (and cluster tear-down in single mode). We need diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 500609be..3912f631 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -756,12 +756,15 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { statusUpdateCount := 0 mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { - if statusUpdateCount == 0 { + if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) - } else if statusUpdateCount == 1 { + } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + } else if statusUpdateCount == 3 { + application := object.(*v1beta1.FlinkApplication) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } statusUpdateCount++ return nil @@ -770,11 +773,13 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { err := stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) err = stateMachineForTest.Handle(context.Background(), &app) + assert.Error(t, err, "vertex 1 with name [Vertex 2] state is Failed") + err = stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) assert.Equal(t, 1, startCount) assert.Equal(t, 3, updateCount) - assert.Equal(t, 2, statusUpdateCount) + assert.Equal(t, 3, statusUpdateCount) } func TestSubmittingVertexStartTimeout(t *testing.T) { @@ -917,12 +922,15 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { statusUpdateCount := 0 mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { - if statusUpdateCount == 0 { + if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) - } else if statusUpdateCount == 1 { + } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + } else if statusUpdateCount == 3 { + application := object.(*v1beta1.FlinkApplication) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } statusUpdateCount++ return nil @@ -931,11 +939,13 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { err := stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) err = stateMachineForTest.Handle(context.Background(), &app) + assert.Error(t, err, "Expected nil, but got: not all vertices of the Flink job state is Running before timeout 3.000000 minutes") + err = stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) assert.Equal(t, 1, startCount) assert.Equal(t, 3, updateCount) - assert.Equal(t, 2, statusUpdateCount) + assert.Equal(t, 3, statusUpdateCount) } func TestHandleNilDeployments(t *testing.T) {