Skip to content

Commit

Permalink
Enforce context timeout in ES persistence implementation (cadence-wor…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Oct 13, 2020
1 parent d072276 commit de605fa
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 33 deletions.
50 changes: 25 additions & 25 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (v *esVisibilityStore) UpsertWorkflowExecution(
}

func (v *esVisibilityStore) ListOpenWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
token, err := v.getNextPageToken(request.NextPageToken)
Expand All @@ -187,7 +187,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutions(
}

isOpen := true
searchResult, err := v.getSearchResult(request, token, nil, isOpen)
searchResult, err := v.getSearchResult(ctx, request, token, nil, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutions failed. Error: %v", err),
Expand All @@ -203,7 +203,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutions(
}

func (v *esVisibilityStore) ListClosedWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -213,7 +213,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutions(
}

isOpen := false
searchResult, err := v.getSearchResult(request, token, nil, isOpen)
searchResult, err := v.getSearchResult(ctx, request, token, nil, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutions failed. Error: %v", err),
Expand All @@ -229,7 +229,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutions(
}

func (v *esVisibilityStore) ListOpenWorkflowExecutionsByType(
_ context.Context,
ctx context.Context,
request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -240,7 +240,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutionsByType(

isOpen := true
matchQuery := elastic.NewMatchQuery(es.WorkflowType, request.WorkflowTypeName)
searchResult, err := v.getSearchResult(&request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
searchResult, err := v.getSearchResult(ctx, &request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType failed. Error: %v", err),
Expand All @@ -256,7 +256,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutionsByType(
}

func (v *esVisibilityStore) ListClosedWorkflowExecutionsByType(
_ context.Context,
ctx context.Context,
request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -267,7 +267,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByType(

isOpen := false
matchQuery := elastic.NewMatchQuery(es.WorkflowType, request.WorkflowTypeName)
searchResult, err := v.getSearchResult(&request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
searchResult, err := v.getSearchResult(ctx, &request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType failed. Error: %v", err),
Expand All @@ -283,7 +283,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByType(
}

func (v *esVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
_ context.Context,
ctx context.Context,
request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -294,7 +294,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(

isOpen := true
matchQuery := elastic.NewMatchQuery(es.WorkflowID, request.WorkflowID)
searchResult, err := v.getSearchResult(&request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
searchResult, err := v.getSearchResult(ctx, &request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID failed. Error: %v", err),
Expand All @@ -310,7 +310,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
}

func (v *esVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
_ context.Context,
ctx context.Context,
request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -321,7 +321,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(

isOpen := false
matchQuery := elastic.NewMatchQuery(es.WorkflowID, request.WorkflowID)
searchResult, err := v.getSearchResult(&request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
searchResult, err := v.getSearchResult(ctx, &request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID failed. Error: %v", err),
Expand All @@ -337,7 +337,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
}

func (v *esVisibilityStore) ListClosedWorkflowExecutionsByStatus(
_ context.Context,
ctx context.Context,
request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -348,7 +348,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByStatus(

isOpen := false
matchQuery := elastic.NewMatchQuery(es.CloseStatus, int32(request.Status))
searchResult, err := v.getSearchResult(&request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
searchResult, err := v.getSearchResult(ctx, &request.InternalListWorkflowExecutionsRequest, token, matchQuery, isOpen)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus failed. Error: %v", err),
Expand All @@ -364,7 +364,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByStatus(
}

func (v *esVisibilityStore) GetClosedWorkflowExecution(
_ context.Context,
ctx context.Context,
request *p.InternalGetClosedWorkflowExecutionRequest,
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {

Expand All @@ -378,7 +378,6 @@ func (v *esVisibilityStore) GetClosedWorkflowExecution(
boolQuery = boolQuery.Must(matchRunIDQuery)
}

ctx := context.Background()
params := &es.SearchParameters{
Index: v.index,
Query: boolQuery,
Expand Down Expand Up @@ -415,7 +414,7 @@ func (v *esVisibilityStore) DeleteWorkflowExecution(
}

func (v *esVisibilityStore) ListWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *p.ListWorkflowExecutionsRequestV2,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -431,7 +430,6 @@ func (v *esVisibilityStore) ListWorkflowExecutions(
return nil, &workflow.BadRequestError{Message: fmt.Sprintf("Error when parse query: %v", err)}
}

ctx := context.Background()
searchResult, err := v.esClient.SearchWithDSL(ctx, v.index, queryDSL)
if err != nil {
return nil, &workflow.InternalServiceError{
Expand All @@ -443,7 +441,7 @@ func (v *esVisibilityStore) ListWorkflowExecutions(
}

func (v *esVisibilityStore) ScanWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *p.ListWorkflowExecutionsRequestV2,
) (*p.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -454,7 +452,6 @@ func (v *esVisibilityStore) ScanWorkflowExecutions(
return nil, err
}

ctx := context.Background()
var searchResult *elastic.SearchResult
var scrollService es.ScrollService
if len(token.ScrollID) == 0 { // first call
Expand Down Expand Up @@ -482,7 +479,7 @@ func (v *esVisibilityStore) ScanWorkflowExecutions(
}

func (v *esVisibilityStore) CountWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *p.CountWorkflowExecutionsRequest,
) (
*p.CountWorkflowExecutionsResponse, error) {
Expand All @@ -492,7 +489,6 @@ func (v *esVisibilityStore) CountWorkflowExecutions(
return nil, &workflow.BadRequestError{Message: fmt.Sprintf("Error when parse query: %v", err)}
}

ctx := context.Background()
count, err := v.esClient.Count(ctx, v.index, queryDSL)
if err != nil {
return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -770,8 +766,13 @@ func (v *esVisibilityStore) getNextPageToken(token []byte) (*esVisibilityPageTok
return result, nil
}

func (v *esVisibilityStore) getSearchResult(request *p.InternalListWorkflowExecutionsRequest, token *esVisibilityPageToken,
matchQuery *elastic.MatchQuery, isOpen bool) (*elastic.SearchResult, error) {
func (v *esVisibilityStore) getSearchResult(
ctx context.Context,
request *p.InternalListWorkflowExecutionsRequest,
token *esVisibilityPageToken,
matchQuery *elastic.MatchQuery,
isOpen bool,
) (*elastic.SearchResult, error) {

matchDomainQuery := elastic.NewMatchQuery(es.DomainID, request.DomainUUID)
existClosedStatusQuery := elastic.NewExistsQuery(es.CloseStatus)
Expand Down Expand Up @@ -806,7 +807,6 @@ func (v *esVisibilityStore) getSearchResult(request *p.InternalListWorkflowExecu
boolQuery = boolQuery.Must(existClosedStatusQuery)
}

ctx := context.Background()
params := &es.SearchParameters{
Index: v.index,
Query: boolQuery,
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (s *ESVisibilitySuite) TestGetSearchResult() {
Sorter: []elastic.Sorter{elastic.NewFieldSort(es.StartTime).Desc(), tieBreakerSorter},
}
s.mockESClient.On("Search", mock.Anything, params).Return(nil, nil).Once()
_, err := s.visibilityStore.getSearchResult(request, token, nil, isOpen)
_, err := s.visibilityStore.getSearchResult(context.Background(), request, token, nil, isOpen)
s.NoError(err)

// test request latestTime overflow
Expand All @@ -519,7 +519,7 @@ func (s *ESVisibilitySuite) TestGetSearchResult() {
Sorter: []elastic.Sorter{elastic.NewFieldSort(es.StartTime).Desc(), tieBreakerSorter},
}
s.mockESClient.On("Search", mock.Anything, param1).Return(nil, nil).Once()
_, err = s.visibilityStore.getSearchResult(request, token, nil, isOpen)
_, err = s.visibilityStore.getSearchResult(context.Background(), request, token, nil, isOpen)
s.NoError(err)
request.LatestStartTime = testLatestTime // revert

Expand All @@ -530,15 +530,15 @@ func (s *ESVisibilitySuite) TestGetSearchResult() {
params.Query = boolQuery
params.Sorter = []elastic.Sorter{elastic.NewFieldSort(es.CloseTime).Desc(), tieBreakerSorter}
s.mockESClient.On("Search", mock.Anything, params).Return(nil, nil).Once()
_, err = s.visibilityStore.getSearchResult(request, token, nil, isOpen)
_, err = s.visibilityStore.getSearchResult(context.Background(), request, token, nil, isOpen)
s.NoError(err)

// test for additional matchQuery
matchQuery := elastic.NewMatchQuery(es.CloseStatus, int32(0))
boolQuery = elastic.NewBoolQuery().Must(matchDomainQuery).Filter(rangeQuery).Must(matchQuery).Must(existClosedStatusQuery)
params.Query = boolQuery
s.mockESClient.On("Search", mock.Anything, params).Return(nil, nil).Once()
_, err = s.visibilityStore.getSearchResult(request, token, matchQuery, isOpen)
_, err = s.visibilityStore.getSearchResult(context.Background(), request, token, matchQuery, isOpen)
s.NoError(err)

// test for search after
Expand All @@ -550,7 +550,7 @@ func (s *ESVisibilitySuite) TestGetSearchResult() {
params.From = 0
params.SearchAfter = []interface{}{token.SortValue, token.TieBreaker}
s.mockESClient.On("Search", mock.Anything, params).Return(nil, nil).Once()
_, err = s.visibilityStore.getSearchResult(request, token, matchQuery, isOpen)
_, err = s.visibilityStore.getSearchResult(context.Background(), request, token, matchQuery, isOpen)
s.NoError(err)
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type (
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
}

// QueueMessage is the message that stores in the queue
// InternalQueueMessage is the message that stores in the queue
InternalQueueMessage struct {
ID int64 `json:"message_id"`
QueueType QueueType `json:"queue_type"`
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,9 @@ func (m *sqlExecutionManager) ListCurrentExecutions(

func (m *sqlExecutionManager) IsWorkflowExecutionExists(
_ context.Context,
request *p.IsWorkflowExecutionExistsRequest,
_ *p.IsWorkflowExecutionExistsRequest,
) (*p.IsWorkflowExecutionExistsResponse, error) {
panic("not implemented yet")
return nil, &workflow.InternalServiceError{Message: "Not yet implemented"}
}

func (m *sqlExecutionManager) ListConcreteExecutions(
Expand Down

0 comments on commit de605fa

Please sign in to comment.