diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 1ad395a4..165a1ccb 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -786,7 +786,6 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobMonitoringFailed", err.Error()) s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob) return statusChanged, err - //return statusUnchanged, err } if jobStarted { return updateJobAndReturn(ctx, s, app, hash) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index a0ffdd45..3aa36eaf 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -760,13 +760,15 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } else if statusUpdateCount == 3 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) } statusUpdateCount++ return nil @@ -927,6 +929,7 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) @@ -934,7 +937,7 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { } else if statusUpdateCount == 3 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) - assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } statusUpdateCount++ return nil