Skip to content

Commit

Permalink
Added tests for visibility sampling wrapper (cadence-workflow#5564)
Browse files Browse the repository at this point in the history
* Simplify sampled visibility manager
  • Loading branch information
3vilhamster authored Dec 29, 2023
1 parent 85d6afc commit efef2e0
Show file tree
Hide file tree
Showing 6 changed files with 497 additions and 137 deletions.
18 changes: 13 additions & 5 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/persistence/wrappers/errorinjectors"
"github.com/uber/cadence/common/persistence/wrappers/ratelimited"
"github.com/uber/cadence/common/persistence/wrappers/sampled"
pnt "github.com/uber/cadence/common/pinot"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
Expand Down Expand Up @@ -399,11 +401,17 @@ func (f *factoryImpl) newDBVisibilityManager(
result = ratelimited.NewVisibilityManager(result, ds.ratelimit)
}
if visibilityConfig.EnableDBVisibilitySampling != nil && visibilityConfig.EnableDBVisibilitySampling() {
result = p.NewVisibilitySamplingClient(result, &p.SamplingConfig{
VisibilityClosedMaxQPS: visibilityConfig.WriteDBVisibilityClosedMaxQPS,
VisibilityListMaxQPS: visibilityConfig.DBVisibilityListMaxQPS,
VisibilityOpenMaxQPS: visibilityConfig.WriteDBVisibilityOpenMaxQPS,
}, f.metricsClient, f.logger)
result = sampled.NewVisibilityManager(result, sampled.Params{
Config: &sampled.Config{
VisibilityClosedMaxQPS: visibilityConfig.WriteDBVisibilityClosedMaxQPS,
VisibilityListMaxQPS: visibilityConfig.DBVisibilityListMaxQPS,
VisibilityOpenMaxQPS: visibilityConfig.WriteDBVisibilityOpenMaxQPS,
},
MetricClient: f.metricsClient,
Logger: f.logger,
TimeSource: clock.NewRealTimeSource(),
RateLimiterFactoryFunc: sampled.NewDomainToBucketMap,
})
}
if f.metricsClient != nil {
result = p.NewVisibilityPersistenceMetricsClient(result, f.metricsClient, f.logger, f.config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
mmocks "github.com/uber/cadence/common/metrics/mocks"
"github.com/uber/cadence/common/mocks"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/wrappers/sampled"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -66,13 +68,19 @@ func (s *VisibilitySamplingSuite) SetupTest() {
s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil

s.persistence = &mocks.VisibilityManager{}
config := &p.SamplingConfig{
config := &sampled.Config{
VisibilityOpenMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(1),
VisibilityClosedMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(10),
VisibilityListMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(1),
}
s.metricClient = &mmocks.Client{}
s.client = p.NewVisibilitySamplingClient(s.persistence, config, s.metricClient, testlogger.New(s.T()))
s.client = sampled.NewVisibilityManager(s.persistence, sampled.Params{
Config: config,
MetricClient: s.metricClient,
Logger: testlogger.New(s.T()),
TimeSource: clock.NewRealTimeSource(),
RateLimiterFactoryFunc: sampled.NewDomainToBucketMap,
})
}

func (s *VisibilitySamplingSuite) TearDownTest() {
Expand Down
75 changes: 75 additions & 0 deletions common/persistence/wrappers/sampled/tokenbucketfactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package sampled

import (
"sync"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/tokenbucket"
)

type RateLimiterFactoryFunc func(timeSource clock.TimeSource, numOfPriority int, qpsConfig dynamicconfig.IntPropertyFnWithDomainFilter) RateLimiterFactory

type RateLimiterFactory interface {
GetRateLimiter(domain string) tokenbucket.PriorityTokenBucket
}

type domainToBucketMap struct {
sync.RWMutex
timeSource clock.TimeSource
qpsConfig dynamicconfig.IntPropertyFnWithDomainFilter
numOfPriority int
mappings map[string]tokenbucket.PriorityTokenBucket
}

// NewDomainToBucketMap returns a rate limiter factory.
func NewDomainToBucketMap(timeSource clock.TimeSource, numOfPriority int, qpsConfig dynamicconfig.IntPropertyFnWithDomainFilter) RateLimiterFactory {
return &domainToBucketMap{
timeSource: timeSource,
qpsConfig: qpsConfig,
numOfPriority: numOfPriority,
mappings: make(map[string]tokenbucket.PriorityTokenBucket),
}
}

func (m *domainToBucketMap) GetRateLimiter(domain string) tokenbucket.PriorityTokenBucket {
m.RLock()
rateLimiter, exist := m.mappings[domain]
m.RUnlock()

if exist {
return rateLimiter
}

m.Lock()
if rateLimiter, ok := m.mappings[domain]; ok { // read again to ensure no duplicate create
m.Unlock()
return rateLimiter
}
rateLimiter = tokenbucket.NewFullPriorityTokenBucket(m.numOfPriority, m.qpsConfig(domain), m.timeSource)
m.mappings[domain] = rateLimiter
m.Unlock()
return rateLimiter
}
42 changes: 42 additions & 0 deletions common/persistence/wrappers/sampled/tokenbucketfactory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package sampled

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
)

func TestDomainToBucketMap(t *testing.T) {
mockedTime := clock.NewMockedTimeSource()
factory := NewDomainToBucketMap(mockedTime, 1, dynamicconfig.GetIntPropertyFilteredByDomain(1))

// Test that the factory returns the same bucket for the same domain
bucket1 := factory.GetRateLimiter("domain1")
bucket2 := factory.GetRateLimiter("domain1")
assert.Equal(t, bucket1, bucket2, "domain bucket should return the same bucket for the same domain")
}
Loading

0 comments on commit efef2e0

Please sign in to comment.