Skip to content

Commit

Permalink
Support adaptive tasklist partitioner in matching simulations (cadenc…
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Dec 10, 2024
1 parent 536caf7 commit 0e96b3e
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 42 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ test_eventsV2.log
test_eventsV2_xdc
test_eventsV2_xdc.log
matching-simulator-output/
simulation_comparison.csv

# Executables produced by cadence repo
/cadence
Expand Down
24 changes: 18 additions & 6 deletions common/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/service/matching/event"
)

type (
Expand All @@ -43,13 +44,13 @@ type (
done chan struct{}
status *atomic.Int32
firstBucket bool

qps *atomic.Float64
counter *atomic.Int64
baseEvent event.E
qps *atomic.Float64
counter *atomic.Int64
}
)

func NewEmaFixedWindowQPSTracker(timeSource clock.TimeSource, exp float64, bucketInterval time.Duration) QPSTracker {
func NewEmaFixedWindowQPSTracker(timeSource clock.TimeSource, exp float64, bucketInterval time.Duration, baseEvent event.E) QPSTracker {
return &emaFixedWindowQPSTracker{
timeSource: timeSource,
exp: exp,
Expand All @@ -60,6 +61,7 @@ func NewEmaFixedWindowQPSTracker(timeSource clock.TimeSource, exp float64, bucke
firstBucket: true,
counter: atomic.NewInt64(0),
qps: atomic.NewFloat64(0),
baseEvent: baseEvent,
}
}

Expand Down Expand Up @@ -89,13 +91,23 @@ func (r *emaFixedWindowQPSTracker) reportLoop() {
func (r *emaFixedWindowQPSTracker) report() {
if r.firstBucket {
counter := r.counter.Swap(0)
r.qps.Store(float64(counter) / r.bucketIntervalSeconds)
r.store(float64(counter) / r.bucketIntervalSeconds)
r.firstBucket = false
return
}
counter := r.counter.Swap(0)
qps := r.qps.Load()
r.qps.Store(qps*(1-r.exp) + float64(counter)*r.exp/r.bucketIntervalSeconds)
r.store(qps*(1-r.exp) + float64(counter)*r.exp/r.bucketIntervalSeconds)
}

// store saves the given QPS value on the tracker and then logs a
// "QPSTrackerUpdate" event carrying that value under the "QPS" payload key.
func (r *emaFixedWindowQPSTracker) store(qps float64) {
	r.qps.Store(qps)

	// Work on a copy of the base event so the tracker's own baseEvent
	// field is not reassigned by the per-update name and payload.
	update := r.baseEvent
	update.EventName = "QPSTrackerUpdate"
	update.Payload = map[string]any{"QPS": qps}
	event.Log(update)
}

func (r *emaFixedWindowQPSTracker) Stop() {
Expand Down
7 changes: 4 additions & 3 deletions common/stats/stats_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ import (
"time"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/service/matching/event"
)

// Benchmark the ReportCounter function to see how it handles frequent updates
func BenchmarkReportCounter(b *testing.B) {
timeSource := clock.NewRealTimeSource()
// Initialize the QPS reporter with a smoothing factor and a 1 second bucket interval
reporter := NewEmaFixedWindowQPSTracker(timeSource, 0.5, time.Second)
reporter := NewEmaFixedWindowQPSTracker(timeSource, 0.5, time.Second, event.E{})
reporter.Start()

// Run the benchmark for b.N iterations
Expand All @@ -52,7 +53,7 @@ func BenchmarkReportCounter(b *testing.B) {
func BenchmarkQPS(b *testing.B) {
timeSource := clock.NewRealTimeSource()
// Initialize the QPS reporter
reporter := NewEmaFixedWindowQPSTracker(timeSource, 0.5, time.Second)
reporter := NewEmaFixedWindowQPSTracker(timeSource, 0.5, time.Second, event.E{})
reporter.Start()

// Simulate a number of report updates before calling QPS
Expand All @@ -75,7 +76,7 @@ func BenchmarkQPS(b *testing.B) {
func BenchmarkFullReport(b *testing.B) {
timeSource := clock.NewRealTimeSource()
// Initialize the QPS reporter
reporter := NewEmaFixedWindowQPSTracker(timeSource, 0.5, time.Millisecond*100) // 100ms bucket interval
reporter := NewEmaFixedWindowQPSTracker(timeSource, 0.5, time.Millisecond*100, event.E{}) // 100ms bucket interval
reporter.Start()

var wg sync.WaitGroup
Expand Down
3 changes: 2 additions & 1 deletion common/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/service/matching/event"
)

const floatResolution = 1e-6
Expand All @@ -37,7 +38,7 @@ func TestEmaFixedWindowQPSTracker(t *testing.T) {
exp := 0.4
bucketInterval := time.Second

r := NewEmaFixedWindowQPSTracker(timeSource, exp, bucketInterval)
r := NewEmaFixedWindowQPSTracker(timeSource, exp, bucketInterval, event.E{})
r.Start()
defer r.Stop()

Expand Down
11 changes: 9 additions & 2 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ To run locally:
Full test logs can be found at test.log file. Event json logs can be found at matching-simulator-output folder.
See the run_matching_simulator.sh script for more details about how to parse events.
If you want to run all the scenarios and compare them refer to tools/matchingsimulationcomparison/README.md
If you want to run multiple scenarios and compare them, refer to tools/matchingsimulationcomparison/README.md
*/
package host

Expand Down Expand Up @@ -119,6 +119,12 @@ func TestMatchingSimulationSuite(t *testing.T) {
dynamicconfig.AllIsolationGroups: isolationGroups,
dynamicconfig.TasklistLoadBalancerStrategy: getTasklistLoadBalancerStrategy(clusterConfig.MatchingConfig.SimulationConfig.TasklistLoadBalancerStrategy),
dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache: clusterConfig.MatchingConfig.SimulationConfig.GetPartitionConfigFromDB,
dynamicconfig.MatchingEnableAdaptiveScaler: clusterConfig.MatchingConfig.SimulationConfig.EnableAdaptiveScaler,
dynamicconfig.MatchingPartitionDownscaleFactor: clusterConfig.MatchingConfig.SimulationConfig.PartitionDownscaleFactor,
dynamicconfig.MatchingPartitionUpscaleRPS: clusterConfig.MatchingConfig.SimulationConfig.PartitionUpscaleRPS,
dynamicconfig.MatchingPartitionUpscaleSustainedDuration: clusterConfig.MatchingConfig.SimulationConfig.PartitionUpscaleSustainedDuration,
dynamicconfig.MatchingPartitionDownscaleSustainedDuration: clusterConfig.MatchingConfig.SimulationConfig.PartitionDownscaleSustainedDuration,
dynamicconfig.MatchingAdaptiveScalerUpdateInterval: clusterConfig.MatchingConfig.SimulationConfig.AdaptiveScalerUpdateInterval,
}

ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -190,7 +196,8 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
domainID := s.domainID(ctx)
tasklist := "my-tasklist"

if s.testClusterConfig.MatchingConfig.SimulationConfig.GetPartitionConfigFromDB {
if s.testClusterConfig.MatchingConfig.SimulationConfig.GetPartitionConfigFromDB &&
!s.testClusterConfig.MatchingConfig.SimulationConfig.EnableAdaptiveScaler {
_, err := s.testCluster.GetMatchingClient().UpdateTaskListPartitionConfig(ctx, &types.MatchingUpdateTaskListPartitionConfigRequest{
DomainUUID: domainID,
TaskList: &types.TaskList{Name: tasklist, Kind: types.TaskListKindNormal.Ptr()},
Expand Down
10 changes: 10 additions & 0 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,17 @@ type (

Backlogs []SimulationBacklogConfiguration

// GetPartitionConfigFromDB indicates whether to get the partition config from the DB.
// This is a prerequisite for the adaptive scaler.
GetPartitionConfigFromDB bool

// Adaptive scaler configurations
EnableAdaptiveScaler bool
PartitionDownscaleFactor float64
PartitionUpscaleRPS float64
PartitionUpscaleSustainedDuration time.Duration
PartitionDownscaleSustainedDuration time.Duration
AdaptiveScalerUpdateInterval time.Duration
}

SimulationPollerConfiguration struct {
Expand Down
35 changes: 35 additions & 0 deletions host/testdata/matching_simulation_burst_adaptive.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
enablearchival: false
clusterno: 1
messagingclientconfig:
usemock: true
historyconfig:
numhistoryshards: 4
numhistoryhosts: 1
matchingconfig:
nummatchinghosts: 4
simulationconfig:
tasklistwritepartitions: 0 # value is irrelevant here; the adaptive scaler starts from 1 partition
tasklistreadpartitions: 0 # value is irrelevant here; the adaptive scaler starts from 1 partition
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 10ms
localtaskwaittime: 10ms
tasks:
- numtaskgenerators: 10
taskspersecond: 500
maxtasktogenerate: 10000
pollers:
- taskprocesstime: 1ms
numpollers: 8
polltimeout: 60s
getpartitionconfigfromdb: true
enableadaptivescaler: true
partitiondownscalefactor: 0.75
partitionupscalerps: 200
partitionupscalesustainedduration: 5s
partitiondownscalesustainedduration: 5s
adaptivescalerupdateinterval: 1s
workerconfig:
enableasyncwfconsumer: false
2 changes: 1 addition & 1 deletion scripts/run_matching_simulator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ timestamp="${2:-$now}"
testName="test-$testCase-$timestamp"
resultFolder="matching-simulator-output"
mkdir -p "$resultFolder"
eventLogsFile="$resultFolder/events.json"
eventLogsFile="$resultFolder/$testName-events.json"
testSummaryFile="$resultFolder/$testName-summary.txt"

echo "Building test image"
Expand Down
15 changes: 14 additions & 1 deletion service/matching/tasklist/adaptive_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/uber/cadence/common/stats"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
"github.com/uber/cadence/service/matching/event"
)

type (
Expand All @@ -65,6 +66,7 @@ type (
overLoadStartTime time.Time
underLoad bool
underLoadStartTime time.Time
baseEvent event.E
}
)

Expand All @@ -77,6 +79,7 @@ func NewAdaptiveScaler(
logger log.Logger,
scope metrics.Scope,
matchingClient matching.Client,
baseEvent event.E,
) AdaptiveScaler {
ctx, cancel := context.WithCancel(context.Background())
return &adaptiveScalerImpl{
Expand All @@ -93,6 +96,7 @@ func NewAdaptiveScaler(
cancel: cancel,
overLoad: false,
underLoad: false,
baseEvent: baseEvent,
}
}

Expand Down Expand Up @@ -140,10 +144,19 @@ func (a *adaptiveScalerImpl) run() {
// adjust the number of read partitions
numReadPartitions := a.adjustReadPartitions(partitionConfig.NumReadPartitions, numWritePartitions)

e := a.baseEvent
e.EventName = "AdaptiveScalerCalculationResult"
e.Payload = map[string]any{
"NumReadPartitions": numReadPartitions,
"NumWritePartitions": numWritePartitions,
"QPS": qps,
}
event.Log(e)

if numReadPartitions == partitionConfig.NumReadPartitions && numWritePartitions == partitionConfig.NumWritePartitions {
return
}
a.logger.Info("update the number of partitions", tag.CurrentQPS(qps), tag.NumReadPartitions(numReadPartitions), tag.NumWritePartitions(numWritePartitions), tag.Dynamic("task-list-partition-config", partitionConfig))
a.logger.Info("adaptive scaler is updating number of partitions", tag.CurrentQPS(qps), tag.NumReadPartitions(numReadPartitions), tag.NumWritePartitions(numWritePartitions), tag.Dynamic("task-list-partition-config", partitionConfig))
a.scope.IncCounter(metrics.CadenceRequests)
err := a.tlMgr.UpdateTaskListPartitionConfig(a.ctx, &types.TaskListPartitionConfig{
NumReadPartitions: numReadPartitions,
Expand Down
3 changes: 2 additions & 1 deletion service/matching/tasklist/adaptive_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/uber/cadence/common/stats"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
"github.com/uber/cadence/service/matching/event"
)

type mockAdaptiveScalerDeps struct {
Expand Down Expand Up @@ -73,7 +74,7 @@ func setupMocksForAdaptiveScaler(t *testing.T, taskListID *Identifier) (*adaptiv
}

cfg := newTaskListConfig(taskListID, config.NewConfig(dynamicconfig.NewCollection(dynamicClient, logger), "test-host", func() []string { return nil }), "test-domain")
scaler := NewAdaptiveScaler(taskListID, mockManager, mockQPSTracker, cfg, mockTimeSource, logger, scope, mockMatchingClient).(*adaptiveScalerImpl)
scaler := NewAdaptiveScaler(taskListID, mockManager, mockQPSTracker, cfg, mockTimeSource, logger, scope, mockMatchingClient, event.E{}).(*adaptiveScalerImpl)
return scaler, deps
}

Expand Down
12 changes: 10 additions & 2 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,17 @@ func NewManager(
tlMgr.logger.Info("Task list manager stopping because no recent events", tag.Dynamic("interval", livenessInterval))
tlMgr.Stop()
})
tlMgr.qpsTracker = stats.NewEmaFixedWindowQPSTracker(timeSource, 0.5, 10*time.Second)

baseEvent := event.E{
TaskListName: taskList.GetName(),
TaskListKind: taskListKind,
TaskListType: taskList.GetType(),
}
tlMgr.qpsTracker = stats.NewEmaFixedWindowQPSTracker(timeSource, 0.5, 10*time.Second, baseEvent)
if taskList.IsRoot() && *taskListKind == types.TaskListKindNormal {
adaptiveScalerScope := common.NewPerTaskListScope(domainName, taskList.GetName(), *taskListKind, metricsClient, metrics.MatchingAdaptiveScalerScope).
Tagged(getTaskListTypeTag(taskList.GetType()))
tlMgr.adaptiveScaler = NewAdaptiveScaler(taskList, tlMgr, tlMgr.qpsTracker, taskListConfig, timeSource, tlMgr.logger, adaptiveScalerScope, matchingClient)
tlMgr.adaptiveScaler = NewAdaptiveScaler(taskList, tlMgr, tlMgr.qpsTracker, taskListConfig, timeSource, tlMgr.logger, adaptiveScalerScope, matchingClient, baseEvent)
}
var isolationGroups []string
if tlMgr.isIsolationMatcherEnabled() {
Expand Down Expand Up @@ -388,6 +394,8 @@ func (c *taskListManagerImpl) RefreshTaskListPartitionConfig(ctx context.Context
return nil
}

// UpdateTaskListPartitionConfig updates the task list partition config. It is called by the adaptive scaler component on the root partition.
// The root task list manager updates the partition config in the database and notifies all non-root partitions.
func (c *taskListManagerImpl) UpdateTaskListPartitionConfig(ctx context.Context, config *types.TaskListPartitionConfig) error {
c.startWG.Wait()
numberOfPartitionsToRefresh, currentConfig, err := c.updatePartitionConfig(ctx, config)
Expand Down
14 changes: 10 additions & 4 deletions tools/matchingsimulationcomparison/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ Note: The parsing logic might break in the future if the `run_matching_simulator

Run all the scenarios and compare:
```
go run tools/matchingsimulationcomparison/*.go --out simulation_comparison.csv
go run tools/matchingsimulationcomparison/*.go
```

If you have already run some scenarios before and made changes in the output/comparison then run in Compare mode
Run a subset of scenarios and compare:
```
go run tools/matchingsimulationcomparison/*.go --out simulation_comparison.csv \
--ts 2024-09-12-18-16-44 \
go run tools/matchingsimulationcomparison/*.go \
--scenarios "burst"
```

If you have already run some scenarios and have since made changes to the CSV output, run in Compare mode:
```
go run tools/matchingsimulationcomparison/*.go \
--ts 2024-11-27-21-29-55 \
--mode Compare
```
Loading

0 comments on commit 0e96b3e

Please sign in to comment.