Skip to content

Commit

Permalink
Add integration test for domain failover (cadence-workflow#731)
Browse files Browse the repository at this point in the history
* Add multi cluster setup for domain failover integration test

* comment test until background check is ready
  • Loading branch information
vancexu authored May 21, 2018
1 parent 8d7b467 commit 23bb0db
Show file tree
Hide file tree
Showing 11 changed files with 474 additions and 43 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
.DS_Store
test
test.log
test_xdc
test_xdc.log

# Executables produced by cadence repo
/cadence*
11 changes: 10 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ language: go
directories:
- $HOME/.glide/cache
go:
- "1.10"
- 1.10.x

env:
global:
- ZOOKEEPER_PEERS=localhost:2181
- KAFKA_PEERS=localhost:9092

addons:
apt:
Expand All @@ -19,6 +24,10 @@ addons:

before_install:
- pip install --user ccm
- wget http://www.us.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz -O kafka.tgz
- mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
- nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &"
- nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &"

install:
- go get -u github.com/Masterminds/glide
Expand Down
16 changes: 13 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ BUILD := ./build
TOOLS_CMD_ROOT=./cmd/tools
INTEG_TEST_ROOT=./host
INTEG_TEST_DIR=host
INTEG_TEST_XDC_ROOT=./hostxdc
INTEG_TEST_XDC_DIR=hostxdc

define thriftrwrule
THRIFTRW_GEN_SRC += $(THRIFT_GENDIR)/go/$1/$1.go
Expand All @@ -47,13 +49,12 @@ ALL_SRC := $(shell find . -name "*.go" | grep -v -e Godeps -e vendor \
TOOLS_SRC := $(shell find ./tools -name "*.go")
TOOLS_SRC += $(TOOLS_CMD_ROOT)

# all directories with *_test.go files in them
TEST_DIRS := $(sort $(dir $(filter %_test.go,$(ALL_SRC))))
# all directories with *_test.go files in them (exclude hostxdc)
TEST_DIRS := $(filter-out $(INTEG_TEST_XDC_ROOT)%, $(sort $(dir $(filter %_test.go,$(ALL_SRC)))))

# all tests other than integration test fall into the pkg_test category
PKG_TEST_DIRS := $(filter-out $(INTEG_TEST_ROOT)%,$(TEST_DIRS))


# Need the following option to have integration tests
# count towards coverage. godoc below:
# -coverpkg pkg1,pkg2,pkg3
Expand Down Expand Up @@ -96,6 +97,10 @@ test: vendor/glide.updated bins
@rm -f test.log
@for dir in $(TEST_DIRS); do \
go test -timeout 15m -race -coverprofile=$@ "$$dir" | tee -a test.log; \
done; \
# run the xdc tests with the race detector off, because a ringpop bug causes a data race
@for dir in $(INTEG_TEST_XDC_ROOT); do \
go test -timeout 15m -coverprofile=$@ "$$dir" | tee -a test.log; \
done;

cover_profile: clean bins_nothrift
Expand All @@ -107,6 +112,11 @@ cover_profile: clean bins_nothrift
@time go test $(INTEG_TEST_ROOT) $(TEST_ARG) $(GOCOVERPKG_ARG) -coverprofile=$(BUILD)/$(INTEG_TEST_DIR)/coverage.out || exit 1;
@cat $(BUILD)/$(INTEG_TEST_DIR)/coverage.out | grep -v "mode: atomic" >> $(BUILD)/cover.out

@echo Running integration test for cross dc
@mkdir -p $(BUILD)/$(INTEG_TEST_XDC_DIR)
@time go test $(INTEG_TEST_XDC_ROOT) $(GOCOVERPKG_ARG) -coverprofile=$(BUILD)/$(INTEG_TEST_XDC_DIR)/coverage.out || exit 1;
@cat $(BUILD)/$(INTEG_TEST_XDC_DIR)/coverage.out | grep -v "mode: atomic" >> $(BUILD)/cover.out

@echo Running package tests:
@for dir in $(PKG_TEST_DIRS); do \
mkdir -p $(BUILD)/"$$dir"; \
Expand Down
4 changes: 2 additions & 2 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ const (
// TestFailoverVersionIncrement is failover version increment used for test
TestFailoverVersionIncrement = int64(10)
// TestCurrentClusterName is current cluster used for test
TestCurrentClusterName = "current-cluster"
TestCurrentClusterName = "active"
// TestAlternativeClusterName is alternative cluster used for test
TestAlternativeClusterName = "alternative-cluster"
TestAlternativeClusterName = "standby"
)

var (
Expand Down
26 changes: 16 additions & 10 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,26 @@ func (g *testTransferTaskIDGenerator) GetNextTransferTaskID() (int64, error) {
}

// SetupWorkflowStoreWithOptions to setup workflow test base
func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions, metadata cluster.Metadata) {
log := bark.NewLoggerFromLogrus(log.New())

s.ClusterMetadata = cluster.GetTestClusterMetadata(
options.EnableGlobalDomain,
options.IsMasterCluster,
)
if metadata == nil {
s.ClusterMetadata = cluster.GetTestClusterMetadata(
options.EnableGlobalDomain,
options.IsMasterCluster,
)
} else {
s.ClusterMetadata = metadata
log = log.WithField("Cluster", metadata.GetCurrentClusterName())
}
currentClusterName := s.ClusterMetadata.GetCurrentClusterName()

// Setup Workflow keyspace and deploy schema for tests
s.CassandraTestCluster.setupTestCluster(options)
shardID := 0
var err error
s.ShardMgr, err = NewCassandraShardPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser,
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, s.ClusterMetadata.GetCurrentClusterName(), log)
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, currentClusterName, log)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -151,7 +157,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
}

