Skip to content

Commit

Permalink
Add header parameter to allow context propagation (cadence-workflow#1731
Browse files Browse the repository at this point in the history
)

* Add header parameter to start workflow/activity methods
* Header parameter on StartChildWorkflow
* Header parameter on ContinueAsNewDecision
  • Loading branch information
shreyassrivatsan authored Apr 30, 2019
1 parent 3e9afde commit 14fd1a1
Show file tree
Hide file tree
Showing 15 changed files with 540 additions and 46 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

472 changes: 446 additions & 26 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions host/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

header := &workflow.Header{
Fields: map[string][]byte{"tracing": []byte("sample data")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
Header: header,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
Expand Down Expand Up @@ -84,6 +89,7 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: buf.Bytes(),
Header: header,
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(15),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
StartToCloseTimeoutSeconds: common.Int32Ptr(15),
Expand Down Expand Up @@ -143,6 +149,17 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Success() {
s.Nil(err)
s.True(workflowComplete)
s.True(activityExecutedCount == 1)

// go over history and verify that the activity task scheduled event has header on it
events := s.getHistory(s.domainName, &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
})
for _, event := range events {
if *event.EventType == workflow.EventTypeActivityTaskScheduled {
s.Equal(header, event.ActivityTaskScheduledEventAttributes.Header)
}
}
}

func (s *integrationSuite) TestActivityHeartbeatDetailsDuringRetry() {
Expand Down
7 changes: 7 additions & 0 deletions host/continueasnew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

header := &workflow.Header{
Fields: map[string][]byte{"tracing": []byte("sample payload")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
Header: header,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10),
Identity: common.StringPtr(identity),
Expand Down Expand Up @@ -80,6 +85,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
WorkflowType: workflowType,
TaskList: &workflow.TaskList{Name: &tl},
Input: buf.Bytes(),
Header: header,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10),
},
Expand Down Expand Up @@ -117,6 +123,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
s.Nil(err)
s.True(workflowComplete)
s.Equal(previousRunID, lastRunStartedEvent.WorkflowExecutionStartedEventAttributes.GetContinuedExecutionRunId())
s.Equal(header, lastRunStartedEvent.WorkflowExecutionStartedEventAttributes.Header)
}

