Skip to content

Commit

Permalink
Update matching simulation test to test weighted load balancer (caden…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Dec 2, 2024
1 parent 018fff9 commit 291be5d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 1 deletion.
54 changes: 53 additions & 1 deletion host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"errors"
"flag"
"fmt"
"math/rand"
"os"
"reflect"
"strings"
Expand All @@ -57,6 +58,7 @@ import (
"golang.org/x/time/rate"

"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -209,11 +211,31 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
go s.collectStats(statsCh, aggStats, &collectorWG)

totalTaskCount := getTotalTasks(s.testClusterConfig.MatchingConfig.SimulationConfig.Tasks)
seed := time.Now().UnixNano()
rand.Seed(seed)
totalBacklogCount := 0
for idx, backlogConfig := range s.testClusterConfig.MatchingConfig.SimulationConfig.Backlogs {
totalBacklogCount += backlogConfig.BacklogCount
partition := getPartitionTaskListName(tasklist, backlogConfig.Partition)
for i := 0; i < backlogConfig.BacklogCount; i++ {
isolationGroup := ""
if len(backlogConfig.IsolationGroups) > 0 {
isolationGroup = randomlyPickKey(backlogConfig.IsolationGroups)
}
decisionTask := newDecisionTask(domainID, partition, isolationGroup, idx)
reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
_, err := matchingClients[0].AddDecisionTask(reqCtx, decisionTask)
cancel()
if err != nil {
s.log("Error when adding decision task, err: %v", err)
}
}
}

// Start pollers
numPollers := 0
var tasksToReceive sync.WaitGroup
tasksToReceive.Add(totalTaskCount)
tasksToReceive.Add(totalTaskCount + totalBacklogCount)
var pollerWG sync.WaitGroup
for idx, pollerConfig := range s.testClusterConfig.MatchingConfig.SimulationConfig.Pollers {
for i := 0; i < pollerConfig.getNumPollers(); i++ {
Expand Down Expand Up @@ -266,6 +288,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
// Don't change the start/end line format as it is used by scripts to parse the summary info
testSummary := []string{}
testSummary = append(testSummary, "Simulation Summary:")
testSummary = append(testSummary, fmt.Sprintf("Random seed: %v", seed))
testSummary = append(testSummary, fmt.Sprintf("Task generate Duration: %v", aggStats[operationAddDecisionTask].lastUpdated.Sub(startTime)))
testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", executionTime))
testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers))
Expand Down Expand Up @@ -615,3 +638,32 @@ func getTasklistLoadBalancerStrategy(strategy string) string {
}
return strategy
}

func getPartitionTaskListName(root string, partition int) string {
if partition <= 0 {
return root
}
return fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, root, partition)
}

func randomlyPickKey(weights map[string]int) string {
// Calculate the total weight
totalWeight := 0
for _, weight := range weights {
totalWeight += weight
}

// Generate a random number between 0 and totalWeight - 1
randomWeight := rand.Intn(totalWeight)

// Iterate through the map to find the key corresponding to the random weight
for key, weight := range weights {
if randomWeight < weight {
return key
}
randomWeight -= weight
}

// Return an empty string as a fallback (should not happen if weights are positive)
return ""
}
11 changes: 11 additions & 0 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ type (

Tasks []SimulationTaskConfiguration

Backlogs []SimulationBacklogConfiguration

GetPartitionConfigFromDB bool
}

Expand Down Expand Up @@ -225,6 +227,15 @@ type (
MaxTaskToGenerate int
}

SimulationBacklogConfiguration struct {
// The partition number
Partition int // Do not set it to 0, because it's not guaranteed to add backlog to partition 0
// The backlog count
BacklogCount int
// The weight of each isolation group, can be empty
IsolationGroups map[string]int
}

// CadenceParams contains everything needed to bootstrap Cadence
CadenceParams struct {
ClusterMetadata cluster.Metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 4
tasklistreadpartitions: 4
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 10ms
localtaskwaittime: 10ms
tasklistloadbalancerstrategy: weighted
tasks:
- numtaskgenerators: 2
taskspersecond: 80
maxtasktogenerate: 3000
pollers:
- taskprocesstime: 1ms
numpollers: 8
polltimeout: 60s
backlogs:
- partition: 1
backlogcount: 1000
- partition: 2
backlogcount: 2000
workerconfig:
enableasyncwfconsumer: false
33 changes: 33 additions & 0 deletions host/testdata/matching_simulation_with_backlog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 4
tasklistreadpartitions: 4
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 10ms
localtaskwaittime: 10ms
tasks:
- numtaskgenerators: 2
taskspersecond: 80
maxtasktogenerate: 3000
pollers:
- taskprocesstime: 1ms
numpollers: 8
polltimeout: 60s
backlogs:
- partition: 1
backlogcount: 1000
- partition: 2
backlogcount: 2000
workerconfig:
enableasyncwfconsumer: false

0 comments on commit 291be5d

Please sign in to comment.