Skip to content

Commit

Permalink
Add ES integration test (cadence-workflow#1703)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Apr 16, 2019
1 parent 8d19cbf commit 515018f
Show file tree
Hide file tree
Showing 16 changed files with 532 additions and 106 deletions.
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ ifndef PERSISTENCE_TYPE
override PERSISTENCE_TYPE = cassandra
endif

ifdef TEST_TAG
override TEST_TAG := -tags $(TEST_TAG)
endif

define thriftrwrule
THRIFTRW_GEN_SRC += $(THRIFT_GENDIR)/go/$1/$1.go

Expand Down Expand Up @@ -106,15 +110,15 @@ test: dep-ensured bins
@rm -f test
@rm -f test.log
@for dir in $(TEST_DIRS); do \
go test -timeout 20m -race -coverprofile=$@ "$$dir" | tee -a test.log; \
go test -timeout 20m -race -coverprofile=$@ "$$dir" $(TEST_TAG) | tee -a test.log; \
done;

test_eventsV2: dep-ensured bins
@rm -f test_eventsV2
@rm -f test_eventsV2.log
@echo Running integration test
@for dir in $(INTEG_TEST_ROOT); do \
go test -timeout 20m -coverprofile=$@ "$$dir" -v -eventsV2=true | tee -a test_eventsV2.log; \
go test -timeout 20m -coverprofile=$@ "$$dir" -v $(TEST_TAG) -eventsV2=true | tee -a test_eventsV2.log; \
done;

test_eventsV2_xdc: dep-ensured bins
Expand Down Expand Up @@ -150,7 +154,7 @@ cover_integration_profile: clean bins_nothrift

@echo Running integration test with $(PERSISTENCE_TYPE) and eventsV2 $(EVENTSV2)
@mkdir -p $(BUILD)/$(INTEG_TEST_DIR)
@time go test $(INTEG_TEST_ROOT) $(TEST_ARG) -eventsV2=$(EVENTSV2) -persistenceType=$(PERSISTENCE_TYPE) $(GOCOVERPKG_ARG) -coverprofile=$(BUILD)/$(INTEG_TEST_DIR)/coverage.out || exit 1;
@time go test $(INTEG_TEST_ROOT) $(TEST_ARG) $(TEST_TAG) -eventsV2=$(EVENTSV2) -persistenceType=$(PERSISTENCE_TYPE) $(GOCOVERPKG_ARG) -coverprofile=$(BUILD)/$(INTEG_TEST_DIR)/coverage.out || exit 1;
@cat $(BUILD)/$(INTEG_TEST_DIR)/coverage.out | grep -v "mode: atomic" >> $(BUILD)/cover.out

cover_xdc_profile: clean bins_nothrift
Expand Down
26 changes: 26 additions & 0 deletions docker/buildkite/docker-compose-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ services:
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.1
ports:
- "9200:9200"
networks:
services-network:
aliases:
- elasticsearch
environment:
- discovery.type=single-node

unit-test:
build:
context: ../../
Expand Down Expand Up @@ -93,8 +104,13 @@ services:
- "7939:7939"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ES_SEEDS=elasticsearch"
- "KAFKA_SEEDS=kafka"
- "TEST_TAG=esintegration"
depends_on:
- cassandra
- elasticsearch
- kafka
volumes:
- ../../:/go/src/github.com/uber/cadence
networks:
Expand All @@ -119,9 +135,14 @@ services:
- "7939:7939"
environment:
- "MYSQL_SEEDS=mysql"
- "ES_SEEDS=elasticsearch"
- "KAFKA_SEEDS=kafka"
- "PERSISTENCE_TYPE=sql"
- "TEST_TAG=esintegration"
depends_on:
- mysql
- elasticsearch
- kafka
volumes:
- ../../:/go/src/github.com/uber/cadence
networks:
Expand All @@ -146,8 +167,13 @@ services:
- "7939:7939"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ES_SEEDS=elasticsearch"
- "KAFKA_SEEDS=kafka"
- "TEST_TAG=esintegration"
depends_on:
- cassandra
- elasticsearch
- kafka
volumes:
- ../../:/go/src/github.com/uber/cadence
networks:
Expand Down
21 changes: 21 additions & 0 deletions docker/buildkite/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ services:
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.1
ports:
- "9200:9200"
networks:
services-network:
aliases:
- elasticsearch
environment:
- discovery.type=single-node

unit-test:
build:
context: ../../
Expand Down Expand Up @@ -86,12 +97,17 @@ services:
- "7939:7939"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ES_SEEDS=elasticsearch"
- "KAFKA_SEEDS=kafka"
- "TEST_TAG=esintegration"
- BUILDKITE_AGENT_ACCESS_TOKEN
- BUILDKITE_JOB_ID
- BUILDKITE_BUILD_ID
- BUILDKITE_BUILD_NUMBER
depends_on:
- cassandra
- elasticsearch
- kafka
volumes:
- ../../:/go/src/github.com/uber/cadence
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
Expand All @@ -111,13 +127,18 @@ services:
- "7939:7939"
environment:
- "MYSQL_SEEDS=mysql"
- "ES_SEEDS=elasticsearch"
- "KAFKA_SEEDS=kafka"
- "PERSISTENCE_TYPE=sql"
- "TEST_TAG=esintegration"
- BUILDKITE_AGENT_ACCESS_TOKEN
- BUILDKITE_JOB_ID
- BUILDKITE_BUILD_ID
- BUILDKITE_BUILD_NUMBER
depends_on:
- mysql
- elasticsearch
- kafka
volumes:
- ../../:/go/src/github.com/uber/cadence
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
Expand Down
43 changes: 43 additions & 0 deletions environment/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ const (
KafkaPort = "KAFKA_PORT"
// KafkaDefaultPort Kafka default port
KafkaDefaultPort = "9092"

// ESSeeds env
ESSeeds = "ES_SEEDS"
// ESPort env
ESPort = "ES_PORT"
// ESDefaultPort ES default port
ESDefaultPort = "9200"
)

// SetupEnv setup the necessary env
Expand Down Expand Up @@ -95,6 +102,20 @@ func SetupEnv() {
panic(fmt.Sprintf("error setting env %v", KafkaPort))
}
}

