[STRMHELP-315] Rollback on Failed Job Monitoring 🐛 (#291)
## overview
In the [job monitoring PR](#284) we introduced a bug: when job monitoring
fails due to a timeout or a failed vertex, the application ends up in the
DeployFailed state instead of attempting to roll back. This change simplifies
the job submission and job monitoring logic and ensures a failed job attempts
to roll back.

## additional info
Errors returned by a state in the state machine are recorded on the status as
the last error. The shouldRollback check at the beginning of these states
determines whether that error is retryable and moves the application to
rolling back if it is not. The change here is therefore to return an error
when monitoring detects a failed vertex or a vertex start timeout.
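
To illustrate the flow described above, here is a minimal, self-contained Go
sketch of the pattern (hypothetical names such as `retryableError`,
`lastSeenError`, and the phase strings are for illustration only; this is not
the operator's actual API):

```go
package main

import (
	"errors"
	"fmt"
)

// retryableError marks errors the state machine may retry in place
// instead of rolling back (hypothetical type, for illustration only).
type retryableError struct{ msg string }

func (e retryableError) Error() string { return e.msg }

// app stands in for the application status fields used in this sketch.
type app struct {
	phase         string
	lastSeenError error
}

// handleSubmittingJob returns the monitoring error to its caller, which
// records it on the status as the last error.
func handleSubmittingJob(a *app, monitorErr error) error {
	if monitorErr != nil {
		a.lastSeenError = monitorErr
	}
	return monitorErr
}

// shouldRollback runs at the beginning of the state handler: a
// non-retryable last error moves the application to rolling back.
func shouldRollback(a *app) bool {
	if a.lastSeenError == nil {
		return false
	}
	var r retryableError
	if errors.As(a.lastSeenError, &r) {
		return false // retryable: stay in the current phase and try again
	}
	a.phase = "RollingBackJob"
	return true
}

func main() {
	a := &app{phase: "SubmittingJob"}

	// Monitoring finds a failed vertex: the handler surfaces an error...
	_ = handleSubmittingJob(a, fmt.Errorf("vertex 1 with name [Vertex 2] state is Failed"))

	// ...and the next reconcile pass rolls back instead of deploy-failing.
	if shouldRollback(a) {
		fmt.Println("phase:", a.phase) // phase: RollingBackJob
	}
}
```

The key point is that returning the monitoring error, rather than jumping
straight to DeployFailed, lets the existing shouldRollback path decide whether
to retry or roll back.
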
sethsaperstein-lyft authored May 17, 2023
1 parent db5a5de commit bea4e54
Showing 2 changed files with 59 additions and 61 deletions.
94 changes: 41 additions & 53 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -19,6 +19,7 @@ import (
"github.com/lyft/flinkk8soperator/pkg/controller/flink"
"github.com/lyft/flinkk8soperator/pkg/controller/flink/client"
"github.com/lyft/flinkk8soperator/pkg/controller/k8"
config2 "github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
@@ -767,47 +768,38 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta
if job == nil {
return statusUnchanged, errors.Errorf("Could not find job %s", s.flinkController.GetLatestJobID(ctx, app))
}

jobStartTime := getJobStartTimeInUTC(job.StartTime)
now := time.Now().UTC()
cfg := config.GetConfig()
logger.Info(ctx, "Job vertex timeout config is ", cfg.FlinkJobVertexTimeout)
flinkJobVertexTimeout := cfg.FlinkJobVertexTimeout
if now.Before(jobStartTime.Add(flinkJobVertexTimeout.Duration)) {
allVerticesRunning, hasFailure, failedVertexIndex := monitorAllVerticesState(job)

// fail fast
if hasFailure {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
fmt.Sprintf(
"Vertex %d with name [%s] state is Failed", failedVertexIndex, job.Vertices[failedVertexIndex].Name))
return s.deployFailed(app)
}
return updateJobAndReturn(ctx, job, s, allVerticesRunning, app, hash)
logger.Info(ctx, "Monitoring job vertices with timeout ", flinkJobVertexTimeout)
jobStarted, err := monitorJobSubmission(job, flinkJobVertexTimeout)
if err != nil {
logger.Info(ctx, "Job monitoring failed with error: %v", err)
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobMonitoringFailed", err.Error())
return statusUnchanged, err
}

s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobRunningFailed",
fmt.Sprintf(
"Not all vertice of the Flink job state is Running before timeout %f minutes", cfg.FlinkJobVertexTimeout.Minutes()))
return s.deployFailed(app)

if jobStarted {
return updateJobAndReturn(ctx, s, app, hash)
}
return statusUnchanged, err
}

func monitorAllVerticesState(job *client.FlinkJobOverview) (bool, bool, int) {
allVerticesRunning := true
// wait until all vertices have been scheduled and running
hasFailure := false
failedVertexIndex := -1
func monitorJobSubmission(job *client.FlinkJobOverview, timeout config2.Duration) (bool, error) {
jobStartTime := getJobStartTimeInUTC(job.StartTime)
now := time.Now().UTC()
shouldContinueMonitoring := now.Before(jobStartTime.Add(timeout.Duration))
if !shouldContinueMonitoring {
return false, errors.Errorf("not all vertices of the Flink job state is Running before timeout %f minutes", timeout.Minutes())
}

for index, v := range job.Vertices {
if v.Status == client.Failed {
failedVertexIndex = index
hasFailure = true
allVerticesRunning = false
break
return false, errors.Errorf("vertex %d with name [%s] state is Failed", index, job.Vertices[index].Name)
}
if v.Status != client.Running {
return false, nil
}
allVerticesRunning = allVerticesRunning && (v.StartTime > 0) && v.Status == client.Running
}
return allVerticesRunning, hasFailure, failedVertexIndex
return true, nil
}

func getJobStartTimeInUTC(startTime int64) time.Time {
@@ -816,30 +808,26 @@ func getJobStartTimeInUTC(startTime int64) time.Time {
return time.Unix(jobStartTimeSec, jobStartTimeNSec).UTC()
}

func updateJobAndReturn(ctx context.Context, job *client.FlinkJobOverview, s *FlinkStateMachine, allVerticesRunning bool,
func updateJobAndReturn(ctx context.Context, s *FlinkStateMachine,
app *v1beta1.FlinkApplication, hash string) (bool, error) {
if job.State == client.Running && allVerticesRunning {
// Update job status
logger.Info(ctx, "Job and all vertices states are RUNNING.")
jobStatus := s.flinkController.GetLatestJobStatus(ctx, app)
jobStatus.JarName = app.Spec.JarName
jobStatus.Parallelism = app.Spec.Parallelism
jobStatus.EntryClass = app.Spec.EntryClass
jobStatus.ProgramArgs = app.Spec.ProgramArgs
jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState
s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus)
// Update the application status with the running job info
app.Status.SavepointPath = ""
app.Status.SavepointTriggerID = ""
if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" {
s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning)
return statusChanged, nil
}
app.Status.DeployHash = hash
s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning)
// Update job status
jobStatus := s.flinkController.GetLatestJobStatus(ctx, app)
jobStatus.JarName = app.Spec.JarName
jobStatus.Parallelism = app.Spec.Parallelism
jobStatus.EntryClass = app.Spec.EntryClass
jobStatus.ProgramArgs = app.Spec.ProgramArgs
jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState
s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus)
// Update the application status with the running job info
app.Status.SavepointPath = ""
app.Status.SavepointTriggerID = ""
if v1beta1.IsBlueGreenDeploymentMode(app.Status.DeploymentMode) && app.Status.DeployHash != "" {
s.updateApplicationPhase(app, v1beta1.FlinkApplicationDualRunning)
return statusChanged, nil
}
return statusUnchanged, nil
app.Status.DeployHash = hash
s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning)
return statusChanged, nil
}

// Something has gone wrong during the update, post job-cancellation (and cluster tear-down in single mode). We need
26 changes: 18 additions & 8 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -756,12 +756,15 @@ func TestSubmittingVertexFailsToStart(t *testing.T) {

statusUpdateCount := 0
mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
if statusUpdateCount == 0 {
if statusUpdateCount == 1 {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application))
} else if statusUpdateCount == 1 {
} else if statusUpdateCount == 2 {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase)
assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application))
} else if statusUpdateCount == 3 {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase)
}
statusUpdateCount++
return nil
@@ -770,11 +773,13 @@ func TestSubmittingVertexFailsToStart(t *testing.T) {
err := stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Error(t, err, "vertex 1 with name [Vertex 2] state is Failed")
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)

assert.Equal(t, 1, startCount)
assert.Equal(t, 3, updateCount)
assert.Equal(t, 2, statusUpdateCount)
assert.Equal(t, 3, statusUpdateCount)
}

func TestSubmittingVertexStartTimeout(t *testing.T) {
@@ -917,12 +922,15 @@ func TestSubmittingVertexStartTimeout(t *testing.T) {

statusUpdateCount := 0
mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
if statusUpdateCount == 0 {
if statusUpdateCount == 1 {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application))
} else if statusUpdateCount == 1 {
} else if statusUpdateCount == 2 {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase)
assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application))
} else if statusUpdateCount == 3 {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase)
}
statusUpdateCount++
return nil
@@ -931,11 +939,13 @@ func TestSubmittingVertexStartTimeout(t *testing.T) {
err := stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Error(t, err, "Expected nil, but got: not all vertices of the Flink job state is Running before timeout 3.000000 minutes")
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)

assert.Equal(t, 1, startCount)
assert.Equal(t, 3, updateCount)
assert.Equal(t, 2, statusUpdateCount)
assert.Equal(t, 3, statusUpdateCount)
}

func TestHandleNilDeployments(t *testing.T) {
