Skip to content

Commit

Permalink
Minimize failover impact (cadence-workflow#948)
Browse files Browse the repository at this point in the history
* Add metrics to domain cache
* Use separate max tps for failover
* Add global rate limiter to persistence layer
* Remove noisy log
* Add timer / transfer failover start delay
* Lazy initialize logger
  • Loading branch information
wxing1292 authored Jul 12, 2018
1 parent 1d64401 commit 8156cde
Show file tree
Hide file tree
Showing 41 changed files with 882 additions and 188 deletions.
16 changes: 13 additions & 3 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"

"github.com/uber-common/bark"
Expand All @@ -40,7 +41,7 @@ const (
domainCacheInitialSize = 10 * 1024
domainCacheMaxSize = 64 * 1024
domainCacheTTL = 0 // 0 means infinity
domainCacheEntryTTL = 20 * time.Second
domainCacheEntryTTL = 300 * time.Second
// DomainCacheRefreshInterval domain cache refresh interval
DomainCacheRefreshInterval = 10 * time.Second
domainCacheRefreshPageSize = 100
Expand Down Expand Up @@ -85,6 +86,7 @@ type (
metadataMgr persistence.MetadataManager
clusterMetadata cluster.Metadata
timeSource common.TimeSource
metricsClient metrics.Client
logger bark.Logger

sync.RWMutex
Expand Down Expand Up @@ -113,7 +115,7 @@ type (
)

// NewDomainCache creates a new instance of cache for holding onto domain information to reduce the load on persistence
func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata cluster.Metadata, logger bark.Logger) DomainCache {
func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata cluster.Metadata, metricsClient metrics.Client, logger bark.Logger) DomainCache {
opts := &Options{}
opts.InitialCapacity = domainCacheInitialSize
opts.TTL = domainCacheTTL
Expand All @@ -126,6 +128,7 @@ func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata clu
metadataMgr: metadataMgr,
clusterMetadata: clusterMetadata,
timeSource: common.NewRealTimeSource(),
metricsClient: metricsClient,
logger: logger,
beforeCallbacks: make(map[int]CallbackFn),
afterCallbacks: make(map[int]CallbackFn),
Expand Down Expand Up @@ -305,6 +308,7 @@ func (c *domainCache) refreshDomains() error {
domainNotificationVersion = c.domainNotificationVersion
c.RUnlock()

sw := c.metricsClient.StartTimer(metrics.DomainCacheScope, metrics.DomainCacheTotalCallbacksLatency)
UpdateLoop:
for _, domain := range domains {
if domain.notificationVersion >= domainNotificationVersion {
Expand All @@ -318,7 +322,7 @@ UpdateLoop:
c.updateIDToDomainCache(domain.info.ID, domain)
c.updateNameToIDCache(domain.info.Name, domain.info.ID)
}

sw.Stop()
return nil
}

Expand Down Expand Up @@ -459,6 +463,9 @@ func (c *domainCache) getDomainByID(id string) (*DomainCacheEntry, error) {
}

func (c *domainCache) triggerDomainBeforeChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
sw := c.metricsClient.StartTimer(metrics.DomainCacheScope, metrics.DomainCacheBeforeCallbackLatency)
defer sw.Stop()

c.RLock()
defer c.RUnlock()
for _, callback := range c.beforeCallbacks {
Expand All @@ -467,6 +474,9 @@ func (c *domainCache) triggerDomainBeforeChangeCallback(prevDomain *DomainCacheE
}

func (c *domainCache) triggerDomainAfterChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
sw := c.metricsClient.StartTimer(metrics.DomainCacheScope, metrics.DomainCacheAfterCallbackLatency)
defer sw.Stop()

c.RLock()
defer c.RUnlock()
for _, callback := range c.afterCallbacks {
Expand Down
5 changes: 4 additions & 1 deletion common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
"testing"

"github.com/pborman/uuid"
"github.com/uber-go/tally"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
)
Expand Down Expand Up @@ -68,7 +70,8 @@ func (s *domainCacheSuite) SetupTest() {
s.logger = bark.NewLoggerFromLogrus(log2)
s.clusterMetadata = &mocks.ClusterMetadata{}
s.metadataMgr = &mocks.MetadataManager{}
s.domainCache = NewDomainCache(s.metadataMgr, s.clusterMetadata, s.logger).(*domainCache)
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.domainCache = NewDomainCache(s.metadataMgr, s.clusterMetadata, metricsClient, s.logger).(*domainCache)
}

func (s *domainCacheSuite) TearDownTest() {
Expand Down
10 changes: 10 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ const (
MatchingClientCancelOutstandingPollScope
// MatchingClientDescribeTaskListScope tracks RPC calls to matching service
MatchingClientDescribeTaskListScope
// DomainCacheScope tracks domain cache callbacks
DomainCacheScope

NumCommonScopes
)
Expand Down Expand Up @@ -543,6 +545,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
MatchingClientRespondQueryTaskCompletedScope: {operation: "MatchingClientRespondQueryTaskCompleted"},
MatchingClientCancelOutstandingPollScope: {operation: "MatchingClientCancelOutstandingPoll"},
MatchingClientDescribeTaskListScope: {operation: "MatchingClientDescribeTaskList"},
DomainCacheScope: {operation: "DomainCache"},
},
// Frontend Scope Names
Frontend: {
Expand Down Expand Up @@ -683,6 +686,10 @@ const (
HistoryClientFailures
MatchingClientFailures

DomainCacheTotalCallbacksLatency
DomainCacheBeforeCallbackLatency
DomainCacheAfterCallbackLatency

NumCommonMetrics // Needs to be last on this list for iota numbering
)

Expand Down Expand Up @@ -789,6 +796,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
PersistenceErrBusyCounter: {metricName: "persistence.errors.busy", metricType: Counter},
HistoryClientFailures: {metricName: "client.history.errors", metricType: Counter},
MatchingClientFailures: {metricName: "client.matching.errors", metricType: Counter},
DomainCacheTotalCallbacksLatency: {metricName: "domain-cache.total-callbacks.latency", metricType: Timer},
DomainCacheBeforeCallbackLatency: {metricName: "domain-cache.before-callbacks.latency", metricType: Timer},
DomainCacheAfterCallbackLatency: {metricName: "domain-cache.after-callbacks.latency", metricType: Timer},
},
Frontend: {},
History: {
Expand Down
11 changes: 8 additions & 3 deletions common/persistence/cassandraPersistenceClientFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ type (
cassandraPersistenceClientFactory struct {
session *gocql.Session
metricsClient metrics.Client
rateLimiter common.TokenBucket
logger bark.Logger
}
)

// NewCassandraPersistenceClientFactory is used to create an instance of ExecutionManagerFactory implementation
func NewCassandraPersistenceClientFactory(hosts string, port int, user, password, dc string, keyspace string,
numConns int, logger bark.Logger, metricsClient metrics.Client) (ExecutionManagerFactory, error) {
numConns int, logger bark.Logger, rateLimiter common.TokenBucket, metricsClient metrics.Client) (ExecutionManagerFactory, error) {
cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
cluster.Keyspace = keyspace
cluster.ProtoVersion = cassandraProtoVersion
Expand All @@ -51,7 +52,7 @@ func NewCassandraPersistenceClientFactory(hosts string, port int, user, password
return nil, err
}

return &cassandraPersistenceClientFactory{session: session, logger: logger, metricsClient: metricsClient}, nil
return &cassandraPersistenceClientFactory{session: session, logger: logger, metricsClient: metricsClient, rateLimiter: rateLimiter}, nil
}

// CreateExecutionManager implements ExecutionManagerFactory interface
Expand All @@ -62,11 +63,15 @@ func (f *cassandraPersistenceClientFactory) CreateExecutionManager(shardID int)
return nil, err
}

if f.rateLimiter != nil {
mgr = NewWorkflowExecutionPersistenceRateLimitedClient(mgr, f.rateLimiter, f.logger)
}

if f.metricsClient == nil {
return mgr, nil
}

return NewWorkflowExecutionPersistenceClient(mgr, f.metricsClient, f.logger), nil
return NewWorkflowExecutionPersistenceMetricsClient(mgr, f.metricsClient, f.logger), nil
}

// Close releases the underlying resources held by this object
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,53 +72,53 @@ var _ HistoryManager = (*historyPersistenceClient)(nil)
var _ MetadataManager = (*metadataPersistenceClient)(nil)
var _ VisibilityManager = (*visibilityPersistenceClient)(nil)

// NewShardPersistenceClient creates a client to manage shards
func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client, logger bark.Logger) ShardManager {
// NewShardPersistenceMetricsClient creates a client to manage shards
func NewShardPersistenceMetricsClient(persistence ShardManager, metricClient metrics.Client, logger bark.Logger) ShardManager {
return &shardPersistenceClient{
persistence: persistence,
metricClient: metricClient,
logger: logger,
}
}

// NewWorkflowExecutionPersistenceClient creates a client to manage executions
func NewWorkflowExecutionPersistenceClient(persistence ExecutionManager, metricClient metrics.Client, logger bark.Logger) ExecutionManager {
// NewWorkflowExecutionPersistenceMetricsClient creates a client to manage executions
func NewWorkflowExecutionPersistenceMetricsClient(persistence ExecutionManager, metricClient metrics.Client, logger bark.Logger) ExecutionManager {
return &workflowExecutionPersistenceClient{
persistence: persistence,
metricClient: metricClient,
logger: logger,
}
}

// NewTaskPersistenceClient creates a client to manage tasks
func NewTaskPersistenceClient(persistence TaskManager, metricClient metrics.Client, logger bark.Logger) TaskManager {
// NewTaskPersistenceMetricsClient creates a client to manage tasks
func NewTaskPersistenceMetricsClient(persistence TaskManager, metricClient metrics.Client, logger bark.Logger) TaskManager {
return &taskPersistenceClient{
persistence: persistence,
metricClient: metricClient,
logger: logger,
}
}

// NewHistoryPersistenceClient creates a HistoryManager client to manage workflow execution history
func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metrics.Client, logger bark.Logger) HistoryManager {
// NewHistoryPersistenceMetricsClient creates a HistoryManager client to manage workflow execution history
func NewHistoryPersistenceMetricsClient(persistence HistoryManager, metricClient metrics.Client, logger bark.Logger) HistoryManager {
return &historyPersistenceClient{
persistence: persistence,
metricClient: metricClient,
logger: logger,
}
}

// NewMetadataPersistenceClient creates a MetadataManager client to manage metadata
func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client, logger bark.Logger) MetadataManager {
// NewMetadataPersistenceMetricsClient creates a MetadataManager client to manage metadata
func NewMetadataPersistenceMetricsClient(persistence MetadataManager, metricClient metrics.Client, logger bark.Logger) MetadataManager {
return &metadataPersistenceClient{
persistence: persistence,
metricClient: metricClient,
logger: logger,
}
}

// NewVisibilityPersistenceClient creates a client to manage visibility
func NewVisibilityPersistenceClient(persistence VisibilityManager, metricClient metrics.Client, logger bark.Logger) VisibilityManager {
// NewVisibilityPersistenceMetricsClient creates a client to manage visibility
func NewVisibilityPersistenceMetricsClient(persistence VisibilityManager, metricClient metrics.Client, logger bark.Logger) VisibilityManager {
return &visibilityPersistenceClient{
persistence: persistence,
metricClient: metricClient,
Expand Down
Loading

0 comments on commit 8156cde

Please sign in to comment.