Skip to content

Commit

Permalink
Add Scan API (cadence-workflow#1752)
Browse files Browse the repository at this point in the history
* Fix lint

* Add Scan API

* Add unit test

* Add unit test

* Add back build tag for es host test

* Add mock
  • Loading branch information
vancexu authored Apr 30, 2019
1 parent 364dce9 commit 3e9afde
Show file tree
Hide file tree
Showing 31 changed files with 1,493 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .gen/go/cadence/idl.go

Large diffs are not rendered by default.

721 changes: 721 additions & 0 deletions .gen/go/cadence/workflowservice_scanworkflowexecutions.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions .gen/go/cadence/workflowserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion .gen/go/cadence/workflowserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions .gen/go/cadence/workflowservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ func (c *clientImpl) ListWorkflowExecutions(
return client.ListWorkflowExecutions(ctx, request, opts...)
}

func (c *clientImpl) ScanWorkflowExecutions(
ctx context.Context,
request *shared.ListWorkflowExecutionsRequest,
opts ...yarpc.CallOption,
) (*shared.ListWorkflowExecutionsResponse, error) {

opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.ScanWorkflowExecutions(ctx, request, opts...)
}

func (c *clientImpl) PollForActivityTask(
ctx context.Context,
request *shared.PollForActivityTaskRequest,
Expand Down
18 changes: 18 additions & 0 deletions client/frontend/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ func (c *metricClient) ListWorkflowExecutions(
return resp, err
}

func (c *metricClient) ScanWorkflowExecutions(
ctx context.Context,
request *shared.ListWorkflowExecutionsRequest,
opts ...yarpc.CallOption,
) (*shared.ListWorkflowExecutionsResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendScanWorkflowExecutionsScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendScanWorkflowExecutionsScope, metrics.CadenceClientLatency)
resp, err := c.client.ListWorkflowExecutions(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendScanWorkflowExecutionsScope, metrics.CadenceClientFailures)
}
return resp, err
}

func (c *metricClient) PollForActivityTask(
ctx context.Context,
request *shared.PollForActivityTaskRequest,
Expand Down
16 changes: 16 additions & 0 deletions client/frontend/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,22 @@ func (c *retryableClient) ListWorkflowExecutions(
return resp, err
}

func (c *retryableClient) ScanWorkflowExecutions(
ctx context.Context,
request *shared.ListWorkflowExecutionsRequest,
opts ...yarpc.CallOption,
) (*shared.ListWorkflowExecutionsResponse, error) {

var resp *shared.ListWorkflowExecutionsResponse
op := func() error {
var err error
resp, err = c.client.ScanWorkflowExecutions(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) PollForActivityTask(
ctx context.Context,
request *shared.PollForActivityTaskRequest,
Expand Down
31 changes: 31 additions & 0 deletions common/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ type (
Client interface {
Search(ctx context.Context, p *SearchParameters) (*elastic.SearchResult, error)
SearchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error)
Scroll(ctx context.Context, scrollID string) (*elastic.SearchResult, ScrollService, error)
ScrollFirstPage(ctx context.Context, index, query string) (*elastic.SearchResult, ScrollService, error)
RunBulkProcessor(ctx context.Context, p *BulkProcessorParameters) (*elastic.BulkProcessor, error)
}

// ScrollService is a interface for elastic.ScrollService
ScrollService interface {
Clear(ctx context.Context) error
}

// SearchParameters holds all required and optional parameters for executing a search
SearchParameters struct {
Index string
Expand All @@ -62,6 +69,10 @@ type (
elasticWrapper struct {
client *elastic.Client
}

scrollServiceImpl struct {
scrollService *elastic.ScrollService
}
)

var _ Client = (*elasticWrapper)(nil)
Expand Down Expand Up @@ -104,6 +115,22 @@ func (c *elasticWrapper) SearchWithDSL(ctx context.Context, index, query string)
return c.client.Search(index).Source(query).Do(ctx)
}

func (c *elasticWrapper) Scroll(ctx context.Context, scrollID string) (
*elastic.SearchResult, ScrollService, error) {

scrollService := elastic.NewScrollService(c.client)
result, err := scrollService.ScrollId(scrollID).Do(ctx)
return result, &scrollServiceImpl{scrollService}, err
}

func (c *elasticWrapper) ScrollFirstPage(ctx context.Context, index, query string) (
*elastic.SearchResult, ScrollService, error) {

scrollService := elastic.NewScrollService(c.client)
result, err := scrollService.Index(index).Body(query).Do(ctx)
return result, &scrollServiceImpl{scrollService}, err
}

func (c *elasticWrapper) RunBulkProcessor(ctx context.Context, p *BulkProcessorParameters) (*elastic.BulkProcessor, error) {
return c.client.BulkProcessor().
Name(p.Name).
Expand All @@ -116,3 +143,7 @@ func (c *elasticWrapper) RunBulkProcessor(ctx context.Context, p *BulkProcessorP
After(p.AfterFunc).
Do(ctx)
}

func (s *scrollServiceImpl) Clear(ctx context.Context) error {
return s.scrollService.Clear(ctx)
}
66 changes: 65 additions & 1 deletion common/elasticsearch/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3e9afde

Please sign in to comment.