Skip to content

Commit

Permalink
Update matching simulation test to support getting partition config f…
Browse files Browse the repository at this point in the history
…rom database (cadence-workflow#6500)
  • Loading branch information
Shaddoll authored Nov 16, 2024
1 parent 6f6249b commit 0105e2d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
39 changes: 27 additions & 12 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,18 @@ func TestMatchingSimulationSuite(t *testing.T) {
isolationGroups := getIsolationGroups(&clusterConfig.MatchingConfig.SimulationConfig)

clusterConfig.MatchingDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
dynamicconfig.MatchingNumTasklistWritePartitions: getPartitions(clusterConfig.MatchingConfig.SimulationConfig.TaskListWritePartitions),
dynamicconfig.MatchingNumTasklistReadPartitions: getPartitions(clusterConfig.MatchingConfig.SimulationConfig.TaskListReadPartitions),
dynamicconfig.MatchingForwarderMaxOutstandingPolls: getForwarderMaxOutstandingPolls(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingPolls),
dynamicconfig.MatchingForwarderMaxOutstandingTasks: getForwarderMaxOutstandingTasks(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingTasks),
dynamicconfig.MatchingForwarderMaxRatePerSecond: getForwarderMaxRPS(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxRatePerSecond),
dynamicconfig.MatchingForwarderMaxChildrenPerNode: getForwarderMaxChildPerNode(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxChildrenPerNode),
dynamicconfig.LocalPollWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalPollWaitTime,
dynamicconfig.LocalTaskWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalTaskWaitTime,
dynamicconfig.EnableTasklistIsolation: len(isolationGroups) > 0,
dynamicconfig.AllIsolationGroups: isolationGroups,
dynamicconfig.TasklistLoadBalancerStrategy: getTasklistLoadBalancerStrategy(clusterConfig.MatchingConfig.SimulationConfig.TasklistLoadBalancerStrategy),
dynamicconfig.MatchingNumTasklistWritePartitions: getPartitions(clusterConfig.MatchingConfig.SimulationConfig.TaskListWritePartitions),
dynamicconfig.MatchingNumTasklistReadPartitions: getPartitions(clusterConfig.MatchingConfig.SimulationConfig.TaskListReadPartitions),
dynamicconfig.MatchingForwarderMaxOutstandingPolls: getForwarderMaxOutstandingPolls(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingPolls),
dynamicconfig.MatchingForwarderMaxOutstandingTasks: getForwarderMaxOutstandingTasks(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingTasks),
dynamicconfig.MatchingForwarderMaxRatePerSecond: getForwarderMaxRPS(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxRatePerSecond),
dynamicconfig.MatchingForwarderMaxChildrenPerNode: getForwarderMaxChildPerNode(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxChildrenPerNode),
dynamicconfig.LocalPollWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalPollWaitTime,
dynamicconfig.LocalTaskWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalTaskWaitTime,
dynamicconfig.EnableTasklistIsolation: len(isolationGroups) > 0,
dynamicconfig.AllIsolationGroups: isolationGroups,
dynamicconfig.TasklistLoadBalancerStrategy: getTasklistLoadBalancerStrategy(clusterConfig.MatchingConfig.SimulationConfig.TasklistLoadBalancerStrategy),
dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache: clusterConfig.MatchingConfig.SimulationConfig.GetPartitionConfigFromDB,
}

ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -187,6 +188,19 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
domainID := s.domainID(ctx)
tasklist := "my-tasklist"

if s.testClusterConfig.MatchingConfig.SimulationConfig.GetPartitionConfigFromDB {
_, err := s.testCluster.GetMatchingClient().UpdateTaskListPartitionConfig(ctx, &types.MatchingUpdateTaskListPartitionConfigRequest{
DomainUUID: domainID,
TaskList: &types.TaskList{Name: tasklist, Kind: types.TaskListKindNormal.Ptr()},
TaskListType: types.TaskListTypeDecision.Ptr(),
PartitionConfig: &types.TaskListPartitionConfig{
NumReadPartitions: int32(getPartitions(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskListReadPartitions)),
NumWritePartitions: int32(getPartitions(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskListWritePartitions)),
},
})
s.NoError(err)
}

// Start stat collector
statsCh := make(chan *operationStats, 200000)
aggStats := make(map[operation]*operationAggStats)
Expand Down Expand Up @@ -259,6 +273,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
testSummary = append(testSummary, fmt.Sprintf("Record Decision Task Started Time: %v", s.testClusterConfig.MatchingConfig.SimulationConfig.RecordDecisionTaskStartedTime))
testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions]))
testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions]))
testSummary = append(testSummary, fmt.Sprintf("Get Num of Partitions from DB: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache]))
testSummary = append(testSummary, fmt.Sprintf("Tasklist load balancer strategy: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.TasklistLoadBalancerStrategy]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Tasks: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingTasks]))
Expand Down Expand Up @@ -318,7 +333,7 @@ func (s *MatchingSimulationSuite) generate(
start := time.Now()
decisionTask := newDecisionTask(domainID, tasklist, isolationGroup, scheduleID)
reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
err := matchingClient.AddDecisionTask(reqCtx, decisionTask)
_, err := matchingClient.AddDecisionTask(reqCtx, decisionTask)
statsCh <- &operationStats{
op: operationAddDecisionTask,
dur: time.Since(start),
Expand Down
2 changes: 2 additions & 0 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ type (
Pollers []SimulationPollerConfiguration

Tasks []SimulationTaskConfiguration

GetPartitionConfigFromDB bool
}

SimulationPollerConfiguration struct {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 4
tasklistreadpartitions: 4
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 10ms
localtaskwaittime: 10ms
getpartitionconfigfromdb: true
tasks:
- numtaskgenerators: 2
taskspersecond: 80
maxtasktogenerate: 3000
pollers:
- taskprocesstime: 1ms
numpollers: 8
polltimeout: 60s
workerconfig:
enableasyncwfconsumer: false

0 comments on commit 0105e2d

Please sign in to comment.