if os.Getenv(ESSeeds) == "" {
err := os.Setenv(ESSeeds, Localhost)
if err != nil {
panic(fmt.Sprintf("error setting env %v", ESSeeds))
}
}

if os.Getenv(ESPort) == "" {
err := os.Setenv(ESPort, ESDefaultPort)
if err != nil {
panic(fmt.Sprintf("error setting env %v", ESPort))
}
}
}

// GetCassandraAddress return the cassandra address
Expand Down Expand Up @@ -162,3 +183,25 @@ func GetKafkaPort() int {
}
return p
}

// GetESAddress return the kafka address
func GetESAddress() string {
addr := os.Getenv(ESSeeds)
if addr == "" {
addr = Localhost
}
return addr
}

// GetESPort return the Kafka port
func GetESPort() int {
port := os.Getenv(ESPort)
if port == "" {
port = ESDefaultPort
}
p, err := strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("error getting env %v", ESPort))
}
return p
}
156 changes: 156 additions & 0 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//+build esintegration

package host

import (
"flag"
"io/ioutil"
"testing"
"time"

"github.com/olivere/elastic"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
)

type elasticsearchIntegrationSuite struct {
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
IntegrationBase
esClient *elastic.Client
}

// This cluster use customized threshold for history config
func (s *elasticsearchIntegrationSuite) SetupSuite() {
s.setupSuite("testdata/integration_elasticsearch_cluster.yaml")
s.createESClient()
s.putIndexTemplate("testdata/es_index_template.json", "test-visibility-template")
s.createIndex(s.testClusterConfig.ESConfig.Indices[common.VisibilityAppName])
}

func (s *elasticsearchIntegrationSuite) TearDownSuite() {
s.tearDownSuite()
s.deleteIndex(s.testClusterConfig.ESConfig.Indices[common.VisibilityAppName])
}

