Skip to content

Commit

Permalink
Merge branch 'master' into 3+DC-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Jun 20, 2019
2 parents e7b37b6 + 02c18ce commit 59eb51c
Show file tree
Hide file tree
Showing 117 changed files with 7,366 additions and 1,747 deletions.
33 changes: 32 additions & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ steps:
queue: "workers"
docker: "*"
command: "make cover_integration_ci"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-cassandra
Expand All @@ -24,6 +27,9 @@ steps:
queue: "workers"
docker: "*"
command: "make cover_integration_ci EVENTSV2=true"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-cassandra
Expand All @@ -34,6 +40,9 @@ steps:
queue: "workers"
docker: "*"
command: "make cover_xdc_ci"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-xdc-cassandra
Expand All @@ -44,6 +53,9 @@ steps:
queue: "workers"
docker: "*"
command: "make cover_integration_ci"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-mysql
Expand All @@ -54,6 +66,9 @@ steps:
queue: "workers"
docker: "*"
command: "make cover_integration_ci EVENTSV2=true"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-mysql
Expand All @@ -64,9 +79,25 @@ steps:
queue: "workers"
docker: "*"
command: "make cover_xdc_ci"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-xdc-mysql
config: docker/buildkite/docker-compose.yml

- wait
- wait

- label: ":docker: build and push master"
agents:
queue: "workers"
docker: "*"
command: "scripts/buildkite/docker-push.sh"
retry:
automatic:
limit: 1
plugins:
- docker-login#v2.0.1:
username: ubercadence
password-env: DOCKER_LOGIN_PASSWORD
5 changes: 2 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,14 @@ func (s *server) startService() common.Daemon {
}
params.ClusterMetadata = cluster.NewMetadata(
params.Logger,
params.MetricsClient,
dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, clusterMetadata.EnableGlobalDomain),
clusterMetadata.FailoverVersionIncrement,
clusterMetadata.MasterClusterName,
clusterMetadata.CurrentClusterName,
clusterMetadata.ClusterInformation,
archivalStatus,
archivalStatus(),
s.cfg.Archival.DefaultBucket,
enableReadFromArchival,
enableReadFromArchival(),
)
params.DispatcherProvider = client.NewIPYarpcDispatcherProvider()
params.ESConfig = &s.cfg.ElasticSearch
Expand Down
61 changes: 50 additions & 11 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ const (
DomainCacheRefreshInterval = 10 * time.Second
domainCacheRefreshPageSize = 100

domainCacheLocked int32 = 0
domainCacheReleased int32 = 1

domainCacheInitialized int32 = 0
domainCacheStarted int32 = 1
domainCacheStopped int32 = 2
Expand Down Expand Up @@ -118,7 +115,13 @@ type (
)

// NewDomainCache creates a new instance of cache for holding onto domain information to reduce the load on persistence
func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata cluster.Metadata, metricsClient metrics.Client, logger log.Logger) DomainCache {
func NewDomainCache(
metadataMgr persistence.MetadataManager,
clusterMetadata cluster.Metadata,
metricsClient metrics.Client,
logger log.Logger,
) DomainCache {

cache := &domainCache{
status: domainCacheInitialized,
shutdownChan: make(chan struct{}),
Expand Down Expand Up @@ -149,12 +152,15 @@ func newDomainCacheEntry(clusterMetadata cluster.Metadata) *DomainCacheEntry {
return &DomainCacheEntry{clusterMetadata: clusterMetadata}
}

// NewDomainCacheEntryWithReplicationForTest returns an entry with test data
func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo,
// NewGlobalDomainCacheEntryForTest returns an entry with test data
func NewGlobalDomainCacheEntryForTest(
info *persistence.DomainInfo,
config *persistence.DomainConfig,
repConfig *persistence.DomainReplicationConfig,
failoverVersion int64,
clusterMetadata cluster.Metadata) *DomainCacheEntry {
clusterMetadata cluster.Metadata,
) *DomainCacheEntry {

return &DomainCacheEntry{
info: info,
config: config,
Expand All @@ -165,11 +171,44 @@ func NewDomainCacheEntryWithReplicationForTest(info *persistence.DomainInfo,
}
}

// NewDomainCacheEntryForTest returns an entry with domainInfo
func NewDomainCacheEntryForTest(info *persistence.DomainInfo, config *persistence.DomainConfig) *DomainCacheEntry {
// NewLocalDomainCacheEntryForTest builds a DomainCacheEntry modeling a local
// (non-global) domain, for use in tests only. A replication config is
// synthesized so that targetCluster is both the active cluster and the sole
// member of the cluster list, and the failover version is left empty.
func NewLocalDomainCacheEntryForTest(
	info *persistence.DomainInfo,
	config *persistence.DomainConfig,
	targetCluster string,
	clusterMetadata cluster.Metadata,
) *DomainCacheEntry {

	replicationConfig := &persistence.DomainReplicationConfig{
		ActiveClusterName: targetCluster,
		Clusters: []*persistence.ClusterReplicationConfig{
			{ClusterName: targetCluster},
		},
	}
	entry := &DomainCacheEntry{
		info:              info,
		config:            config,
		isGlobalDomain:    false,
		replicationConfig: replicationConfig,
		failoverVersion:   common.EmptyVersion,
		clusterMetadata:   clusterMetadata,
	}
	return entry
}

// NewDomainCacheEntryForTest returns a fully-populated DomainCacheEntry for
// use in tests only. Callers choose whether the entry models a global domain
// via isGlobalDomain and supply the replication config and failover version
// explicitly.
func NewDomainCacheEntryForTest(
	info *persistence.DomainInfo,
	config *persistence.DomainConfig,
	isGlobalDomain bool,
	repConfig *persistence.DomainReplicationConfig,
	failoverVersion int64,
	clusterMetadata cluster.Metadata,
) *DomainCacheEntry {

	// The original literal listed info/config twice; duplicate field names in
	// a composite literal are a compile error, so each field appears once.
	return &DomainCacheEntry{
		info:              info,
		config:            config,
		isGlobalDomain:    isGlobalDomain,
		replicationConfig: repConfig,
		failoverVersion:   failoverVersion,
		clusterMetadata:   clusterMetadata,
	}
}

Expand Down
43 changes: 9 additions & 34 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"fmt"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
)
Expand Down Expand Up @@ -56,8 +54,7 @@ type (
}

metadataImpl struct {
logger log.Logger
metricsClient metrics.Client
logger log.Logger
// EnableGlobalDomain whether the global domain is enabled,
// this attr should be discarded when cross DC is made public
enableGlobalDomain dynamicconfig.BoolPropertyFn
Expand All @@ -75,27 +72,22 @@ type (
// clusterToAddress contains the cluster name to corresponding frontend client
clusterToAddress map[string]config.Address

// archivalStatus is cluster's archival status
archivalStatus dynamicconfig.StringPropertyFn
// defaultBucket is the default archival bucket name used for this cluster
defaultBucket string
// enableReadFromArchival whether reading history from archival is enabled
enableReadFromArchival dynamicconfig.BoolPropertyFn
// archivalConfig is cluster's archival config
archivalConfig *ArchivalConfig
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(
logger log.Logger,
metricsClient metrics.Client,
enableGlobalDomain dynamicconfig.BoolPropertyFn,
failoverVersionIncrement int64,
masterClusterName string,
currentClusterName string,
clusterInfo map[string]config.ClusterInformation,
archivalStatus dynamicconfig.StringPropertyFn,
archivalStatus string,
defaultBucket string,
enableReadFromArchival dynamicconfig.BoolPropertyFn,
enableReadFromArchival bool,
) Metadata {

if len(clusterInfo) == 0 {
Expand Down Expand Up @@ -137,27 +129,19 @@ func NewMetadata(
panic("Cluster info initial versions have duplicates")
}

status, err := getArchivalStatus(archivalStatus())
status, err := getArchivalStatus(archivalStatus)
if err != nil {
panic(err)
}
archivalConfig := NewArchivalConfig(status, defaultBucket, enableReadFromArchival())
if !archivalConfig.isValid() {
panic("Archival config is not valid")
}

return &metadataImpl{
logger: logger,
metricsClient: metricsClient,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInfo: clusterInfo,
versionToClusterName: versionToClusterName,
archivalStatus: archivalStatus,
defaultBucket: defaultBucket,
enableReadFromArchival: enableReadFromArchival,
archivalConfig: NewArchivalConfig(status, defaultBucket, enableReadFromArchival),
}
}

Expand Down Expand Up @@ -224,15 +208,6 @@ func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int6
}

// ArchivalConfig returns the archival config of the cluster.
// This method always return a well formed ArchivalConfig (this means ArchivalConfig().IsValid always returns true).
func (metadata *metadataImpl) ArchivalConfig() (retCfg *ArchivalConfig) {
inputStatus := metadata.archivalStatus()
status, err := getArchivalStatus(inputStatus)
if err != nil {
metadata.logger.Error("error getting archival config, invalid archival status in dynamic config",
tag.ArchivalClusterArchivalStatus(inputStatus),
tag.Error(err))
metadata.metricsClient.IncCounter(metrics.ClusterMetadataArchivalConfigScope, metrics.ArchivalConfigFailures)
}
return NewArchivalConfig(status, metadata.defaultBucket, metadata.enableReadFromArchival())
func (metadata *metadataImpl) ArchivalConfig() *ArchivalConfig {
return metadata.archivalConfig
}
11 changes: 4 additions & 7 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package cluster
import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/metrics/mocks"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
)
Expand Down Expand Up @@ -93,28 +92,26 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool, enabl
if enableGlobalDomain {
return NewMetadata(
loggerimpl.NewNopLogger(),
&mocks.Client{},
dynamicconfig.GetBoolPropertyFn(true),
TestFailoverVersionIncrement,
masterClusterName,
TestCurrentClusterName,
TestAllClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
archivalStatus,
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
enableArchival,
)
}

return NewMetadata(
loggerimpl.NewNopLogger(),
&mocks.Client{},
dynamicconfig.GetBoolPropertyFn(false),
TestFailoverVersionIncrement,
TestCurrentClusterName,
TestCurrentClusterName,
TestSingleDCClusterInfo,
dynamicconfig.GetStringPropertyFn(archivalStatus),
archivalStatus,
clusterDefaultBucket,
dynamicconfig.GetBoolPropertyFn(enableArchival),
enableArchival,
)
}
8 changes: 6 additions & 2 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ const (
)

const (
	// SystemDomainName is domain name for all cadence system workflows
	// NOTE(review): shares the value "cadence-system" with SystemLocalDomainName
	// below — confirm both constants are intended to coexist after the merge.
	SystemDomainName = "cadence-system"
	// SystemGlobalDomainName is global domain name for cadence system workflows running globally
	SystemGlobalDomainName = "cadence-system-global"
	// SystemLocalDomainName is domain name for cadence system workflows running in local cluster
	SystemLocalDomainName = "cadence-system"
	// SystemDomainID is domain id for all cadence system workflows
	SystemDomainID = "32049b68-7872-4094-8e63-d0dd59896a83"
	// SystemDomainRetentionDays is retention config for all cadence system workflows
	SystemDomainRetentionDays = 7
	// DefaultAdminOperationToken is the default dynamic config value for AdminOperationToken
	DefaultAdminOperationToken = "CadenceTeamONLY"
)

const (
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func NewClient(config *Config) (Client, error) {
client, err := elastic.NewClient(
elastic.SetURL(config.URL.String()),
elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))),
elastic.SetDecoder(&elastic.NumberDecoder{}), // critical to ensure decode of int64 won't lose precise
)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,8 @@ func ArchivalDeterministicConstructionCheckFailReason(deterministicConstructionC
// ArchivalNonDeterministicBlobKey returns a tag for the given
// non-deterministic archival blob key.
func ArchivalNonDeterministicBlobKey(nondeterministicBlobKey string) Tag {
	return newStringTag("archival-non-deterministic-blob-key", nondeterministicBlobKey)
}

// ArchivalBlobIntegrityCheckFailReason returns a tag carrying the reason an
// archival blob integrity check failed.
func ArchivalBlobIntegrityCheckFailReason(blobIntegrityCheckFailReason string) Tag {
	return newStringTag("archival-blob-integrity-check-fail-reason", blobIntegrityCheckFailReason)
}
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ var (
ComponentIndexerESProcessor = component("indexer-es-processor")
ComponentESVisibilityManager = component("es-visibility-manager")
ComponentArchiver = component("archiver")
ComponentBatcher = component("batcher")
ComponentWorker = component("worker")
ComponentServiceResolver = component("service-resolver")
)
Expand Down
Loading

0 comments on commit 59eb51c

Please sign in to comment.