s.MetadataManager, err = NewCassandraMetadataPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser,
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, s.ClusterMetadata.GetCurrentClusterName(), log)
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, currentClusterName, log)
if err != nil {
log.Fatal(err)
}
Expand All @@ -173,8 +179,8 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
TransferAckLevel: 0,
ReplicationAckLevel: 0,
TimerAckLevel: time.Time{},
ClusterTimerAckLevel: map[string]time.Time{cluster.TestCurrentClusterName: time.Time{}},
ClusterTransferAckLevel: map[string]int64{cluster.TestCurrentClusterName: 0},
ClusterTimerAckLevel: map[string]time.Time{currentClusterName: time.Time{}},
ClusterTransferAckLevel: map[string]int64{currentClusterName: 0},
}

err1 := s.ShardMgr.CreateShard(&CreateShardRequest{
Expand Down Expand Up @@ -979,7 +985,7 @@ func (s *TestBase) SetupWorkflowStore() {
ClusterPassword: testPassword,
DropKeySpace: true,
EnableGlobalDomain: false,
})
}, nil)
}

// TearDownWorkflowStore to cleanup
Expand Down
12 changes: 12 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"math/rand"
)

const (
Expand Down Expand Up @@ -179,3 +180,14 @@ func IsValidContext(ctx context.Context) error {
}
return nil
}

// GenerateRandomString returns a string of n characters, each picked at random
// from the letters of the word "random". It is intended for generating test
// strings. The output comes from math/rand and must not be used where
// unpredictability matters (tokens, secrets, and the like).
//
// A non-positive n yields the empty string.
func GenerateRandomString(n int) string {
	if n <= 0 {
		// make() panics on a negative length; treat it like "nothing requested".
		return ""
	}

	const letters = "random"

	// Use a generator local to this call rather than rand.Seed: re-seeding the
	// package-level source on every call would silently reset the random
	// sequence for every other user of math/rand in the process.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))

	// The alphabet is pure ASCII, so building the result byte-by-byte is
	// equivalent to building it rune-by-rune.
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[r.Intn(len(letters))]
	}
	return string(b)
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ type (
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
suite.Suite
persistence.TestBase
domainName string
foreignDomainName string
mockMessagingClient messaging.Client
mockProducer messaging.Producer
host Cadence
engine wsc.Interface
logger bark.Logger
suite.Suite
persistence.TestBase
}
)

Expand Down Expand Up @@ -101,7 +101,7 @@ func (s *integrationCrossDCSuite) setupTest(enableGlobalDomain bool, isMasterClu
options.SchemaDir = ".."
options.EnableGlobalDomain = enableGlobalDomain
options.IsMasterCluster = isMasterCluster
s.SetupWorkflowStoreWithOptions(options)
s.SetupWorkflowStoreWithOptions(options, nil)

s.setupShards()

Expand All @@ -110,7 +110,7 @@ func (s *integrationCrossDCSuite) setupTest(enableGlobalDomain bool, isMasterClu
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)

s.host = NewCadence(s.ClusterMetadata, s.mockMessagingClient, s.MetadataManager, s.ShardMgr, s.HistoryMgr, s.ExecutionMgrFactory, s.TaskMgr,
s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger)
s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false)

s.host.Start()

Expand Down
9 changes: 4 additions & 5 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type (
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
suite.Suite
persistence.TestBase
domainName string
domainID string
foreignDomainName string
Expand All @@ -64,8 +66,6 @@ type (
host Cadence
engine wsc.Interface
logger bark.Logger
suite.Suite
persistence.TestBase
}

decisionTaskHandler func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *integrationSuite) setupSuite(enableGlobalDomain bool, isMasterCluster b
options.SchemaDir = ".."
options.EnableGlobalDomain = enableGlobalDomain
options.IsMasterCluster = isMasterCluster
s.SetupWorkflowStoreWithOptions(options)
s.SetupWorkflowStoreWithOptions(options, nil)

s.setupShards()

Expand All @@ -147,8 +147,7 @@ func (s *integrationSuite) setupSuite(enableGlobalDomain bool, isMasterCluster b
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)

s.host = NewCadence(s.ClusterMetadata, s.mockMessagingClient, s.MetadataManager, s.ShardMgr, s.HistoryMgr, s.ExecutionMgrFactory, s.TaskMgr,
s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger)

s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger, 0, false)
s.host.Start()

s.engine = s.host.GetFrontendClient()
Expand Down
Loading

0 comments on commit 23bb0db

Please sign in to comment.