From c0af8a4ab9f4835e7335fa4cdfa3b70ac074db3a Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 3 Jul 2024 13:21:07 -0700 Subject: [PATCH] Warn on unhandled updates (#1533) Warn on unhandled updates --- internal/internal_update.go | 23 ++-- internal/internal_workflow.go | 39 +++++- internal/protocol/registry.go | 2 +- internal/workflow.go | 20 +++ internal/workflow_testsuite_test.go | 197 ++++++++++++++++++++++++++++ workflow/workflow.go | 24 ++++ 6 files changed, 291 insertions(+), 14 deletions(-) diff --git a/internal/internal_update.go b/internal/internal_update.go index 52e4eb19e..4f98a41bf 100644 --- a/internal/internal_update.go +++ b/internal/internal_update.go @@ -119,9 +119,10 @@ type ( // for a given name. It offers the ability to invoke the associated // execution and validation functions. updateHandler struct { - fn interface{} - validateFn interface{} - name string + fn interface{} + validateFn interface{} + name string + unfinishedPolicy HandlerUnfinishedPolicy } ) @@ -274,10 +275,11 @@ func defaultUpdateHandler( priorityUpdateHandling := env.TryUse(SDKPriorityUpdateHandling) updateRunner := func(ctx Context) { - ctx = WithValue(ctx, updateInfoContextKey, &UpdateInfo{ + updateInfo := UpdateInfo{ ID: id, Name: name, - }) + } + ctx = WithValue(ctx, updateInfoContextKey, &updateInfo) eo := getWorkflowEnvOptions(ctx) if len(eo.updateHandlers) == 0 && !priorityUpdateHandling { @@ -303,6 +305,10 @@ func defaultUpdateHandler( return } input := UpdateInput{Name: name, Args: args} + eo.runningUpdatesHandles[id] = updateInfo + defer func() { + delete(eo.runningUpdatesHandles, id) + }() envInterceptor := getWorkflowEnvironmentInterceptor(ctx) if !IsReplaying(ctx) { @@ -362,9 +368,10 @@ func newUpdateHandler( validateFn = opts.Validator } return &updateHandler{ - fn: handler, - validateFn: validateFn, - name: updateName, + fn: handler, + validateFn: validateFn, + name: updateName, + unfinishedPolicy: opts.UnfinishedPolicy, }, nil } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 3f9bfbbee..e70724da2 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -52,6 +52,12 @@ const ( defaultCoroutineExitTimeout = 100 * time.Millisecond panicIllegalAccessCoroutineState = "getState: illegal access from outside of workflow context" + unhandledUpdateWarningMessage = "Workflow finished while update handlers are still running. This may have interrupted work that the" + + " update handler was doing, and the client that sent the update will receive a 'workflow execution" + + " already completed' RPCError instead of the update result. You can wait for all update" + + " handlers to complete by using `workflow.Await(ctx, func() bool { return workflow.AllHandlersFinished(ctx) })`. Alternatively, if both you and the clients sending the update" + + " are okay with interrupting running handlers when the workflow finishes, and causing clients to" + + " receive errors, then you can disable this warning via UnfinishedPolicy in UpdateHandlerOptions." ) type ( @@ -216,7 +222,9 @@ type ( signalChannels map[string]Channel queryHandlers map[string]*queryHandler updateHandlers map[string]*updateHandler - VersioningIntent VersioningIntent + // runningUpdatesHandles is a map of update handlers that are currently running. + runningUpdatesHandles map[string]UpdateInfo + VersioningIntent VersioningIntent } // ExecuteWorkflowParams parameters of the workflow invocation @@ -289,8 +297,6 @@ var _ Channel = (*channelImpl)(nil) var _ Selector = (*selectorImpl)(nil) var _ WaitGroup = (*waitGroupImpl)(nil) var _ dispatcher = (*dispatcherImpl)(nil) -var _ Mutex = (*mutexImpl)(nil) -var _ Semaphore = (*semaphoreImpl)(nil) // 1MB buffer to fit combined stack trace of all active goroutines var stackBuf [1024 * 1024]byte @@ -661,9 +667,27 @@ func executeDispatcher(ctx Context, dispatcher dispatcher, timeout time.Duration return } - us := getWorkflowEnvOptions(ctx).getUnhandledSignalNames() + weo := getWorkflowEnvOptions(ctx) + us := weo.getUnhandledSignalNames() if len(us) > 0 { - env.GetLogger().Info("Workflow has unhandled signals", "SignalNames", us) + env.GetLogger().Warn("Workflow has unhandled signals", "SignalNames", us) + } + // + type warnUpdate struct { + Name string `json:"name"` + ID string `json:"id"` + } + var updatesToWarn []warnUpdate + for _, info := range weo.getRunningUpdateHandles() { + if weo.updateHandlers[info.Name].unfinishedPolicy == HandlerUnfinishedPolicyWarnAndAbandon { + updatesToWarn = append(updatesToWarn, warnUpdate{ + Name: info.Name, + ID: info.ID, + }) + } + } + if len(updatesToWarn) > 0 { + env.GetLogger().Warn(unhandledUpdateWarningMessage, "Updates", updatesToWarn) } env.Complete(rp.workflowResult, rp.error) @@ -1487,6 +1511,7 @@ func setWorkflowEnvOptionsIfNotExist(ctx Context) Context { newOptions.signalChannels = make(map[string]Channel) newOptions.queryHandlers = make(map[string]*queryHandler) newOptions.updateHandlers = make(map[string]*updateHandler) + newOptions.runningUpdatesHandles = make(map[string]UpdateInfo) } if newOptions.DataConverter == nil { newOptions.DataConverter = converter.GetDefaultDataConverter() @@ -1542,6 +1567,10 @@ func (w *WorkflowOptions) getUnhandledSignalNames() []string { return unhandledSignals } +func (w *WorkflowOptions) getRunningUpdateHandles() map[string]UpdateInfo { + return w.runningUpdatesHandles +} + func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error { more := d.futureImpl.channel.Receive(ctx, nil) if more { diff --git a/internal/protocol/registry.go b/internal/protocol/registry.go index 09f4cc2d9..acc680067 100644 --- a/internal/protocol/registry.go +++ b/internal/protocol/registry.go @@ -61,7 +61,7 @@ func (r *Registry) FindOrAdd(instID string, ctor func() Instance) Instance { return p } -// ClearCopmleted walks the registered protocols and removes those that have +// ClearCompleted walks the registered protocols and removes those that have // completed. func (r *Registry) ClearCompleted() { r.mut.Lock() diff --git a/internal/workflow.go b/internal/workflow.go index 85f869847..5eb13d9ef 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -45,6 +45,19 @@ import ( "go.temporal.io/sdk/log" ) +// HandlerUnfinishedPolicy actions taken if a workflow completes with running handlers. +// +// Policy defining actions taken when a workflow exits while update or signal handlers are running. +// The workflow exit may be due to successful return, failure, cancellation, or continue-as-new +type HandlerUnfinishedPolicy int + +const ( + // WarnAndAbandon issues a warning in addition to abandoning. + HandlerUnfinishedPolicyWarnAndAbandon HandlerUnfinishedPolicy = iota + // ABANDON abandons the handler. + HandlerUnfinishedPolicyAbandon +) + var ( errWorkflowIDNotSet = errors.New("workflowId is not set") errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions") @@ -386,6 +399,9 @@ type ( // performing side-effects. A panic from this function will be treated // as equivalent to returning an error. Validator interface{} + // UnfinishedPolicy is the policy to apply when a workflow exits while + // the update handler is still running. + UnfinishedPolicy HandlerUnfinishedPolicy } ) @@ -2165,3 +2181,7 @@ func DeterministicKeysFunc[K comparable, V any](m map[K]V, cmp func(a K, b K) in slices.SortStableFunc(r, cmp) return r } + +func AllHandlersFinished(ctx Context) bool { + return len(getWorkflowEnvOptions(ctx).getRunningUpdateHandles()) == 0 +} diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 465ab41fd..483a428e4 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -25,8 +25,12 @@ package internal import ( + "bytes" "context" + "encoding/json" "errors" + "fmt" + "log/slog" "strings" "sync/atomic" "testing" @@ -35,6 +39,7 @@ import ( "github.com/stretchr/testify/assert" failurepb "go.temporal.io/api/failure/v1" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -425,6 +430,198 @@ func TestWorkflowUpdateOrderAcceptReject(t *testing.T) { require.Equal(t, "unknown update bad update. KnownUpdates=[update]", updateRejectionErr.Error()) } +func TestAllHandlersFinished(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id_1", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id_2", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, time.Minute) + + env.ExecuteWorkflow(func(ctx Context) (int, error) { + var inflightUpdates int + var ranUpdates int + err := SetUpdateHandler(ctx, "update", func(ctx Context) error { + inflightUpdates++ + ranUpdates++ + defer func() { + inflightUpdates-- + }() + return Sleep(ctx, time.Hour) + }, UpdateHandlerOptions{}) + if err != nil { + return 0, err + } + err = Await(ctx, func() bool { return AllHandlersFinished(ctx) }) + return ranUpdates, err + }) + require.NoError(t, env.GetWorkflowError()) + var result int + require.NoError(t, env.GetWorkflowResult(&result)) + require.Equal(t, 2, result) +} + +func TestWorkflowAllHandlersFinished(t *testing.T) { + // runWf runs a workflow that sends two updates and then signals the workflow to complete + runWf := func(completionType string, buf *bytes.Buffer) (int, error) { + var suite WorkflowTestSuite + th := slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelWarn}) + suite.SetLogger(log.NewStructuredLogger(slog.New(th))) + env := suite.NewTestWorkflowEnvironment() + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id_1", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, 0) + + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow("update", "id_2", &updateCallback{ + reject: func(err error) { + require.Fail(t, "update should not be rejected") + }, + accept: func() {}, + complete: func(interface{}, error) {}, + }) + }, time.Minute) + + env.RegisterDelayedCallback(func() { + if completionType == "cancel" { + env.CancelWorkflow() + } else { + env.SignalWorkflow("completion", completionType) + } + }, time.Minute*2) + + env.ExecuteWorkflow(func(ctx Context) (int, error) { + var inflightUpdates int + var ranUpdates int + err := SetUpdateHandler(ctx, "update", func(ctx Context) error { + inflightUpdates++ + ranUpdates++ + defer func() { + inflightUpdates-- + }() + return Sleep(ctx, time.Hour) + }, UpdateHandlerOptions{}) + if err != nil { + return 0, err + } + + var completeType string + s := NewSelector(ctx) + s.AddReceive(ctx.Done(), func(c ReceiveChannel, more bool) { + completeType = "cancel" + }).AddReceive(GetSignalChannel(ctx, "completion"), func(c ReceiveChannel, more bool) { + c.Receive(ctx, &completeType) + }).Select(ctx) + + if completeType == "cancel" { + return 0, ctx.Err() + } else if completeType == "complete" { + return ranUpdates, nil + } else if completeType == "failure" { + return 0, errors.New("test workflow failed") + } else if completeType == "continue-as-new" { + return 0, NewContinueAsNewError(ctx, "continue-as-new", nil) + } else { + panic("unknown completion type") + } + }) + err := env.GetWorkflowError() + if err != nil { + return 0, err + } + var result int + require.NoError(t, env.GetWorkflowResult(&result)) + return result, nil + } + // parseLogs parses the logs from the buffer and returns the logs as a slice of maps + parseLogs := func(buf *bytes.Buffer) []map[string]any { + var ms []map[string]any + for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) { + if len(line) == 0 { + continue + } + var m map[string]any + err := json.Unmarshal(line, &m) + require.NoError(t, err) + fmt.Println(m) + ms = append(ms, m) + } + return ms + } + // parseWarnedUpdates parses the warned updates from the logs and returns them as a slice of maps + parseWarnedUpdates := func(updates interface{}) []map[string]interface{} { + var warnedUpdates []map[string]interface{} + for _, update := range updates.([]interface{}) { + warnedUpdates = append(warnedUpdates, update.(map[string]interface{})) + } + return warnedUpdates + + } + // assertExpectedLogs asserts that the logs in the buffer are as expected + assertExpectedLogs := func(t *testing.T, buf *bytes.Buffer) { + logs := parseLogs(buf) + require.Len(t, logs, 1) + require.Equal(t, unhandledUpdateWarningMessage, logs[0]["msg"]) + warnedUpdates := parseWarnedUpdates(logs[0]["Updates"]) + require.Len(t, warnedUpdates, 2) + // Order of updates is not guaranteed + require.Equal(t, "update", warnedUpdates[0]["name"]) + require.True(t, warnedUpdates[0]["id"] == "id_1" || warnedUpdates[0]["id"] == "id_2") + require.Equal(t, "update", warnedUpdates[1]["name"]) + require.True(t, warnedUpdates[1]["id"] != warnedUpdates[0]["id"]) + require.True(t, warnedUpdates[1]["id"] == "id_1" || warnedUpdates[1]["id"] == "id_2") + } + + t.Run("complete", func(t *testing.T) { + var buf bytes.Buffer + result, err := runWf("complete", &buf) + require.NoError(t, err) + require.Equal(t, 2, result) + assertExpectedLogs(t, &buf) + }) + t.Run("cancel", func(t *testing.T) { + var buf bytes.Buffer + _, err := runWf("cancel", &buf) + require.Error(t, err) + assertExpectedLogs(t, &buf) + }) + t.Run("failure", func(t *testing.T) { + var buf bytes.Buffer + _, err := runWf("failure", &buf) + require.Error(t, err) + assertExpectedLogs(t, &buf) + }) + t.Run("continue-as-new", func(t *testing.T) { + var buf bytes.Buffer + _, err := runWf("continue-as-new", &buf) + require.Error(t, err) + assertExpectedLogs(t, &buf) + }) +} + func TestWorkflowStartTimeInsideTestWorkflow(t *testing.T) { var suite WorkflowTestSuite env := suite.NewTestWorkflowEnvironment() diff --git a/workflow/workflow.go b/workflow/workflow.go index f8823a391..17e509799 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -35,6 +35,21 @@ import ( "golang.org/x/exp/constraints" ) +// HandlerUnfinishedPolicy defines the actions taken when a workflow exits while update handlers are +// running. The workflow exit may be due to successful return, failure, cancellation, or +// continue-as-new. +type HandlerUnfinishedPolicy = internal.HandlerUnfinishedPolicy + +const ( + // WarnAndAbandon issue a warning in addition to abandoning. + HandlerUnfinishedPolicyWarnAndAbandon = internal.HandlerUnfinishedPolicyWarnAndAbandon + // ABANDON the handler. + // + // In the case of an update handler this means that the client will receive an error rather + // than the update result. + HandlerUnfinishedPolicyAbandon = internal.HandlerUnfinishedPolicyAbandon +) + type ( // ChildWorkflowFuture represents the result of a child workflow execution @@ -692,3 +707,12 @@ func DeterministicKeys[K constraints.Ordered, V any](m map[K]V) []K { func DeterministicKeysFunc[K comparable, V any](m map[K]V, cmp func(K, K) int) []K { return internal.DeterministicKeysFunc(m, cmp) } + +// AllHandlersFinished returns true if all update handlers have finished execution. +// Consider waiting on this condition before workflow return or continue-as-new, to prevent +// interruption of in-progress handlers by workflow exit: +// +// workflow.Await(ctx, func() bool { return workflow.AllHandlersFinished(ctx) }) +func AllHandlersFinished(ctx Context) bool { + return internal.AllHandlersFinished(ctx) +}