Skip to content

Commit

Permalink
add helper functions in domain cache for events replication (cadence-…
Browse files Browse the repository at this point in the history
…workflow#618)

* add helper functions in domain cache for events replication
* bugfix: domain cache should be per host, not per shard
  • Loading branch information
wxing1292 authored Mar 20, 2018
1 parent 1d939e6 commit ae6b94c
Show file tree
Hide file tree
Showing 20 changed files with 428 additions and 69 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

188 changes: 188 additions & 0 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 84 additions & 21 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"sync"
"time"

workflow "github.com/uber/cadence/.gen/go/shared"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/persistence"

"github.com/uber-common/bark"
Expand All @@ -50,34 +54,40 @@ type (
}

domainCache struct {
cacheByName Cache
cacheByID Cache
metadataMgr persistence.MetadataManager
timeSource common.TimeSource
logger bark.Logger
cacheByName Cache
cacheByID Cache
metadataMgr persistence.MetadataManager
clusterMetadata cluster.Metadata
timeSource common.TimeSource
logger bark.Logger
}

domainCacheEntry struct {
Info *persistence.DomainInfo
Config *persistence.DomainConfig
ReplicationConfig *persistence.DomainReplicationConfig
clusterMetadata cluster.Metadata
info *persistence.DomainInfo
config *persistence.DomainConfig
replicationConfig *persistence.DomainReplicationConfig
configVersion int64
failoverVersion int64
isGlobalDomain bool
expiry time.Time
sync.RWMutex
}
)

// NewDomainCache creates a new instance of cache for holding onto domain information to reduce the load on persistence
func NewDomainCache(metadataMgr persistence.MetadataManager, logger bark.Logger) DomainCache {
func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata cluster.Metadata, logger bark.Logger) DomainCache {
opts := &Options{}
opts.InitialCapacity = domainCacheInitialSize
opts.TTL = domainCacheTTL

return &domainCache{
cacheByName: New(domainCacheMaxSize, opts),
cacheByID: New(domainCacheMaxSize, opts),
metadataMgr: metadataMgr,
timeSource: common.NewRealTimeSource(),
logger: logger,
cacheByName: New(domainCacheMaxSize, opts),
cacheByID: New(domainCacheMaxSize, opts),
metadataMgr: metadataMgr,
clusterMetadata: clusterMetadata,
timeSource: common.NewRealTimeSource(),
logger: logger,
}
}

Expand All @@ -103,7 +113,7 @@ func (c *domainCache) GetDomainID(name string) (string, error) {
if err != nil {
return "", err
}
return entry.Info.ID, nil
return entry.info.ID, nil
}

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
Expand Down Expand Up @@ -151,9 +161,12 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache
return nil, err
}

entry.Info = response.Info
entry.Config = response.Config
entry.ReplicationConfig = response.ReplicationConfig
entry.info = response.Info
entry.config = response.Config
entry.replicationConfig = response.ReplicationConfig
entry.configVersion = response.ConfigVersion
entry.failoverVersion = response.FailoverVersion
entry.isGlobalDomain = response.IsGlobalDomain
entry.expiry = now.Add(domainEntryRefreshInterval)
}

Expand All @@ -162,12 +175,62 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache

func (entry *domainCacheEntry) duplicate() *domainCacheEntry {
result := newDomainCacheEntry()
result.Info = entry.Info
result.Config = entry.Config
result.ReplicationConfig = entry.ReplicationConfig
result.info = entry.info
result.config = entry.config
result.replicationConfig = entry.replicationConfig
result.configVersion = entry.configVersion
result.failoverVersion = entry.failoverVersion
result.isGlobalDomain = entry.isGlobalDomain
return result
}

func (entry *domainCacheEntry) isExpired(now time.Time) bool {
return entry.expiry.IsZero() || now.After(entry.expiry)
}

func (entry *domainCacheEntry) GetInfo() *persistence.DomainInfo {
return entry.info
}

func (entry *domainCacheEntry) GetConfig() *persistence.DomainConfig {
return entry.config
}

func (entry *domainCacheEntry) GetReplicationConfig() *persistence.DomainReplicationConfig {
return entry.replicationConfig
}

func (entry *domainCacheEntry) GetConfigVersion() int64 {
return entry.configVersion
}

func (entry *domainCacheEntry) GetFailoverVersion() int64 {
return entry.failoverVersion
}

func (entry *domainCacheEntry) GetIsGlobalDomain() bool {
return entry.isGlobalDomain
}

func (entry *domainCacheEntry) IsDomainActive() bool {
if !entry.isGlobalDomain {
// domain is not a global domain, meaning domain is always "active" within each cluster
return true
}
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName
}

func (entry *domainCacheEntry) ShouldReplicateEvent() bool {
// frontend guarantee that the clusters always contains the active domain, so if the # of clusters is 1
// then we do not need to send out any events for replication
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName &&
entry.isGlobalDomain && len(entry.replicationConfig.Clusters) > 1
}

func (entry *domainCacheEntry) GetDomainNotActiveErr() *workflow.DomainNotActiveError {
if entry.IsDomainActive() {
// domain is consider active
return nil
}
return errors.NewDomainNotActiveError(entry.info.Name, entry.clusterMetadata.GetCurrentClusterName(), entry.replicationConfig.ActiveClusterName)
}
Loading

0 comments on commit ae6b94c

Please sign in to comment.