func (s *elasticsearchIntegrationSuite) SetupTest() {
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
s.Assertions = require.New(s.T())
}

func TestElasticsearchIntegrationSuite(t *testing.T) {
flag.Parse()
suite.Run(t, new(elasticsearchIntegrationSuite))
}

func (s *elasticsearchIntegrationSuite) TestListOpenWorkflow() {
id := "es-integration-start-workflow-test"
wt := "es-integration-start-workflow-test-type"
tl := "es-integration-start-workflow-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
}

startTime := time.Now().UnixNano()
we, err := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err)

startFilter := &workflow.StartTimeFilter{}
startFilter.EarliestTime = common.Int64Ptr(startTime)
var openExecution *workflow.WorkflowExecutionInfo
for i := 0; i < 20; i++ {
startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())
resp, err := s.engine.ListOpenWorkflowExecutions(createContext(), &workflow.ListOpenWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
MaximumPageSize: common.Int32Ptr(100),
StartTimeFilter: startFilter,
ExecutionFilter: &workflow.WorkflowExecutionFilter{
WorkflowId: common.StringPtr(id),
},
})
s.Nil(err)
if len(resp.GetExecutions()) == 1 {
openExecution = resp.GetExecutions()[0]
break
}
time.Sleep(200 * time.Millisecond)
}
s.NotNil(openExecution)
s.Equal(we.GetRunId(), openExecution.GetExecution().GetRunId())
}

func (s *elasticsearchIntegrationSuite) createESClient() {
var err error
s.esClient, err = elastic.NewClient(
elastic.SetURL(s.testClusterConfig.ESConfig.URL.String()),
elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))),
)
s.Require().NoError(err)
}

func (s *elasticsearchIntegrationSuite) putIndexTemplate(templateConfigFile, templateName string) {
template, err := ioutil.ReadFile(templateConfigFile)
s.Require().NoError(err)
putTemplate, err := s.esClient.IndexPutTemplate(templateName).BodyString(string(template)).Do(createContext())
s.Require().NoError(err)
s.Require().True(putTemplate.Acknowledged)
}

func (s *elasticsearchIntegrationSuite) createIndex(indexName string) {
exists, err := s.esClient.IndexExists(indexName).Do(createContext())
s.Require().NoError(err)
if exists {
deleteTestIndex, err := s.esClient.DeleteIndex(indexName).Do(createContext())
s.Require().Nil(err)
s.Require().True(deleteTestIndex.Acknowledged)
}

createTestIndex, err := s.esClient.CreateIndex(indexName).Do(createContext())
s.Require().NoError(err)
s.Require().True(createTestIndex.Acknowledged)
}

func (s *elasticsearchIntegrationSuite) deleteIndex(indexName string) {
deleteTestIndex, err := s.esClient.DeleteIndex(indexName).Do(createContext())
s.Nil(err)
s.True(deleteTestIndex.Acknowledged)
}
6 changes: 5 additions & 1 deletion host/integration_cross_dc_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ func (s *integrationCrossDCSuite) TearDownTest() {

func (s *integrationCrossDCSuite) setupTest(enableGlobalDomain bool, isMasterCluster bool) {
c, err := NewCluster(&TestClusterConfig{
EnableWorker: false,
WorkerConfig: &WorkerConfig{
EnableReplicator: false,
EnableArchiver: false,
EnableIndexer: false,
},
IsMasterCluster: isMasterCluster,
ClusterInfo: config.ClustersInfo{
EnableGlobalDomain: enableGlobalDomain,
Expand Down
3 changes: 3 additions & 0 deletions host/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func GetTestClusterConfig(configFile string) (*TestClusterConfig, error) {

options.EnableEventsV2 = TestFlags.EnableEventsV2
options.FrontendAddress = TestFlags.FrontendAddr
if options.ESConfig.Enable {
options.ESConfig.Indices[common.VisibilityAppName] += uuid.New()
}
return &options, nil
}

Expand Down
Loading

0 comments on commit 515018f

Please sign in to comment.