func (s *integrationSuite) TestContinueAsNewWorkflow_Timeout() {
Expand Down
8 changes: 8 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,13 +1580,18 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
taskListChild := &workflow.TaskList{}
taskListChild.Name = common.StringPtr(tlChild)

header := &workflow.Header{
Fields: map[string][]byte{"tracing": []byte("sample payload")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(parentID),
WorkflowType: parentWorkflowType,
TaskList: taskListParent,
Input: nil,
Header: header,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyRequestCancel),
Expand Down Expand Up @@ -1622,6 +1627,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
WorkflowType: childWorkflowType,
TaskList: taskListChild,
Input: []byte("child-workflow-input"),
Header: header,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyRequestCancel),
Expand Down Expand Up @@ -1715,6 +1721,8 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
s.Equal(startedEvent.ChildWorkflowExecutionStartedEventAttributes.GetInitiatedEventId(),
childStartedEvent.WorkflowExecutionStartedEventAttributes.GetParentInitiatedEventId())
s.Equal(workflow.ChildPolicyRequestCancel, childStartedEvent.WorkflowExecutionStartedEventAttributes.GetChildPolicy())
s.Equal(header, startedEvent.ChildWorkflowExecutionStartedEventAttributes.Header)
s.Equal(header, childStartedEvent.WorkflowExecutionStartedEventAttributes.Header)

// Process ChildExecution completed event and complete parent execution
_, err = pollerParent.PollAndProcessDecisionTask(false, false)
Expand Down
10 changes: 10 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ struct ScheduleActivityTaskDecisionAttributes {
55: optional i32 startToCloseTimeoutSeconds
60: optional i32 heartbeatTimeoutSeconds
70: optional RetryPolicy retryPolicy
80: optional Header header
}

struct RequestCancelActivityTaskDecisionAttributes {
Expand Down Expand Up @@ -397,6 +398,7 @@ struct ContinueAsNewWorkflowExecutionDecisionAttributes {
100: optional binary failureDetails
110: optional binary lastCompletionResult
120: optional string cronSchedule
130: optional Header header
}

struct StartChildWorkflowExecutionDecisionAttributes {
Expand All @@ -412,6 +414,7 @@ struct StartChildWorkflowExecutionDecisionAttributes {
100: optional WorkflowIdReusePolicy workflowIdReusePolicy
110: optional RetryPolicy retryPolicy
120: optional string cronSchedule
130: optional Header header
}

struct Decision {
Expand Down Expand Up @@ -453,6 +456,7 @@ struct WorkflowExecutionStartedEventAttributes {
110: optional i32 firstDecisionTaskBackoffSeconds
120: optional Memo memo
130: optional ResetPoints prevAutoResetPoints
140: optional Header header
}

struct ResetPoints{
Expand Down Expand Up @@ -502,6 +506,7 @@ struct WorkflowExecutionContinuedAsNewEventAttributes {
100: optional string failureReason
110: optional binary failureDetails
120: optional binary lastCompletionResult
130: optional Header header
}

struct DecisionTaskScheduledEventAttributes {
Expand Down Expand Up @@ -555,6 +560,7 @@ struct ActivityTaskScheduledEventAttributes {
60: optional i32 heartbeatTimeoutSeconds
90: optional i64 (js.type = "Long") decisionTaskCompletedEventId
110: optional RetryPolicy retryPolicy
120: optional Header header
}

struct ActivityTaskStartedEventAttributes {
Expand Down Expand Up @@ -724,6 +730,7 @@ struct StartChildWorkflowExecutionInitiatedEventAttributes {
110: optional WorkflowIdReusePolicy workflowIdReusePolicy
120: optional RetryPolicy retryPolicy
130: optional string cronSchedule
140: optional Header header
}

struct StartChildWorkflowExecutionFailedEventAttributes {
Expand All @@ -741,6 +748,7 @@ struct ChildWorkflowExecutionStartedEventAttributes {
20: optional i64 (js.type = "Long") initiatedEventId
30: optional WorkflowExecution workflowExecution
40: optional WorkflowType workflowType
50: optional Header header
}

struct ChildWorkflowExecutionCompletedEventAttributes {
Expand Down Expand Up @@ -975,6 +983,7 @@ struct StartWorkflowExecutionRequest {
120: optional RetryPolicy retryPolicy
130: optional string cronSchedule
140: optional Memo memo
150: optional Header header
}

struct StartWorkflowExecutionResponse {
Expand Down Expand Up @@ -1051,6 +1060,7 @@ struct PollForActivityTaskResponse {
140: optional binary heartbeatDetails
150: optional WorkflowType workflowType
160: optional string workflowDomain
170: optional Header header
}

struct RecordActivityTaskHeartbeatRequest {
Expand Down
8 changes: 4 additions & 4 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ func (_m *mockMutableState) AddChildWorkflowExecutionFailedEvent(_a0 int64, _a1
}

// AddChildWorkflowExecutionStartedEvent provides a mock function with given fields: _a0, _a1, _a2, _a3
func (_m *mockMutableState) AddChildWorkflowExecutionStartedEvent(_a0 *string, _a1 *shared.WorkflowExecution, _a2 *shared.WorkflowType, _a3 int64) *shared.HistoryEvent {
ret := _m.Called(_a0, _a1, _a2, _a3)
func (_m *mockMutableState) AddChildWorkflowExecutionStartedEvent(_a0 *string, _a1 *shared.WorkflowExecution, _a2 *shared.WorkflowType, _a3 int64, _a4 *shared.Header) *shared.HistoryEvent {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)

var r0 *shared.HistoryEvent
if rf, ok := ret.Get(0).(func(*string, *shared.WorkflowExecution, *shared.WorkflowType, int64) *shared.HistoryEvent); ok {
r0 = rf(_a0, _a1, _a2, _a3)
if rf, ok := ret.Get(0).(func(*string, *shared.WorkflowExecution, *shared.WorkflowType, int64, *shared.Header) *shared.HistoryEvent); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*shared.HistoryEvent)
Expand Down
25 changes: 20 additions & 5 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,14 @@ func (b *historyBuilder) AddStartChildWorkflowExecutionInitiatedEvent(decisionCo
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionStartedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID int64) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionStartedEvent(domain, execution, workflowType, initiatedID)
func (b *historyBuilder) AddChildWorkflowExecutionStartedEvent(
domain *string,
execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType,
initiatedID int64,
header *workflow.Header,
) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionStartedEvent(domain, execution, workflowType, initiatedID, header)

return b.addEventToHistory(event)
}
Expand Down Expand Up @@ -453,6 +458,7 @@ func (b *historyBuilder) newWorkflowExecutionStartedEvent(
attributes := &workflow.WorkflowExecutionStartedEventAttributes{}
attributes.WorkflowType = request.WorkflowType
attributes.TaskList = request.TaskList
attributes.Header = request.Header
attributes.Input = request.Input
attributes.ExecutionStartToCloseTimeoutSeconds = common.Int32Ptr(*request.ExecutionStartToCloseTimeoutSeconds)
attributes.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(*request.TaskStartToCloseTimeoutSeconds)
Expand Down Expand Up @@ -547,6 +553,7 @@ func (b *historyBuilder) newActivityTaskScheduledEvent(decisionTaskCompletedEven
attributes.ActivityId = common.StringPtr(common.StringDefault(scheduleAttributes.ActivityId))
attributes.ActivityType = scheduleAttributes.ActivityType
attributes.TaskList = scheduleAttributes.TaskList
attributes.Header = scheduleAttributes.Header
attributes.Input = scheduleAttributes.Input
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(common.Int32Default(scheduleAttributes.ScheduleToCloseTimeoutSeconds))
attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(common.Int32Default(scheduleAttributes.ScheduleToStartTimeoutSeconds))
Expand Down Expand Up @@ -818,6 +825,7 @@ func (b *historyBuilder) newWorkflowExecutionContinuedAsNewEvent(decisionTaskCom
attributes.NewExecutionRunId = common.StringPtr(newRunID)
attributes.WorkflowType = request.WorkflowType
attributes.TaskList = request.TaskList
attributes.Header = request.Header
attributes.Input = request.Input
attributes.ExecutionStartToCloseTimeoutSeconds = common.Int32Ptr(*request.ExecutionStartToCloseTimeoutSeconds)
attributes.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(*request.TaskStartToCloseTimeoutSeconds)
Expand All @@ -843,6 +851,7 @@ func (b *historyBuilder) newStartChildWorkflowExecutionInitiatedEvent(decisionTa
attributes.WorkflowId = startAttributes.WorkflowId
attributes.WorkflowType = startAttributes.WorkflowType
attributes.TaskList = startAttributes.TaskList
attributes.Header = startAttributes.Header
attributes.Input = startAttributes.Input
attributes.ExecutionStartToCloseTimeoutSeconds = startAttributes.ExecutionStartToCloseTimeoutSeconds
attributes.TaskStartToCloseTimeoutSeconds = startAttributes.TaskStartToCloseTimeoutSeconds
Expand All @@ -857,14 +866,20 @@ func (b *historyBuilder) newStartChildWorkflowExecutionInitiatedEvent(decisionTa
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionStartedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID int64) *workflow.HistoryEvent {
func (b *historyBuilder) newChildWorkflowExecutionStartedEvent(
domain *string,
execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType,
initiatedID int64,
header *workflow.Header,
) *workflow.HistoryEvent {
historyEvent := b.msBuilder.CreateNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionStarted)
attributes := &workflow.ChildWorkflowExecutionStartedEventAttributes{}
attributes.Domain = domain
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = common.Int64Ptr(initiatedID)
attributes.Header = header
historyEvent.ChildWorkflowExecutionStartedEventAttributes = attributes

return historyEvent
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4962,6 +4962,7 @@ func addChildWorkflowExecutionStartedEvent(builder mutableState, initiatedID int
},
&workflow.WorkflowType{Name: common.StringPtr(workflowType)},
initiatedID,
&workflow.Header{},
)
return event
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type (
AddChildWorkflowExecutionCanceledEvent(int64, *workflow.WorkflowExecution, *workflow.WorkflowExecutionCanceledEventAttributes) *workflow.HistoryEvent
AddChildWorkflowExecutionCompletedEvent(int64, *workflow.WorkflowExecution, *workflow.WorkflowExecutionCompletedEventAttributes) *workflow.HistoryEvent
AddChildWorkflowExecutionFailedEvent(int64, *workflow.WorkflowExecution, *workflow.WorkflowExecutionFailedEventAttributes) *workflow.HistoryEvent
AddChildWorkflowExecutionStartedEvent(*string, *workflow.WorkflowExecution, *workflow.WorkflowType, int64) *workflow.HistoryEvent
AddChildWorkflowExecutionStartedEvent(*string, *workflow.WorkflowExecution, *workflow.WorkflowType, int64, *workflow.Header) *workflow.HistoryEvent
AddChildWorkflowExecutionTerminatedEvent(int64, *workflow.WorkflowExecution, *workflow.WorkflowExecutionTerminatedEventAttributes) *workflow.HistoryEvent
AddChildWorkflowExecutionTimedOutEvent(int64, *workflow.WorkflowExecution, *workflow.WorkflowExecutionTimedOutEventAttributes) *workflow.HistoryEvent
AddCompletedWorkflowEvent(int64, *workflow.CompleteWorkflowExecutionDecisionAttributes) *workflow.HistoryEvent
Expand Down
12 changes: 9 additions & 3 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,7 @@ func (e *mutableStateBuilder) addWorkflowExecutionStartedEventForContinueAsNew(d
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTimeout),
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
Input: attributes.Input,
Header: attributes.Header,
RetryPolicy: attributes.RetryPolicy,
CronSchedule: attributes.CronSchedule,
}
Expand Down Expand Up @@ -2754,8 +2755,13 @@ func (e *mutableStateBuilder) ReplicateStartChildWorkflowExecutionInitiatedEvent
return ci
}

func (e *mutableStateBuilder) AddChildWorkflowExecutionStartedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID int64) *workflow.HistoryEvent {
func (e *mutableStateBuilder) AddChildWorkflowExecutionStartedEvent(
domain *string,
execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType,
initiatedID int64,
header *workflow.Header,
) *workflow.HistoryEvent {
ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID != common.EmptyEventID {
e.logger.Warn("Invalid history builder state for action",
Expand All @@ -2767,7 +2773,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionStartedEvent(domain *stri
return nil
}

event := e.hBuilder.AddChildWorkflowExecutionStartedEvent(domain, execution, workflowType, initiatedID)
event := e.hBuilder.AddChildWorkflowExecutionStartedEvent(domain, execution, workflowType, initiatedID, header)
if err := e.ReplicateChildWorkflowExecutionStartedEvent(event); err != nil {
return nil
}
Expand Down
10 changes: 8 additions & 2 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ func (t *transferQueueActiveProcessorImpl) processStartChildExecution(task *pers
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Input: attributes.Input,
Header: attributes.Header,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
// Use the same request ID to dedupe StartWorkflowExecution calls
Expand Down Expand Up @@ -898,11 +899,16 @@ func (t *transferQueueActiveProcessorImpl) recordChildExecutionStarted(task *per
return &workflow.EntityNotExistsError{Message: "Pending child execution not found."}
}

msBuilder.AddChildWorkflowExecutionStartedEvent(domain,
msBuilder.AddChildWorkflowExecutionStartedEvent(
domain,
&workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.TargetWorkflowID),
RunId: common.StringPtr(runID),
}, initiatedAttributes.WorkflowType, initiatedEventID)
},
initiatedAttributes.WorkflowType,
initiatedEventID,
initiatedAttributes.Header,
)

return nil
})
Expand Down
1 change: 1 addition & 0 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont
response := &workflow.PollForActivityTaskResponse{}
response.ActivityId = attributes.ActivityId
response.ActivityType = attributes.ActivityType
response.Header = attributes.Header
response.Input = attributes.Input
response.WorkflowExecution = workflowExecutionPtr(context.workflowExecution)
response.ScheduledTimestampOfThisAttempt = historyResponse.ScheduledTimestampOfThisAttempt
Expand Down
Loading

0 comments on commit 14fd1a1

Please sign in to comment.