Skip to content

Commit

Permalink
Matching simulation improvements (cadence-workflow#6224)
Browse files Browse the repository at this point in the history
Fix sync matching by increasing the deadline for AddDecisionTask. Sync matching subtracts 1 second from the deadline to determine how long to spend sync matching, and with a deadline of 0.5 seconds it immediately times out.

Add support for parameterizing the configuration file to use. This will allow us to have several scenarios that we evaluate any matching changes against.

Rather than running for a fixed time, run until the expected number of tasks have been consumed. This allows for the benchmark to be used as a measure of throughput and also makes it run faster for short lived scenarios.
  • Loading branch information
natemort authored Aug 13, 2024
1 parent de281a6 commit e7bd499
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
31 changes: 22 additions & 9 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"context"
"flag"
"fmt"
"os"
"reflect"
"strings"
"sync"
Expand All @@ -63,6 +64,7 @@ type operation string

const (
operationPollForDecisionTask operation = "PollForDecisionTask"
defaultTestCase = "testdata/matching_simulation_default.yaml"
)

type operationStats struct {
Expand All @@ -82,7 +84,10 @@ type operationAggStats struct {
func TestMatchingSimulationSuite(t *testing.T) {
flag.Parse()

confPath := "testdata/matching_simulation.yaml"
confPath := os.Getenv("MATCHING_SIMULATION_CASE")
if confPath == "" {
confPath = defaultTestCase
}
clusterConfig, err := GetTestClusterConfig(confPath)
if err != nil {
t.Fatalf("failed creating cluster config from %s, err: %v", confPath, err)
Expand Down Expand Up @@ -176,31 +181,37 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
numPollers := getNumPollers(s.testClusterConfig.MatchingConfig.SimulationConfig.NumPollers)
pollDuration := getPollDuration(s.testClusterConfig.MatchingConfig.SimulationConfig.PollTimeout)
polledTasksCounter := int32(0)
maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
var tasksToReceive sync.WaitGroup
tasksToReceive.Add(maxTasksToGenerate)
var pollerWG sync.WaitGroup
for i := 0; i < numPollers; i++ {
pollerWG.Add(1)
go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh)
go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh, &tasksToReceive)
}

// wait a bit for pollers to start.
time.Sleep(300 * time.Millisecond)

startTime := time.Now()
// Start task generators
generatedTasksCounter := int32(0)
lastTaskScheduleID := int32(0)
numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators)
taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval)
maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
var generatorWG sync.WaitGroup
for i := 1; i <= numGenerators; i++ {
generatorWG.Add(1)
go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG)
}

// Let it run for a while
sleepDuration := 60 * time.Second
s.log("Wait %v for simulation to run", sleepDuration)
time.Sleep(sleepDuration)
// Let it run until all tasks have been polled.
// There's a test timeout configured in docker/buildkite/docker-compose-local-matching-simulation.yml that you
// can change if your test case needs more time
s.log("Waiting until all tasks are received")
tasksToReceive.Wait()
executionTime := time.Now().Sub(startTime)
s.log("Completed benchmark in %v", (time.Now().Sub(startTime)))
s.log("Canceling context to stop pollers and task generators")
cancel()
pollerWG.Wait()
Expand All @@ -216,7 +227,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
// Don't change the start/end line format as it is used by scripts to parse the summary info
testSummary := []string{}
testSummary = append(testSummary, "Simulation Summary:")
testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", sleepDuration))
testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", executionTime))
testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers))
testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration))
testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators))
Expand Down Expand Up @@ -277,7 +288,7 @@ func (s *MatchingSimulationSuite) generate(
return
}
decisionTask := newDecisionTask(domainID, tasklist, scheduleID)
reqCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
err := matchingClient.AddDecisionTask(reqCtx, decisionTask)
cancel()
if err != nil {
Expand All @@ -299,6 +310,7 @@ func (s *MatchingSimulationSuite) poll(
wg *sync.WaitGroup,
pollDuration time.Duration,
statsCh chan *operationStats,
tasksToReceive *sync.WaitGroup,
) {
defer wg.Done()
t := time.NewTicker(50 * time.Millisecond)
Expand Down Expand Up @@ -344,6 +356,7 @@ func (s *MatchingSimulationSuite) poll(

atomic.AddInt32(polledTasksCounter, 1)
s.log("PollForDecisionTask got a task with startedid: %d. resp: %+v", resp.StartedEventID, resp)
tasksToReceive.Done()
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions host/testdata/matching_simulation_burst.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 2
tasklistreadpartitions: 2
numpollers: 10
numtaskgenerators: 2
taskgeneratortickinterval: 10ms
maxtasktogenerate: 1500
polltimeout: 5s
forwardermaxoutstandingpolls: 20
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
workerconfig:
enableasyncwfconsumer: false
File renamed without changes.
3 changes: 2 additions & 1 deletion scripts/run_matching_simulator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ resultFolder="matching-simulator-output"
mkdir -p "$resultFolder"
eventLogsFile="$resultFolder/events.json"
testSummaryFile="$resultFolder/$testName-summary.txt"
testCase="testdata/matching_simulation_${1:-default}.yaml"


echo "Building test image"
Expand All @@ -19,7 +20,7 @@ docker-compose -f docker/buildkite/docker-compose-local-matching-simulation.yml
echo "Running the test"
docker-compose \
-f docker/buildkite/docker-compose-local-matching-simulation.yml \
run --rm matching-simulator \
run -e MATCHING_SIMULATION_CASE=$testCase --rm matching-simulator \
| grep -a --line-buffered "Matching New Event" \
| sed "s/Matching New Event: //" \
| jq . > "$eventLogsFile"
Expand Down

0 comments on commit e7bd499

Please sign in to comment.