Skip to content

Commit

Permalink
Experimental user metadata and workflow metadata query support (tempo…
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Aug 30, 2024
1 parent e85a098 commit f47e644
Show file tree
Hide file tree
Showing 20 changed files with 586 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ jobs:
DISABLE_NEXUS_TESTS: "1"
# TODO(bergundy): Remove this flag too once server 1.25.0 is out. Thanks Roey! :)
DISABLE_BACKLOG_STATS_TESTS: "1"
# TODO(cretz): Remove this flag once server 1.25.0 is out.
DISABLE_USER_METADATA_TESTS: "1"
working-directory: ./internal/cmd/build

cloud-test:
Expand Down
20 changes: 20 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the EncodedValue.
QueryTypeOpenSessions string = "__open_sessions"

// QueryTypeWorkflowMetadata is the query name for the workflow metadata.
QueryTypeWorkflowMetadata string = "__temporal_workflow_metadata"
)

type (
Expand Down Expand Up @@ -721,6 +724,23 @@ type (
// Cannot be set the same time as a CronSchedule.
StartDelay time.Duration

// StaticSummary - Single-line fixed summary for this workflow execution that will appear in UI/CLI. This can be
// in single-line Temporal markdown format.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
StaticSummary string

// Details - General fixed details for this workflow execution that will appear in UI/CLI. This can be in
// Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be
// updated. For details that can be updated, use SetCurrentDetails within the workflow.
//
// Optional: defaults to none/empty.
//
// NOTE: Experimental
StaticDetails string

// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
requestID string
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
Expand Down
13 changes: 13 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ type WorkflowOutboundInterceptor interface {
// NewTimer intercepts workflow.NewTimer.
NewTimer(ctx Context, d time.Duration) Future

// NewTimer intercepts workflow.NewTimerWithOptions.
NewTimerWithOptions(ctx Context, d time.Duration, options TimerOptions) Future

// Sleep intercepts workflow.Sleep.
Sleep(ctx Context, d time.Duration) (err error)

Expand Down Expand Up @@ -276,6 +279,11 @@ type WorkflowOutboundInterceptor interface {
// GetSignalChannel intercepts workflow.GetSignalChannel.
GetSignalChannel(ctx Context, signalName string) ReceiveChannel

// GetSignalChannelWithOptions intercepts workflow.GetSignalChannelWithOptions.
//
// NOTE: Experimental
GetSignalChannelWithOptions(ctx Context, signalName string, options SignalChannelOptions) ReceiveChannel

// SideEffect intercepts workflow.SideEffect.
SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue

Expand All @@ -293,6 +301,11 @@ type WorkflowOutboundInterceptor interface {
// SetQueryHandler intercepts workflow.SetQueryHandler.
SetQueryHandler(ctx Context, queryType string, handler interface{}) error

// SetQueryHandlerWithOptions intercepts workflow.SetQueryHandlerWithOptions.
//
// NOTE: Experimental
SetQueryHandlerWithOptions(ctx Context, queryType string, handler interface{}, options QueryHandlerOptions) error

// SetUpdateHandler intercepts workflow.SetUpdateHandler.
//
// NOTE: Experimental
Expand Down
32 changes: 32 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ func (w *WorkflowOutboundInterceptorBase) NewTimer(ctx Context, d time.Duration)
return w.Next.NewTimer(ctx, d)
}

// NewTimerWithOptions implements WorkflowOutboundInterceptor.NewTimerWithOptions.
func (w *WorkflowOutboundInterceptorBase) NewTimerWithOptions(
ctx Context,
d time.Duration,
options TimerOptions,
) Future {
return w.Next.NewTimerWithOptions(ctx, d, options)
}

// Sleep implements WorkflowOutboundInterceptor.Sleep.
func (w *WorkflowOutboundInterceptorBase) Sleep(ctx Context, d time.Duration) (err error) {
return w.Next.Sleep(ctx, d)
Expand Down Expand Up @@ -319,6 +328,17 @@ func (w *WorkflowOutboundInterceptorBase) GetSignalChannel(ctx Context, signalNa
return w.Next.GetSignalChannel(ctx, signalName)
}

// GetSignalChannelWithOptions implements WorkflowOutboundInterceptor.GetSignalChannelWithOptions.
//
// NOTE: Experimental
func (w *WorkflowOutboundInterceptorBase) GetSignalChannelWithOptions(
ctx Context,
signalName string,
options SignalChannelOptions,
) ReceiveChannel {
return w.Next.GetSignalChannelWithOptions(ctx, signalName, options)
}

// SideEffect implements WorkflowOutboundInterceptor.SideEffect.
func (w *WorkflowOutboundInterceptorBase) SideEffect(
ctx Context,
Expand Down Expand Up @@ -352,6 +372,18 @@ func (w *WorkflowOutboundInterceptorBase) SetQueryHandler(ctx Context, queryType
return w.Next.SetQueryHandler(ctx, queryType, handler)
}

// SetQueryHandlerWithOptions implements WorkflowOutboundInterceptor.SetQueryHandlerWithOptions.
//
// NOTE: Experimental
func (w *WorkflowOutboundInterceptorBase) SetQueryHandlerWithOptions(
ctx Context,
queryType string,
handler interface{},
options QueryHandlerOptions,
) error {
return w.Next.SetQueryHandlerWithOptions(ctx, queryType, handler, options)
}

// SetUpdateHandler implements WorkflowOutboundInterceptor.SetUpdateHandler.
func (w *WorkflowOutboundInterceptorBase) SetUpdateHandler(ctx Context, updateName string, handler interface{}, opts UpdateHandlerOptions) error {
return w.Next.SetUpdateHandler(ctx, updateName, handler, opts)
Expand Down
40 changes: 32 additions & 8 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/sdk/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common/util"
Expand Down Expand Up @@ -89,7 +90,8 @@ type (

timerCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.StartTimerCommandAttributes
attributes *commandpb.StartTimerCommandAttributes
startMetadata *sdk.UserMetadata
}

cancelTimerCommandStateMachine struct {
Expand All @@ -99,7 +101,8 @@ type (

childWorkflowCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.StartChildWorkflowExecutionCommandAttributes
attributes *commandpb.StartChildWorkflowExecutionCommandAttributes
startMetadata *sdk.UserMetadata
}

naiveCommandStateMachine struct {
Expand Down Expand Up @@ -385,11 +388,15 @@ func (h *commandsHelper) newRequestCancelNexusOperationStateMachine(attributes *
}
}

func (h *commandsHelper) newTimerCommandStateMachine(attributes *commandpb.StartTimerCommandAttributes) *timerCommandStateMachine {
func (h *commandsHelper) newTimerCommandStateMachine(
attributes *commandpb.StartTimerCommandAttributes,
startMetadata *sdk.UserMetadata,
) *timerCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeTimer, attributes.GetTimerId())
return &timerCommandStateMachine{
commandStateMachineBase: base,
attributes: attributes,
startMetadata: startMetadata,
}
}

Expand All @@ -401,11 +408,15 @@ func (h *commandsHelper) newCancelTimerCommandStateMachine(attributes *commandpb
}
}

func (h *commandsHelper) newChildWorkflowCommandStateMachine(attributes *commandpb.StartChildWorkflowExecutionCommandAttributes) *childWorkflowCommandStateMachine {
func (h *commandsHelper) newChildWorkflowCommandStateMachine(
attributes *commandpb.StartChildWorkflowExecutionCommandAttributes,
startMetadata *sdk.UserMetadata,
) *childWorkflowCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeChildWorkflow, attributes.GetWorkflowId())
return &childWorkflowCommandStateMachine{
commandStateMachineBase: base,
attributes: attributes,
startMetadata: startMetadata,
}
}

Expand Down Expand Up @@ -692,6 +703,7 @@ func (d *timerCommandStateMachine) getCommand() *commandpb.Command {
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_START_TIMER)
command.Attributes = &commandpb.Command_StartTimerCommandAttributes{StartTimerCommandAttributes: d.attributes}
command.UserMetadata = d.startMetadata
return command
default:
return nil
Expand All @@ -714,6 +726,7 @@ func (d *childWorkflowCommandStateMachine) getCommand() *commandpb.Command {
case commandStateCreated:
command := createNewCommand(enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION)
command.Attributes = &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{StartChildWorkflowExecutionCommandAttributes: d.attributes}
command.UserMetadata = d.startMetadata
return command
case commandStateCanceledAfterStarted:
command := createNewCommand(enumspb.COMMAND_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION)
Expand Down Expand Up @@ -1365,8 +1378,11 @@ func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID strin
// to server, and have it reject it - but here the command ID is exactly equal to the child's wf ID,
// and changing that without potentially blowing up backwards compatability is difficult. So we
// return the error eagerly locally, which is at least an improvement on panicking.
func (h *commandsHelper) startChildWorkflowExecution(attributes *commandpb.StartChildWorkflowExecutionCommandAttributes) (commandStateMachine, error) {
command := h.newChildWorkflowCommandStateMachine(attributes)
func (h *commandsHelper) startChildWorkflowExecution(
attributes *commandpb.StartChildWorkflowExecutionCommandAttributes,
startMetadata *sdk.UserMetadata,
) (commandStateMachine, error) {
command := h.newChildWorkflowCommandStateMachine(attributes, startMetadata)
if h.commands[command.getID()] != nil {
return nil, &childWorkflowExistsWithId{id: attributes.WorkflowId}
}
Expand Down Expand Up @@ -1556,8 +1572,16 @@ func (h *commandsHelper) getSignalID(initiatedEventID int64) string {
return signalID
}

func (h *commandsHelper) startTimer(attributes *commandpb.StartTimerCommandAttributes) commandStateMachine {
command := h.newTimerCommandStateMachine(attributes)
func (h *commandsHelper) startTimer(
attributes *commandpb.StartTimerCommandAttributes,
options TimerOptions,
dc converter.DataConverter,
) commandStateMachine {
startMetadata, err := buildUserMetadata(options.Summary, "", dc)
if err != nil {
panic(err)
}
command := h.newTimerCommandStateMachine(attributes, startMetadata)
h.addCommand(command)
return command
}
Expand Down
20 changes: 10 additions & 10 deletions internal/internal_command_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Test_TimerStateMachine_CancelBeforeSent(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
h.cancelTimer(TimerID{timerID})
require.Equal(t, commandStateCanceledBeforeSent, d.getState())
Expand All @@ -60,7 +60,7 @@ func Test_TimerStateMachine_CancelAfterInitiated(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -86,7 +86,7 @@ func Test_TimerStateMachine_CompletedAfterCancel(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand Down Expand Up @@ -114,7 +114,7 @@ func Test_TimerStateMachine_CompleteWithoutCancel(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand All @@ -135,7 +135,7 @@ func Test_TimerCancelEventOrdering(t *testing.T) {
TimerId: timerID,
}
h := newCommandsHelper()
d := h.startTimer(attributes)
d := h.startTimer(attributes, TimerOptions{}, converter.GetDefaultDataConverter())
require.Equal(t, commandStateCreated, d.getState())
commands := h.getCommands(true)
require.Equal(t, commandStateCommandSent, d.getState())
Expand Down Expand Up @@ -353,7 +353,7 @@ func Test_ChildWorkflowStateMachine_Basic(t *testing.T) {
h := newCommandsHelper()

// start child workflow
d, err := h.startChildWorkflowExecution(attributes)
d, err := h.startChildWorkflowExecution(attributes, nil)
require.NoError(t, err)
require.Equal(t, commandStateCreated, d.getState())

Expand Down Expand Up @@ -392,7 +392,7 @@ func Test_ChildWorkflowStateMachine_CancelSucceed(t *testing.T) {
h := newCommandsHelper()

// start child workflow
d, err := h.startChildWorkflowExecution(attributes)
d, err := h.startChildWorkflowExecution(attributes, nil)
require.NoError(t, err)
// send command
_ = h.getCommands(true)
Expand Down Expand Up @@ -437,7 +437,7 @@ func Test_ChildWorkflowStateMachine_InvalidStates(t *testing.T) {
h := newCommandsHelper()

// start child workflow
d, err := h.startChildWorkflowExecution(attributes)
d, err := h.startChildWorkflowExecution(attributes, nil)
require.NoError(t, err)
require.Equal(t, commandStateCreated, d.getState())

Expand Down Expand Up @@ -514,7 +514,7 @@ func Test_ChildWorkflow_UnusualCancelationOrdering(t *testing.T) {
h := newCommandsHelper()

// start child workflow
_, err := h.startChildWorkflowExecution(attributes)
_, err := h.startChildWorkflowExecution(attributes, nil)
require.NoError(t, err)
// send command
h.getCommands(true)
Expand Down Expand Up @@ -548,7 +548,7 @@ func Test_ChildWorkflowStateMachine_CancelFailed(t *testing.T) {
h := newCommandsHelper()

// start child workflow
d, err := h.startChildWorkflowExecution(attributes)
d, err := h.startChildWorkflowExecution(attributes, nil)
require.NoError(t, err)
// send command
h.getCommands(true)
Expand Down
17 changes: 14 additions & 3 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,13 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.InheritBuildId = determineInheritBuildIdFlagForCommand(
params.VersioningIntent, wc.workflowInfo.TaskQueueName, params.TaskQueueName)

command, err := wc.commandsHelper.startChildWorkflowExecution(attributes)
startMetadata, err := buildUserMetadata(params.staticSummary, params.staticDetails, wc.dataConverter)
if err != nil {
callback(nil, err)
return
}

command, err := wc.commandsHelper.startChildWorkflowExecution(attributes, startMetadata)
if _, ok := err.(*childWorkflowExistsWithId); ok {
if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) {
startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{})
Expand Down Expand Up @@ -824,7 +830,7 @@ func (wc *workflowEnvironmentImpl) Now() time.Time {
return wc.currentReplayTime
}

func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, callback ResultHandler) *TimerID {
func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, options TimerOptions, callback ResultHandler) *TimerID {
if d < 0 {
callback(nil, fmt.Errorf("negative duration provided %v", d))
return nil
Expand All @@ -839,7 +845,7 @@ func (wc *workflowEnvironmentImpl) NewTimer(d time.Duration, callback ResultHand
startTimerAttr.TimerId = timerID
startTimerAttr.StartToFireTimeout = durationpb.New(d)

command := wc.commandsHelper.startTimer(startTimerAttr)
command := wc.commandsHelper.startTimer(startTimerAttr, options, wc.GetDataConverter())
command.setData(&scheduledTimer{callback: callback})

wc.logger.Debug("NewTimer",
Expand Down Expand Up @@ -1392,6 +1398,11 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(
return weh.encodeArg(weh.StackTrace())
case QueryTypeOpenSessions:
return weh.encodeArg(weh.getOpenSessions())
case QueryTypeWorkflowMetadata:
// We are intentionally not handling this here but rather in the
// normal handler so it has access to the options/context as
// needed.
fallthrough
default:
result, err := weh.queryHandler(queryType, queryArgs, header)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ func convertToPBScheduleAction(
return nil, err
}

userMetadata, err := buildUserMetadata(action.staticSummary, action.staticDetails, dataConverter)
if err != nil {
return nil, err
}

return &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
Expand All @@ -651,6 +656,7 @@ func convertToPBScheduleAction(
Memo: memo,
SearchAttributes: searchAttrs,
Header: header,
UserMetadata: userMetadata,
},
},
}, nil
Expand Down
Loading

0 comments on commit f47e644

Please sign in to comment.