Skip to content

Commit

Permalink
Fix bug that headers are removed in indexes for closed workflows (cad…
Browse files Browse the repository at this point in the history
…ence-workflow#6234)

What changed?

Cause:
When recording a closed workflow, headers were not appended to the search attributes, so the final close record removed the headers from visibility.

Fix

Move the header-append logic into the base transfer task executor.
The header-append logic must run on every visibility record update (record start, upsert, record close).
Why?

In the staging and production environments where this was rolled out, we found that headers were not indexed correctly for closed workflows.

How did you test it?
unit test
  • Loading branch information
shijiesheng authored Aug 22, 2024
1 parent b689bad commit c0cd4c5
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 35 deletions.
15 changes: 5 additions & 10 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := executionInfo.SearchAttributes
headers := getWorkflowHeaders(startEvent)
domainName := mutableState.GetDomainEntry().GetInfo().Name
children, err := filterPendingChildExecutions(
task.TargetDomainIDs,
Expand Down Expand Up @@ -475,6 +476,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
headers,
); err != nil {
return err
}
Expand Down Expand Up @@ -992,16 +994,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
// fail open to avoid blocking the task processing
if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
} else {
searchAttr = newSearchAttr
}
}
}
headers := getWorkflowHeaders(startEvent)
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := t.shard.GetTimeSource().Now()
Expand All @@ -1028,6 +1021,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
visibilityMemo,
updateTimestamp.UnixNano(),
searchAttr,
headers,
)
}
return t.upsertWorkflowExecution(
Expand All @@ -1046,6 +1040,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
headers,
)
}

Expand Down
69 changes: 68 additions & 1 deletion service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent() {
workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

startEvent, err := mutableState.GetStartEvent(context.Background())
s.Require().NoError(err)
event := test.AddCompleteWorkflowEvent(mutableState, decisionCompletionID, nil)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Expand All @@ -660,9 +662,22 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent() {
persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything, mock.Anything).Return(nil).Once()
s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything, createRecordWorkflowExecutionClosedRequest(
s.T(),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), event.Timestamp,
true),
).Return(nil).Once()
s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("enabled"), true, dc.GetBoolPropertyFn(true), "disabled", "random URI"))
s.mockArchivalClient.On("Archive", mock.Anything, mock.Anything).Return(nil, nil).Once()
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header_context_key": struct{}{},
}
}

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
Expand Down Expand Up @@ -1544,6 +1559,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWi
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header_context_key": struct{}{},
"123456": struct{}{}, // unsanitizable key
}
}

Expand Down Expand Up @@ -1802,6 +1818,57 @@ func createRecordWorkflowExecutionStartedRequest(
}
}

// createRecordWorkflowExecutionClosedRequest builds the
// RecordWorkflowExecutionClosedRequest that a test expects the visibility
// manager to receive for the given transfer task.
//
// The execution timestamp stays zero unless the start event carries a non-zero
// first-decision-task backoff, in which case it is the start timestamp plus
// that backoff. When enableContextHeaderInVisibility is true, the expected
// search attributes hold a single entry, "Header_context_key", whose value is
// the JSON encoding of "contextValue"; otherwise the search attributes are nil.
// closeTimestamp is dereferenced and therefore must not be nil.
func createRecordWorkflowExecutionClosedRequest(
	t *testing.T,
	domainName string,
	startEvent *types.HistoryEvent,
	transferTask Task,
	mutableState execution.MutableState,
	numClusters int16,
	updateTime time.Time,
	closeTimestamp *int64,
	enableContextHeaderInVisibility bool,
) *persistence.RecordWorkflowExecutionClosedRequest {
	info := transferTask.GetInfo().(*persistence.TransferTaskInfo)
	execInfo := mutableState.GetExecutionInfo()

	// Only workflows started with a backoff have a distinct execution time.
	var executionTime int64
	if backoff := startEvent.WorkflowExecutionStartedEventAttributes.GetFirstDecisionTaskBackoffSeconds(); backoff != 0 {
		executionTime = startEvent.GetTimestamp() + int64(backoff)*int64(time.Second)
	}

	// Expected header-derived search attributes; left nil when the feature is off.
	var expectedAttrs map[string][]byte
	if enableContextHeaderInVisibility {
		encoded, err := json.Marshal("contextValue")
		if err != nil {
			t.Fatal(err)
		}
		expectedAttrs = map[string][]byte{"Header_context_key": encoded}
	}

	return &persistence.RecordWorkflowExecutionClosedRequest{
		Domain:     domainName,
		DomainUUID: info.DomainID,
		Execution: types.WorkflowExecution{
			WorkflowID: info.WorkflowID,
			RunID:      info.RunID,
		},
		HistoryLength:      mutableState.GetNextEventID() - 1,
		WorkflowTypeName:   execInfo.WorkflowTypeName,
		StartTimestamp:     startEvent.GetTimestamp(),
		ExecutionTimestamp: executionTime,
		TaskID:             info.TaskID,
		TaskList:           info.TaskList,
		IsCron:             len(execInfo.CronSchedule) > 0,
		NumClusters:        numClusters,
		UpdateTimestamp:    updateTime.UnixNano(),
		CloseTimestamp:     *closeTimestamp,
		RetentionSeconds:   int64(mutableState.GetDomainEntry().GetRetentionDays(info.GetWorkflowID()) * 24 * 3600),
		SearchAttributes:   expectedAttrs,
	}
}

