Skip to content

Commit

Permalink
Fix race condition in Describe handler (cadence-workflow#6312)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Sep 27, 2024
1 parent 4424df7 commit 02984fd
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 3 deletions.
37 changes: 37 additions & 0 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,3 +1997,40 @@ func HasMoreRowsToDelete(rowsDeleted, batchSize int) bool {
}
return true
}

func (e *WorkflowExecutionInfo) CopyMemo() map[string][]byte {
if e.Memo == nil {
return nil
}
memo := make(map[string][]byte)
for k, v := range e.Memo {
val := make([]byte, len(v))
copy(val, v)
memo[k] = val
}
return memo
}

func (e *WorkflowExecutionInfo) CopySearchAttributes() map[string][]byte {
if e.SearchAttributes == nil {
return nil
}
searchAttr := make(map[string][]byte)
for k, v := range e.SearchAttributes {
val := make([]byte, len(v))
copy(val, v)
searchAttr[k] = val
}
return searchAttr
}

func (e *WorkflowExecutionInfo) CopyPartitionConfig() map[string]string {
if e.PartitionConfig == nil {
return nil
}
partitionConfig := make(map[string]string)
for k, v := range e.PartitionConfig {
partitionConfig[k] = v
}
return partitionConfig
}
277 changes: 277 additions & 0 deletions common/persistence/data_manager_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package persistence

import (
"bytes"
"errors"
"fmt"
"testing"
Expand Down Expand Up @@ -232,3 +233,279 @@ func TestTimeStampConvertion(t *testing.T) {
unixNanoTime := DBTimestampToUnixNano(milisSecond)
assert.Equal(t, timeNow.UnixNano()/(1000*1000), unixNanoTime/(1000*1000)) // unixNano to milisSecond will result in info loss
}

func TestCopyMemo(t *testing.T) {
tests := []struct {
name string
inputMemo map[string][]byte
expectedOutput map[string][]byte
}{
{
name: "TC1: Memo is nil",
inputMemo: nil,
expectedOutput: nil,
},
{
name: "TC2: Memo is empty",
inputMemo: map[string][]byte{},
expectedOutput: map[string][]byte{},
},
{
name: "TC3: Memo contains multiple entries",
inputMemo: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
},
expectedOutput: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
},
},
{
name: "TC4: Memo contains empty byte slices",
inputMemo: map[string][]byte{
"key1": []byte(""),
"key2": []byte{},
},
expectedOutput: map[string][]byte{
"key1": []byte(""),
"key2": []byte{},
},
},
{
name: "TC5: Memo contains duplicate byte slices",
inputMemo: map[string][]byte{
"key1": []byte("dup"),
"key2": []byte("dup"),
},
expectedOutput: map[string][]byte{
"key1": []byte("dup"),
"key2": []byte("dup"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
execInfo := &WorkflowExecutionInfo{
Memo: tt.inputMemo,
}

copyMemo := execInfo.CopyMemo()

// Check if both are nil
if tt.expectedOutput == nil {
if copyMemo != nil {
t.Errorf("Expected nil, got %v", copyMemo)
}
return
}

// Check if lengths match
if len(copyMemo) != len(tt.expectedOutput) {
t.Errorf("Expected map length %d, got %d", len(tt.expectedOutput), len(copyMemo))
}

// Check each key-value pair
for key, expectedVal := range tt.expectedOutput {
copiedVal, exists := copyMemo[key]
if !exists {
t.Errorf("Expected key %s not found in copy", key)
continue
}

if !bytes.Equal(copiedVal, expectedVal) {
t.Errorf("For key %s, expected value %v, got %v", key, expectedVal, copiedVal)
}

// Ensure that the byte slices are different underlying arrays (deep copy)
if len(copiedVal) > 0 && &copiedVal[0] == &expectedVal[0] {
t.Errorf("For key %s, byte slices reference the same underlying array", key)
}
}
})
}
}

