Skip to content

Commit

Permalink
Support ElasticSearch v7 (cadence-workflow#3700)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Nov 5, 2020
1 parent c11be59 commit 1125076
Show file tree
Hide file tree
Showing 30 changed files with 1,549 additions and 89 deletions.
15 changes: 15 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@ steps:
run: integration-test-cassandra
config: docker/buildkite/docker-compose.yml

- label: ":golang: integration test with cassandra with ElasticSearch V7"
agents:
queue: "workers"
docker: "*"
command: "make cover_integration_profile"
artifact_paths:
- "build/coverage/*.out"
retry:
automatic:
limit: 1
plugins:
- docker-compose#v3.0.0:
run: integration-test-cassandra
config: docker/buildkite/docker-compose-es7.yml

- label: ":golang: integration ndc test with cassandra"
agents:
queue: "workers"
Expand Down
16 changes: 9 additions & 7 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ type (
}
)

// TODO https://github.com/uber/cadence/issues/3686
const oneMicroSecondInNano = int64(time.Microsecond / time.Nanosecond)
func (c *elasticV6) IsNotFoundError(err error) bool {
if elastic.IsNotFound(err) {
return true
}
return false
}

// NewWrapperClient returns a new implementation of Client
// newV6Client returns a new implementation of GenericClient
func newV6Client(
connectConfig *config.ElasticSearchConfig,
visibilityConfig *config.VisibilityConfig,
Expand Down Expand Up @@ -200,7 +204,7 @@ func (c *elasticV6) RunBulkProcessor(ctx context.Context, parameters *BulkProces
}

afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
gerr := convertToGenericError(err)
gerr := convertV6ErrorToGenericError(err)
parameters.AfterFunc(
executionId,
fromV6ToGenericBulkableRequests(requests),
Expand All @@ -220,9 +224,7 @@ func (c *elasticV6) RunBulkProcessor(ctx context.Context, parameters *BulkProces
})
}

const unknownStatusCode = -1

func convertToGenericError(err error) *GenericError {
func convertV6ErrorToGenericError(err error) *GenericError {
if err == nil {
return nil
}
Expand Down
Loading

0 comments on commit 1125076

Please sign in to comment.