func createTestRequestCancelWorkflowExecutionRequest(
targetDomainName string,
taskInfo *persistence.TransferTaskInfo,
Expand Down
15 changes: 5 additions & 10 deletions service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := executionInfo.SearchAttributes
headers := getWorkflowHeaders(startEvent)
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := t.shard.GetTimeSource().Now()

Expand Down Expand Up @@ -290,6 +291,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution(
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
headers,
)
}

Expand Down Expand Up @@ -493,16 +495,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))

searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
// fail open to avoid blocking the task processing
if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
} else {
searchAttr = newSearchAttr
}
}
}
headers := getWorkflowHeaders(startEvent)

if isRecordStart {
workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
Expand All @@ -522,6 +515,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
visibilityMemo,
updateTimestamp.UnixNano(),
searchAttr,
headers,
)
}
return t.upsertWorkflowExecution(
Expand All @@ -540,6 +534,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
numClusters,
updateTimestamp.UnixNano(),
searchAttr,
headers,
)

}
Expand Down
72 changes: 58 additions & 14 deletions service/history/task/transfer_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"fmt"
"time"

"go.uber.org/multierr"

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
Expand Down Expand Up @@ -160,11 +162,13 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted(
visibilityMemo *types.Memo,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
headers map[string][]byte,
) error {

domain := defaultDomainName

if domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID); err != nil {
domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
return err
}
Expand All @@ -177,6 +181,14 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted(
}
}

// headers are appended into search attributes if enabled
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
}
}

request := &persistence.RecordWorkflowExecutionStartedRequest{
DomainUUID: domainID,
Domain: domain,
Expand Down Expand Up @@ -234,6 +246,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
headers map[string][]byte,
) error {

domain, err := t.shard.GetDomainCache().GetDomainName(domainID)
Expand All @@ -244,6 +257,14 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution(
domain = defaultDomainName
}

// headers are appended into search attributes if enabled
if t.config.EnableContextHeaderInVisibility(domain) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
}
}

request := &persistence.UpsertWorkflowExecutionRequest{
DomainUUID: domainID,
Domain: domain,
Expand Down Expand Up @@ -286,6 +307,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
numClusters int16,
updateTimeUnixNano int64,
searchAttributes map[string][]byte,
headers map[string][]byte,
) error {

// Record closing in visibility store
Expand Down Expand Up @@ -315,6 +337,13 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed(
}

if recordWorkflowClose {
// headers are appended into search attributes if enabled
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
// fail open, if error occurs, just log it; successfully appended headers will be stored
if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil {
t.logger.Error("failed to add headers to search attributes", tag.Error(err))
}
}
if err := t.visibilityMgr.RecordWorkflowExecutionClosed(ctx, &persistence.RecordWorkflowExecutionClosedRequest{
DomainUUID: domainID,
Domain: domain,
Expand Down Expand Up @@ -401,31 +430,46 @@ func getWorkflowMemo(
return &types.Memo{Fields: memo}
}

// context headers are appended to search attributes if in allow list; return errors when all context key is processed
func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) (map[string][]byte, error) {
// sanity check
if attr == nil {
attr = make(map[string][]byte)
}
var errGroup error
for k, v := range context {
unsanitizedKey := fmt.Sprintf(definition.HeaderFormat, k)
key, err := visibility.SanitizeSearchAttributeKey(unsanitizedKey)
if err != nil {
return nil, fmt.Errorf("fail to sanitize context key %s: %w", key, err)
sanitizedKey, err := visibility.SanitizeSearchAttributeKey(k)
if err != nil { // This could never happen
multierr.Append(errGroup, fmt.Errorf("fail to sanitize context key %s: %w", k, err))
continue
}

key := fmt.Sprintf(definition.HeaderFormat, sanitizedKey)
if _, ok := attr[key]; ok { // skip if key already exists
continue
}
if _, allowed := allowedKeys[key]; !allowed { // skip if not allowed
continue
}
if attr == nil {
attr = make(map[string][]byte)
}
// context header are raw string bytes, need to be json encoded to be stored in search attributes
data, err := json.Marshal(string(v))
if err != nil {
return nil, fmt.Errorf("fail to json encoding context key %s, val %v: %w", k, v, err)
}
// ignore error as it can't happen to err on json encoding string
data, _ := json.Marshal(string(v))
attr[key] = data
}
return attr, nil
return attr, errGroup
}

// getWorkflowHeaders returns a deep copy of the header fields carried by the
// workflow's start event. It returns nil when the event has no
// WorkflowExecutionStarted attributes or those attributes have no header.
// Each value is copied into a fresh byte slice, so later changes to the
// returned map or its values do not affect the event.
func getWorkflowHeaders(startEvent *types.HistoryEvent) map[string][]byte {
	startAttr := startEvent.GetWorkflowExecutionStartedEventAttributes()
	if startAttr == nil {
		return nil
	}
	header := startAttr.Header
	if header == nil {
		return nil
	}

	copied := make(map[string][]byte, len(header.Fields))
	for name, raw := range header.Fields {
		// append onto an empty, pre-sized slice to clone the bytes
		copied[name] = append(make([]byte, 0, len(raw)), raw...)
	}
	return copied
}

func copySearchAttributes(
Expand Down
1 change: 1 addition & 0 deletions service/history/testing/workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func StartWorkflow(
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Header: &types.Header{Fields: map[string][]byte{
"context-key": []byte("contextValue"),
"123456": []byte("123456"), // unsanitizable key
"invalid-context-key": []byte("invalidContextValue"),
}},
},
Expand Down

0 comments on commit c0cd4c5

Please sign in to comment.