func TestCopySearchAttributes(t *testing.T) {
tests := []struct {
name string
inputSearchAttrs map[string][]byte
expectedOutputAttrs map[string][]byte
}{
{
name: "TC1: SearchAttributes is nil",
inputSearchAttrs: nil,
expectedOutputAttrs: nil,
},
{
name: "TC2: SearchAttributes is empty",
inputSearchAttrs: map[string][]byte{},
expectedOutputAttrs: map[string][]byte{},
},
{
name: "TC3: SearchAttributes contains multiple entries",
inputSearchAttrs: map[string][]byte{
"attr1": []byte("value1"),
"attr2": []byte("value2"),
},
expectedOutputAttrs: map[string][]byte{
"attr1": []byte("value1"),
"attr2": []byte("value2"),
},
},
{
name: "TC4: SearchAttributes contains empty byte slices",
inputSearchAttrs: map[string][]byte{
"attr1": []byte(""),
"attr2": []byte{},
},
expectedOutputAttrs: map[string][]byte{
"attr1": []byte(""),
"attr2": []byte{},
},
},
{
name: "TC5: SearchAttributes contains duplicate byte slices",
inputSearchAttrs: map[string][]byte{
"attr1": []byte("dup"),
"attr2": []byte("dup"),
},
expectedOutputAttrs: map[string][]byte{
"attr1": []byte("dup"),
"attr2": []byte("dup"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
execInfo := &WorkflowExecutionInfo{
SearchAttributes: tt.inputSearchAttrs,
}

copyAttrs := execInfo.CopySearchAttributes()

// Check if both are nil
if tt.expectedOutputAttrs == nil {
if copyAttrs != nil {
t.Errorf("Expected nil, got %v", copyAttrs)
}
return
}

// Check if lengths match
if len(copyAttrs) != len(tt.expectedOutputAttrs) {
t.Errorf("Expected map length %d, got %d", len(tt.expectedOutputAttrs), len(copyAttrs))
}

// Check each key-value pair
for key, expectedVal := range tt.expectedOutputAttrs {
copiedVal, exists := copyAttrs[key]
if !exists {
t.Errorf("Expected key %s not found in copy", key)
continue
}

if !bytes.Equal(copiedVal, expectedVal) {
t.Errorf("For key %s, expected value %v, got %v", key, expectedVal, copiedVal)
}

// Ensure that the byte slices are different underlying arrays (deep copy)
if len(copiedVal) > 0 && &copiedVal[0] == &expectedVal[0] {
t.Errorf("For key %s, byte slices reference the same underlying array", key)
}
}
})
}
}

func TestCopyPartitionConfig(t *testing.T) {
tests := []struct {
name string
inputPartitionConfig map[string]string
expectedOutputConfig map[string]string
}{
{
name: "TC1: PartitionConfig is nil",
inputPartitionConfig: nil,
expectedOutputConfig: nil,
},
{
name: "TC2: PartitionConfig is empty",
inputPartitionConfig: map[string]string{},
expectedOutputConfig: map[string]string{},
},
{
name: "TC3: PartitionConfig contains multiple entries",
inputPartitionConfig: map[string]string{
"partition1": "config1",
"partition2": "config2",
},
expectedOutputConfig: map[string]string{
"partition1": "config1",
"partition2": "config2",
},
},
{
name: "TC4: PartitionConfig contains empty strings",
inputPartitionConfig: map[string]string{
"partition1": "",
"partition2": "",
},
expectedOutputConfig: map[string]string{
"partition1": "",
"partition2": "",
},
},
{
name: "TC5: PartitionConfig contains duplicate values",
inputPartitionConfig: map[string]string{
"partition1": "dup",
"partition2": "dup",
},
expectedOutputConfig: map[string]string{
"partition1": "dup",
"partition2": "dup",
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
execInfo := &WorkflowExecutionInfo{
PartitionConfig: tt.inputPartitionConfig,
}

copyConfig := execInfo.CopyPartitionConfig()

// Check if both are nil
if tt.expectedOutputConfig == nil {
if copyConfig != nil {
t.Errorf("Expected nil, got %v", copyConfig)
}
return
}

// Check if lengths match
if len(copyConfig) != len(tt.expectedOutputConfig) {
t.Errorf("Expected map length %d, got %d", len(tt.expectedOutputConfig), len(copyConfig))
}

// Check each key-value pair
for key, expectedVal := range tt.expectedOutputConfig {
copiedVal, exists := copyConfig[key]
if !exists {
t.Errorf("Expected key %s not found in copy", key)
continue
}

if copiedVal != expectedVal {
t.Errorf("For key %s, expected value %s, got %s", key, expectedVal, copiedVal)
}
}

// Since strings are immutable in Go, no need to check underlying references
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
StartTime: common.Int64Ptr(executionInfo.StartTimestamp.UnixNano()),
HistoryLength: mutableState.GetNextEventID() - common.FirstEventID,
AutoResetPoints: executionInfo.AutoResetPoints,
Memo: &types.Memo{Fields: executionInfo.Memo},
Memo: &types.Memo{Fields: executionInfo.CopyMemo()},
IsCron: len(executionInfo.CronSchedule) > 0,
UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
PartitionConfig: executionInfo.PartitionConfig,
SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.CopySearchAttributes()},
PartitionConfig: executionInfo.CopyPartitionConfig(),
},
}

Expand Down

0 comments on commit 02984fd

Please sign in to comment.