Skip to content

Commit

Permalink
Update XDC integration test to use add remote cluster api (temporalio…
Browse files Browse the repository at this point in the history
…#2424)

* Update XDC integration test to use add remote cluster api
  • Loading branch information
yux0 authored Jan 28, 2022
1 parent 7388783 commit aceda14
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 26 deletions.
16 changes: 13 additions & 3 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"sync/atomic"
"time"

"go.temporal.io/server/common/dynamicconfig"

"go.temporal.io/server/common"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/log"
Expand All @@ -44,7 +46,6 @@ import (
const (
defaultClusterMetadataPageSize = 100
refreshInterval = time.Minute
refreshFailureInterval = time.Second * 30

FakeClusterForEmptyVersion = "fake-cluster-for-empty-version"
)
Expand Down Expand Up @@ -106,6 +107,7 @@ type (
status int32
clusterMetadataStore persistence.ClusterMetadataManager
refresher *goro.Handle
refreshDuration dynamicconfig.DurationPropertyFn
logger log.Logger

// Immutable fields
Expand Down Expand Up @@ -139,6 +141,7 @@ func NewMetadata(
currentClusterName string,
clusterInfo map[string]ClusterInformation,
clusterMetadataStore persistence.ClusterMetadataManager,
refreshDuration dynamicconfig.DurationPropertyFn,
logger log.Logger,
) Metadata {
if len(clusterInfo) == 0 {
Expand Down Expand Up @@ -166,6 +169,9 @@ func NewMetadata(
for k, v := range clusterInfo {
copyClusterInfo[k] = v
}
if refreshDuration == nil {
refreshDuration = dynamicconfig.GetDurationPropertyFn(refreshInterval)
}
return &metadataImpl{
status: common.DaemonStatusInitialized,
enableGlobalNamespace: enableGlobalNamespace,
Expand All @@ -177,12 +183,14 @@ func NewMetadata(
clusterChangeCallback: make(map[string]CallbackFn),
clusterMetadataStore: clusterMetadataStore,
logger: logger,
refreshDuration: refreshDuration,
}
}

func NewMetadataFromConfig(
config *Config,
clusterMetadataStore persistence.ClusterMetadataManager,
dynamicCollection *dynamicconfig.Collection,
logger log.Logger,
) Metadata {
return NewMetadata(
Expand All @@ -192,6 +200,7 @@ func NewMetadataFromConfig(
config.CurrentClusterName,
config.ClusterInformation,
clusterMetadataStore,
dynamicCollection.GetDurationProperty(dynamicconfig.ClusterMetadataRefreshInterval, refreshInterval),
logger,
)
}
Expand All @@ -206,6 +215,7 @@ func NewMetadataForTest(
config.CurrentClusterName,
config.ClusterInformation,
nil,
nil,
log.NewNoopLogger(),
)
}
Expand Down Expand Up @@ -344,7 +354,7 @@ func (m *metadataImpl) UnRegisterMetadataChangeCallback(callbackId string) {
}

func (m *metadataImpl) refreshLoop(ctx context.Context) error {
timer := time.NewTicker(refreshInterval)
timer := time.NewTicker(m.refreshDuration())
defer timer.Stop()

for {
Expand All @@ -355,7 +365,7 @@ func (m *metadataImpl) refreshLoop(ctx context.Context) error {
for err := m.refreshClusterMetadata(ctx); err != nil; err = m.refreshClusterMetadata(ctx) {
m.logger.Error("Error refreshing remote cluster metadata", tag.Error(err))
select {
case <-time.After(refreshFailureInterval):
case <-time.After(m.refreshDuration() / 2):
case <-ctx.Done():
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions common/cluster/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ package cluster
import (
"context"
"testing"
"time"

"go.temporal.io/server/common/dynamicconfig"

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
Expand Down Expand Up @@ -95,6 +98,7 @@ func (s *metadataSuite) SetupTest() {
s.clusterName,
clusterInfo,
s.mockClusterMetadataStore,
dynamicconfig.GetDurationPropertyFn(time.Second),
log.NewNoopLogger(),
).(*metadataImpl)
}
Expand Down
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var Keys = map[Key]string{
EnableAuthorization: "system.enableAuthorization",
EnableCrossNamespaceCommands: "system.enableCrossNamespaceCommands",
ForceSearchAttributesCacheRefreshOnRead: "system.forceSearchAttributesCacheRefreshOnRead",
ClusterMetadataRefreshInterval: "system.clusterMetadataRefreshInterval",

// size limit
BlobSizeLimitError: "limit.blobSize.error",
Expand Down Expand Up @@ -418,6 +419,8 @@ const (
EnableAuthorization
// EnableCrossNamespaceCommands is the key to enable commands for external namespaces
EnableCrossNamespaceCommands
// ClusterMetadataRefreshInterval is config to manage cluster metadata table refresh interval
ClusterMetadataRefreshInterval
// BlobSizeLimitError is the per event blob size limit
BlobSizeLimitError
// BlobSizeLimitWarn is the per event blob size limit for warning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) {
s.ClusterMetadataManager, err = factory.NewClusterMetadataManager()
s.fatalOnError("NewClusterMetadataManager", err)

s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, s.Logger)
s.ClusterMetadata = cluster.NewMetadataFromConfig(clusterMetadataConfig, s.ClusterMetadataManager, dynamicconfig.NewNoopCollection(), s.Logger)
s.SearchAttributesManager = searchattribute.NewManager(clock.NewRealTimeSource(), s.ClusterMetadataManager, dynamicconfig.GetBoolPropertyFn(true))

s.MetadataManager, err = factory.NewMetadataManager()
Expand Down
1 change: 1 addition & 0 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func New(
params.ClusterMetadataConfig.CurrentClusterName,
params.ClusterMetadataConfig.ClusterInformation,
persistenceBean.GetClusterMetadataManager(),
nil,
logger,
)

Expand Down
1 change: 1 addition & 0 deletions host/dynamicconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
dynamicconfig.ReplicationTaskFetcherAggregationInterval: 200 * time.Millisecond,
dynamicconfig.ReplicationTaskFetcherErrorRetryWait: 50 * time.Millisecond,
dynamicconfig.ReplicationTaskProcessorErrorRetryWait: time.Millisecond,
dynamicconfig.ClusterMetadataRefreshInterval: 100 * time.Millisecond,
}
)

Expand Down
12 changes: 1 addition & 11 deletions host/testdata/xdc_integration_es_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@
initialFailoverVersion: 1
rpcName: "frontend"
rpcAddress: "127.0.0.1:9134"
standby-es:
enabled: true
initialFailoverVersion: 2
rpcName: "frontend"
rpcAddress: "127.0.0.1:10134"
enablearchival: false
workerconfig:
enablearchiver: false
Expand All @@ -38,14 +33,9 @@
clustermetadata:
enableGlobalNamespace: true
failoverVersionIncrement: 10
masterClusterName: "active-es"
masterClusterName: "standby-es"
currentClusterName: "standby-es"
clusterInformation:
active-es:
enabled: true
initialFailoverVersion: 1
rpcName: "frontend"
rpcAddress: "127.0.0.1:9134"
standby-es:
enabled: true
initialFailoverVersion: 2
Expand Down
12 changes: 1 addition & 11 deletions host/testdata/xdc_integration_test_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@
initialFailoverVersion: 1
rpcName: "frontend"
rpcAddress: "127.0.0.1:7134"
standby:
enabled: true
initialFailoverVersion: 2
rpcName: "frontend"
rpcAddress: "127.0.0.1:8134"
enablearchival: false
workerconfig:
enablearchiver: false
Expand All @@ -30,14 +25,9 @@
clustermetadata:
enableGlobalNamespace: true
failoverVersionIncrement: 10
masterClusterName: "active"
masterClusterName: "standby"
currentClusterName: "standby"
clusterInformation:
active:
enabled: true
initialFailoverVersion: 1
rpcName: "frontend"
rpcAddress: "127.0.0.1:7134"
standby:
enabled: true
initialFailoverVersion: 2
Expand Down
18 changes: 18 additions & 0 deletions host/xdc/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"testing"
"time"

"go.temporal.io/server/api/adminservice/v1"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -124,6 +126,22 @@ func (s *esCrossDCTestSuite) SetupSuite() {
s.Require().NoError(err)
s.cluster2 = c

cluster1Address := clusterConfigs[0].ClusterMetadata.ClusterInformation[clusterConfigs[0].ClusterMetadata.CurrentClusterName].RPCAddress
cluster2Address := clusterConfigs[1].ClusterMetadata.ClusterInformation[clusterConfigs[1].ClusterMetadata.CurrentClusterName].RPCAddress
_, err = s.cluster1.GetAdminClient().AddOrUpdateRemoteCluster(host.NewContext(), &adminservice.AddOrUpdateRemoteClusterRequest{
FrontendAddress: cluster2Address,
EnableRemoteClusterConnection: true,
})
s.Require().NoError(err)

_, err = s.cluster2.GetAdminClient().AddOrUpdateRemoteCluster(host.NewContext(), &adminservice.AddOrUpdateRemoteClusterRequest{
FrontendAddress: cluster1Address,
EnableRemoteClusterConnection: true,
})
s.Require().NoError(err)
// Wait for cluster metadata to refresh new added clusters
time.Sleep(time.Millisecond * 200)

s.esClient = host.CreateESClient(s.Suite, s.clusterConfigs[0].ESConfig, s.logger)
host.PutIndexTemplate(s.Suite, s.esClient, fmt.Sprintf("../testdata/es_%s_index_template.json", s.clusterConfigs[0].ESConfig.Version), "test-visibility-template")
host.CreateIndex(s.Suite, s.esClient, s.clusterConfigs[0].ESConfig.GetVisibilityIndex())
Expand Down
22 changes: 22 additions & 0 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (
"testing"
"time"

"go.temporal.io/server/api/adminservice/v1"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -129,6 +131,26 @@ func (s *integrationClustersTestSuite) SetupSuite() {
c, err = host.NewCluster(clusterConfigs[1], log.With(s.logger, tag.ClusterName(clusterName[1])))
s.Require().NoError(err)
s.cluster2 = c

cluster1Address := clusterConfigs[0].ClusterMetadata.ClusterInformation[clusterConfigs[0].ClusterMetadata.CurrentClusterName].RPCAddress
cluster2Address := clusterConfigs[1].ClusterMetadata.ClusterInformation[clusterConfigs[1].ClusterMetadata.CurrentClusterName].RPCAddress
_, err = s.cluster1.GetAdminClient().AddOrUpdateRemoteCluster(
host.NewContext(),
&adminservice.AddOrUpdateRemoteClusterRequest{
FrontendAddress: cluster2Address,
EnableRemoteClusterConnection: true,
})
s.Require().NoError(err)

_, err = s.cluster2.GetAdminClient().AddOrUpdateRemoteCluster(
host.NewContext(),
&adminservice.AddOrUpdateRemoteClusterRequest{
FrontendAddress: cluster1Address,
EnableRemoteClusterConnection: true,
})
s.Require().NoError(err)
// Wait for cluster metadata to refresh new added clusters
time.Sleep(time.Millisecond * 200)
}

func (s *integrationClustersTestSuite) SetupTest() {
Expand Down
1 change: 1 addition & 0 deletions tools/cli/namespaceUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ func initializeClusterMetadata(
clusterMetadata.CurrentClusterName,
clusterMetadata.ClusterInformation,
nil,
nil,
log.NewNoopLogger(),
)
}
Expand Down

0 comments on commit aceda14

Please sign in to comment.