From 36dddd854d988ea0eab1a55b6c5af2e15f26d580 Mon Sep 17 00:00:00 2001 From: Venkat Date: Fri, 28 Jun 2019 13:14:27 -0700 Subject: [PATCH] matching: refactor taskListManager to separate out individual components (#2125) --- common/util.go | 8 + service/matching/config.go | 134 ++++ service/matching/handler.go | 4 +- service/matching/matcher.go | 203 ++++++ service/matching/matchingEngine.go | 189 ++++-- service/matching/matchingEngineInterfaces.go | 4 +- service/matching/matchingEngine_test.go | 111 +-- service/matching/ratelimiter.go | 5 + service/matching/service.go | 46 -- service/matching/task.go | 93 +++ service/matching/taskListManager.go | 671 +++++++------------ service/matching/taskListManager_test.go | 37 +- service/matching/taskReader.go | 208 ++++-- 13 files changed, 1018 insertions(+), 695 deletions(-) create mode 100644 service/matching/config.go create mode 100644 service/matching/matcher.go create mode 100644 service/matching/task.go diff --git a/common/util.go b/common/util.go index 0993ddd58fd..7bd2662b645 100644 --- a/common/util.go +++ b/common/util.go @@ -322,6 +322,14 @@ func MinInt64(a, b int64) int64 { return b } +// MaxInt64 returns the greater of two given int64 +func MaxInt64(a, b int64) int64 { + if a > b { + return a + } + return b +} + // MinInt32 return smaller one of two inputs int32 func MinInt32(a, b int32) int32 { if a < b { diff --git a/service/matching/config.go b/service/matching/config.go new file mode 100644 index 00000000000..d4e60f4cff6 --- /dev/null +++ b/service/matching/config.go @@ -0,0 +1,134 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package matching + +import ( + "time" + + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/service/dynamicconfig" +) + +type ( + // Config represents configuration for cadence-matching service + Config struct { + PersistenceMaxQPS dynamicconfig.IntPropertyFn + EnableSyncMatch dynamicconfig.BoolPropertyFnWithTaskListInfoFilters + RPS dynamicconfig.IntPropertyFn + + // taskListManager configuration + RangeSize int64 + GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters + UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + IdleTasklistCheckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + MaxTasklistIdleTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + // Time to hold a poll request before returning an empty response if there are no tasks + LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters + MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters + + // taskWriter configuration + OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskListInfoFilters + MaxTaskBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters + + ThrottledLogRPS dynamicconfig.IntPropertyFn + } + + taskListConfig struct { + EnableSyncMatch func() bool + // Time to hold a poll request before returning an empty response if there are no tasks + LongPollExpirationInterval func() time.Duration + RangeSize int64 + GetTasksBatchSize func() int + UpdateAckInterval func() time.Duration + IdleTasklistCheckInterval func() time.Duration + MaxTasklistIdleTime func() time.Duration + MinTaskThrottlingBurstSize func() int + MaxTaskDeleteBatchSize func() int + // taskWriter configuration + OutstandingTaskAppendsThreshold func() int + MaxTaskBatchSize func() int + } +) + +// NewConfig returns new service config with default values +func NewConfig(dc *dynamicconfig.Collection) *Config { + return &Config{ + PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceMaxQPS, 3000), + EnableSyncMatch: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableSyncMatch, true), + RPS: dc.GetIntProperty(dynamicconfig.MatchingRPS, 1200), + RangeSize: 100000, + GetTasksBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingGetTasksBatchSize, 1000), + UpdateAckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingUpdateAckInterval, 1*time.Minute), + IdleTasklistCheckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingIdleTasklistCheckInterval, 5*time.Minute), + MaxTasklistIdleTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MaxTasklistIdleTime, 5*time.Minute), + LongPollExpirationInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingLongPollExpirationInterval, time.Minute), + MinTaskThrottlingBurstSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMinTaskThrottlingBurstSize, 1), + MaxTaskDeleteBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMaxTaskDeleteBatchSize, 100), + OutstandingTaskAppendsThreshold: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingOutstandingTaskAppendsThreshold, 250), + MaxTaskBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMaxTaskBatchSize, 100), + ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.MatchingThrottledLogRPS, 20), + } +} + +func newTaskListConfig(id *taskListID, config 
*Config, domainCache cache.DomainCache) (*taskListConfig, error) { + domainEntry, err := domainCache.GetDomainByID(id.domainID) + if err != nil { + return nil, err + } + + domain := domainEntry.GetInfo().Name + taskListName := id.taskListName + taskType := id.taskType + return &taskListConfig{ + RangeSize: config.RangeSize, + GetTasksBatchSize: func() int { + return config.GetTasksBatchSize(domain, taskListName, taskType) + }, + UpdateAckInterval: func() time.Duration { + return config.UpdateAckInterval(domain, taskListName, taskType) + }, + IdleTasklistCheckInterval: func() time.Duration { + return config.IdleTasklistCheckInterval(domain, taskListName, taskType) + }, + MaxTasklistIdleTime: func() time.Duration { + return config.MaxTasklistIdleTime(domain, taskListName, taskType) + }, + MinTaskThrottlingBurstSize: func() int { + return config.MinTaskThrottlingBurstSize(domain, taskListName, taskType) + }, + EnableSyncMatch: func() bool { + return config.EnableSyncMatch(domain, taskListName, taskType) + }, + LongPollExpirationInterval: func() time.Duration { + return config.LongPollExpirationInterval(domain, taskListName, taskType) + }, + MaxTaskDeleteBatchSize: func() int { + return config.MaxTaskDeleteBatchSize(domain, taskListName, taskType) + }, + OutstandingTaskAppendsThreshold: func() int { + return config.OutstandingTaskAppendsThreshold(domain, taskListName, taskType) + }, + MaxTaskBatchSize: func() int { + return config.MaxTaskBatchSize(domain, taskListName, taskType) + }, + }, nil +} diff --git a/service/matching/handler.go b/service/matching/handler.go index 91be389ae3f..adc1782ab23 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -129,7 +129,7 @@ func (h *Handler) AddActivityTask(ctx context.Context, addRequest *m.AddActivity return h.handleErr(errMatchingHostThrottle, scope) } - syncMatch, err := h.engine.AddActivityTask(addRequest) + syncMatch, err := h.engine.AddActivityTask(ctx, addRequest) if syncMatch { h.metricsClient.RecordTimer(scope, metrics.SyncMatchLatency, time.Since(startT)) } @@ -149,7 +149,7 @@ func (h *Handler) AddDecisionTask(ctx context.Context, addRequest *m.AddDecision return h.handleErr(errMatchingHostThrottle, scope) } - syncMatch, err := h.engine.AddDecisionTask(addRequest) + syncMatch, err := h.engine.AddDecisionTask(ctx, addRequest) if syncMatch { h.metricsClient.RecordTimer(scope, metrics.SyncMatchLatency, time.Since(startT)) } diff --git a/service/matching/matcher.go b/service/matching/matcher.go new file mode 100644 index 00000000000..047a9906930 --- /dev/null +++ b/service/matching/matcher.go @@ -0,0 +1,203 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "context" + "errors" + "time" + + "github.com/uber/cadence/common/metrics" + "golang.org/x/time/rate" +) + +// TaskMatcher matches a task producer with a task consumer +// Producers are usually rpc calls from history or taskReader +// that drains backlog from db. Consumers are the task list pollers +type TaskMatcher struct { + // synchronous task channel to match producer/consumer + taskC chan *internalTask + // synchronous task channel to match query task - the reason to have + // separate channel for this is because there are cases when consumers + // are interested in queryTasks but not others. Example is when domain is + // not active in a cluster + queryTaskC chan *internalTask + // ratelimiter that limits the rate at which tasks can be dispatched to consumers + limiter *rateLimiter + // domain metric scope + scope func() metrics.Scope +} + +const ( + _defaultTaskDispatchRPS = 100000.0 + _defaultTaskDispatchRPSTTL = 60 * time.Second +) + +var errTasklistThrottled = errors.New("cannot add to tasklist, limit exceeded") + +// newTaskMatcher returns a task matcher instance. The returned instance can be +// used by task producers and consumers to find a match. Both sync matches and non-sync +// matches should use this implementation +func newTaskMatcher(config *taskListConfig, scopeFunc func() metrics.Scope) *TaskMatcher { + dPtr := _defaultTaskDispatchRPS + limiter := newRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize()) + return &TaskMatcher{ + limiter: limiter, + scope: scopeFunc, + taskC: make(chan *internalTask), + queryTaskC: make(chan *internalTask), + } +} + +// Offer offers a task to a potential consumer (poller). +// If the task is successfully matched with a consumer, this +// method returns true and no error. If the task is matched +// but the consumer returns an error, this method returns +// true along with that error. Both regular tasks and query tasks +// should use this method to match with a consumer. Likewise, both sync matches +// and non-sync matches should use this method.
+// returns error when: +// - ratelimit is exceeded (does not apply to query task) +// - context deadline is exceeded +// - task is matched and consumer returns error in response channel +func (tm *TaskMatcher) Offer(ctx context.Context, task *internalTask) (bool, error) { + if task.isQuery() { + select { + case tm.queryTaskC <- task: + <-task.syncResponseCh + return true, nil + case <-ctx.Done(): + return false, ctx.Err() + } + } + + rsv, err := tm.ratelimit(ctx) + if err != nil { + tm.scope().IncCounter(metrics.SyncThrottleCounter) + return false, err + } + + select { + case tm.taskC <- task: // poller picked up the task + if task.syncResponseCh != nil { + // if there is a response channel, block until resp is received + // and return error if the response contains error + err = <-task.syncResponseCh + return true, err + } + return false, nil + default: // no poller waiting for tasks + if rsv != nil { + // there was a ratelimit token we consumed + // return it since we did not really do any work + rsv.Cancel() + } + return false, nil + } +} + +// MustOffer blocks until a consumer is found to handle this task +// Returns error only when context is canceled or the ratelimit is set to zero (allow nothing) +// The passed in context MUST NOT have a deadline associated with it +func (tm *TaskMatcher) MustOffer(ctx context.Context, task *internalTask) error { + if _, err := tm.ratelimit(ctx); err != nil { + return err + } + select { + case tm.taskC <- task: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Poll blocks until a task is found or context deadline is exceeded +// On success, the returned task could be a query task or a regular task +// Returns ErrNoTasks when context deadline is exceeded +func (tm *TaskMatcher) Poll(ctx context.Context) (*internalTask, error) { + select { + case task := <-tm.taskC: + if task.syncResponseCh != nil { + tm.scope().IncCounter(metrics.PollSuccessWithSyncCounter) + } + tm.scope().IncCounter(metrics.PollSuccessCounter) + return task, nil + case task := <-tm.queryTaskC: + tm.scope().IncCounter(metrics.PollSuccessWithSyncCounter) + tm.scope().IncCounter(metrics.PollSuccessCounter) + return task, nil + case <-ctx.Done(): + tm.scope().IncCounter(metrics.PollTimeoutCounter) + return nil, ErrNoTasks + } +} + +// PollForQuery blocks until a *query* task is found or context deadline is exceeded +// Returns ErrNoTasks when context deadline is exceeded +func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*internalTask, error) { + select { + case task := <-tm.queryTaskC: + tm.scope().IncCounter(metrics.PollSuccessWithSyncCounter) + tm.scope().IncCounter(metrics.PollSuccessCounter) + return task, nil + case <-ctx.Done(): + tm.scope().IncCounter(metrics.PollTimeoutCounter) + return nil, ErrNoTasks + } +} + +// UpdateRatelimit updates the task dispatch rate +func (tm *TaskMatcher) UpdateRatelimit(rps *float64) { + tm.limiter.UpdateMaxDispatch(rps) +} + +// Rate returns the current rate at which tasks are dispatched +func (tm *TaskMatcher) Rate() float64 { + return tm.limiter.Limit() +} + +func (tm *TaskMatcher) ratelimit(ctx context.Context) (*rate.Reservation, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + deadline, ok := ctx.Deadline() + if !ok { + if err := tm.limiter.Wait(ctx); err != nil { + return nil, err + } + return nil, nil + } + + rsv := tm.limiter.Reserve() + // If we have to wait too long for reservation, give up and return + if !rsv.OK() || rsv.Delay() > deadline.Sub(time.Now()) { + if rsv.OK() { 
// if we were indeed given a reservation, return it before we bail out + rsv.Cancel() + } + return nil, errTasklistThrottled + } + + time.Sleep(rsv.Delay()) + return rsv, nil +} diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 66a0f010ddb..9f0a533d7c3 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -35,6 +35,7 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/client/history" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/log" @@ -215,7 +216,7 @@ func (e *matchingEngineImpl) removeTaskListManager(id *taskListID) { } // AddDecisionTask either delivers task directly to waiting poller or save it into task list persistence. -func (e *matchingEngineImpl) AddDecisionTask(addRequest *m.AddDecisionTaskRequest) (bool, error) { +func (e *matchingEngineImpl) AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (bool, error) { domainID := addRequest.GetDomainUUID() taskListName := addRequest.TaskList.GetName() taskListKind := common.TaskListKindPtr(addRequest.TaskList.GetKind()) @@ -235,11 +236,14 @@ func (e *matchingEngineImpl) AddDecisionTask(addRequest *m.AddDecisionTaskReques ScheduleToStartTimeout: addRequest.GetScheduleToStartTimeoutSeconds(), CreatedTime: time.Now(), } - return tlMgr.AddTask(addRequest.Execution, taskInfo) + return tlMgr.AddTask(ctx, addTaskParams{ + execution: addRequest.Execution, + taskInfo: taskInfo, + }) } // AddActivityTask either delivers task directly to waiting poller or save it into task list persistence. -func (e *matchingEngineImpl) AddActivityTask(addRequest *m.AddActivityTaskRequest) (bool, error) { +func (e *matchingEngineImpl) AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (bool, error) { domainID := addRequest.GetDomainUUID() sourceDomainID := addRequest.GetSourceDomainUUID() taskListName := addRequest.TaskList.GetName() @@ -259,7 +263,10 @@ func (e *matchingEngineImpl) AddActivityTask(addRequest *m.AddActivityTaskReques ScheduleToStartTimeout: addRequest.GetScheduleToStartTimeoutSeconds(), CreatedTime: time.Now(), } - return tlMgr.AddTask(addRequest.Execution, taskInfo) + return tlMgr.AddTask(ctx, addTaskParams{ + execution: addRequest.Execution, + taskInfo: taskInfo, + }) } var errQueryBeforeFirstDecisionCompleted = errors.New("query cannot be handled before first decision task is processed, please retry later") @@ -284,7 +291,7 @@ pollLoop: pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity()) taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) taskListKind := common.TaskListKindPtr(request.TaskList.GetKind()) - tCtx, err := e.getTask(pollerCtx, taskList, nil, taskListKind) + task, err := e.getTask(pollerCtx, taskList, nil, taskListKind) if err != nil { // TODO: Is empty poll the best reply for errPumpClosed? if err == ErrNoTasks || err == errPumpClosed { @@ -293,24 +300,24 @@ pollLoop: return nil, err } - if tCtx.queryTaskInfo != nil { - tCtx.completeTask(nil) // this only means query task sync match succeed. + if task.queryInfo != nil { + task.finish(nil) // this only means query task sync match succeed. // for query task, we don't need to update history to record decision task started. 
but we need to know // the NextEventID so front end knows what are the history events to load for this decision task. mutableStateResp, err := e.historyService.GetMutableState(ctx, &h.GetMutableStateRequest{ DomainUUID: req.DomainUUID, - Execution: &tCtx.workflowExecution, + Execution: &task.workflowExecution, }) if err != nil { // will notify query client that the query task failed - e.deliverQueryResult(tCtx.queryTaskInfo.taskID, &queryResult{err: err}) + e.deliverQueryResult(task.queryInfo.taskID, &queryResult{err: err}) return emptyPollForDecisionTaskResponse, nil } if mutableStateResp.GetPreviousStartedEventId() <= 0 { // first decision task is not processed by worker yet. - e.deliverQueryResult(tCtx.queryTaskInfo.taskID, + e.deliverQueryResult(task.queryInfo.taskID, &queryResult{err: errQueryBeforeFirstDecisionCompleted, waitNextEventID: mutableStateResp.GetNextEventId()}) return emptyPollForDecisionTaskResponse, nil } @@ -334,33 +341,24 @@ pollLoop: EventStoreVersion: mutableStateResp.EventStoreVersion, BranchToken: mutableStateResp.BranchToken, } - return e.createPollForDecisionTaskResponse(tCtx, resp), nil + return e.createPollForDecisionTaskResponse(task, resp), nil } - // Generate a unique requestId for this task which will be used for all retries - requestID := uuid.New() - resp, err := tCtx.RecordDecisionTaskStartedWithRetry(ctx, &h.RecordDecisionTaskStartedRequest{ - DomainUUID: common.StringPtr(domainID), - WorkflowExecution: &tCtx.workflowExecution, - ScheduleId: &tCtx.info.ScheduleID, - TaskId: &tCtx.info.TaskID, - RequestId: common.StringPtr(requestID), - PollRequest: request, - }) + resp, err := e.recordDecisionTaskStarted(ctx, request, task) if err != nil { switch err.(type) { case *workflow.EntityNotExistsError, *h.EventAlreadyStartedError: e.logger.Debug(fmt.Sprintf("Duplicated decision task taskList=%v, taskID=%v", - taskListName, tCtx.info.TaskID)) - tCtx.completeTask(nil) + taskListName, task.info.TaskID)) + task.finish(nil) default: - tCtx.completeTask(err) + task.finish(err) } continue pollLoop } - tCtx.completeTask(nil) - return e.createPollForDecisionTaskResponse(tCtx, resp), nil + task.finish(nil) + return e.createPollForDecisionTaskResponse(task, resp), nil } } @@ -391,7 +389,7 @@ pollLoop: pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID) pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity()) taskListKind := common.TaskListKindPtr(request.TaskList.GetKind()) - tCtx, err := e.getTask(pollerCtx, taskList, maxDispatch, taskListKind) + task, err := e.getTask(pollerCtx, taskList, maxDispatch, taskListKind) if err != nil { // TODO: Is empty poll the best reply for errPumpClosed? 
if err == ErrNoTasks || err == errPumpClosed { @@ -399,30 +397,21 @@ pollLoop: } return nil, err } - // Generate a unique requestId for this task which will be used for all retries - requestID := uuid.New() - resp, err := tCtx.RecordActivityTaskStartedWithRetry(ctx, &h.RecordActivityTaskStartedRequest{ - DomainUUID: common.StringPtr(domainID), - WorkflowExecution: &tCtx.workflowExecution, - ScheduleId: &tCtx.info.ScheduleID, - TaskId: &tCtx.info.TaskID, - RequestId: common.StringPtr(requestID), - PollRequest: request, - }) + resp, err := e.recordActivityTaskStarted(ctx, request, task) if err != nil { switch err.(type) { case *workflow.EntityNotExistsError, *h.EventAlreadyStartedError: e.logger.Debug(fmt.Sprintf("Duplicated activity task taskList=%v, taskID=%v", - taskListName, tCtx.info.TaskID)) - tCtx.completeTask(nil) + taskListName, task.info.TaskID)) + task.finish(nil) default: - tCtx.completeTask(err) + task.finish(err) } continue pollLoop } - tCtx.completeTask(nil) - return e.createPollForActivityTaskResponse(tCtx, resp), nil + task.finish(nil) + return e.createPollForActivityTaskResponse(task, resp), nil } } @@ -445,7 +434,7 @@ query_loop: queryRequest: queryRequest, taskID: uuid.New(), } - err = tlMgr.SyncMatchQueryTask(ctx, queryTask) + err = tlMgr.DispatchQueryTask(ctx, queryTask) if err != nil { return nil, err } @@ -577,12 +566,12 @@ func (e *matchingEngineImpl) DescribeTaskList(ctx context.Context, request *m.De // Loads a task from persistence and wraps it in a task context func (e *matchingEngineImpl) getTask( ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64, taskListKind *workflow.TaskListKind, -) (*taskContext, error) { +) (*internalTask, error) { tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return nil, err } - return tlMgr.GetTaskContext(ctx, maxDispatchPerSecond) + return tlMgr.GetTask(ctx, maxDispatchPerSecond) } func (e *matchingEngineImpl) unloadTaskList(id *taskListID) { @@ -598,47 +587,49 @@ func (e *matchingEngineImpl) unloadTaskList(id *taskListID) { } // Populate the decision task response based on context and scheduled/started events. 
-func (e *matchingEngineImpl) createPollForDecisionTaskResponse(context *taskContext, - historyResponse *h.RecordDecisionTaskStartedResponse) *m.PollForDecisionTaskResponse { - task := context.info +func (e *matchingEngineImpl) createPollForDecisionTaskResponse( + task *internalTask, + historyResponse *h.RecordDecisionTaskStartedResponse, +) *m.PollForDecisionTaskResponse { var token []byte - if context.queryTaskInfo != nil { + if task.queryInfo != nil { // for a query task - queryRequest := context.queryTaskInfo.queryRequest + queryRequest := task.queryInfo.queryRequest taskToken := &common.QueryTaskToken{ DomainID: *queryRequest.DomainUUID, TaskList: *queryRequest.TaskList.Name, - TaskID: context.queryTaskInfo.taskID, + TaskID: task.queryInfo.taskID, } token, _ = e.tokenSerializer.SerializeQueryTaskToken(taskToken) } else { taskoken := &common.TaskToken{ - DomainID: task.DomainID, - WorkflowID: task.WorkflowID, - RunID: task.RunID, + DomainID: task.info.DomainID, + WorkflowID: task.info.WorkflowID, + RunID: task.info.RunID, ScheduleID: historyResponse.GetScheduledEventId(), ScheduleAttempt: historyResponse.GetAttempt(), } token, _ = e.tokenSerializer.Serialize(taskoken) - if context.syncResponseCh == nil { + if task.syncResponseCh == nil { scope := e.metricsClient.Scope(metrics.MatchingPollForDecisionTaskScope) - scope.Tagged(metrics.DomainTag(context.domainName)).RecordTimer(metrics.AsyncMatchLatency, time.Since(task.CreatedTime)) + scope.Tagged(metrics.DomainTag(task.domainName)).RecordTimer(metrics.AsyncMatchLatency, time.Since(task.info.CreatedTime)) } } - response := common.CreateMatchingPollForDecisionTaskResponse(historyResponse, workflowExecutionPtr(context.workflowExecution), token) - if context.queryTaskInfo != nil { - response.Query = context.queryTaskInfo.queryRequest.QueryRequest.Query + response := common.CreateMatchingPollForDecisionTaskResponse(historyResponse, workflowExecutionPtr(task.workflowExecution), token) + if task.queryInfo != nil { + response.Query = task.queryInfo.queryRequest.QueryRequest.Query } - response.BacklogCountHint = common.Int64Ptr(context.backlogCountHint) + response.BacklogCountHint = common.Int64Ptr(task.backlogCountHint) return response } // Populate the activity task response based on context and scheduled/started events. 
-func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskContext, - historyResponse *h.RecordActivityTaskStartedResponse) *workflow.PollForActivityTaskResponse { - task := context.info +func (e *matchingEngineImpl) createPollForActivityTaskResponse( + task *internalTask, + historyResponse *h.RecordActivityTaskStartedResponse, +) *workflow.PollForActivityTaskResponse { scheduledEvent := historyResponse.ScheduledEvent if scheduledEvent.ActivityTaskScheduledEventAttributes == nil { @@ -648,9 +639,9 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont if attributes.ActivityId == nil { panic("ActivityTaskScheduledEventAttributes.ActivityID is not set") } - if context.syncResponseCh == nil { + if task.syncResponseCh == nil { scope := e.metricsClient.Scope(metrics.MatchingPollForActivityTaskScope) - scope.Tagged(metrics.DomainTag(context.domainName)).RecordTimer(metrics.AsyncMatchLatency, time.Since(task.CreatedTime)) + scope.Tagged(metrics.DomainTag(task.domainName)).RecordTimer(metrics.AsyncMatchLatency, time.Since(task.info.CreatedTime)) } response := &workflow.PollForActivityTaskResponse{} @@ -658,7 +649,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont response.ActivityType = attributes.ActivityType response.Header = attributes.Header response.Input = attributes.Input - response.WorkflowExecution = workflowExecutionPtr(context.workflowExecution) + response.WorkflowExecution = workflowExecutionPtr(task.workflowExecution) response.ScheduledTimestampOfThisAttempt = historyResponse.ScheduledTimestampOfThisAttempt response.ScheduledTimestamp = common.Int64Ptr(*scheduledEvent.Timestamp) response.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(*attributes.ScheduleToCloseTimeoutSeconds) @@ -667,10 +658,10 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont response.HeartbeatTimeoutSeconds = common.Int32Ptr(*attributes.HeartbeatTimeoutSeconds) token := &common.TaskToken{ - DomainID: task.DomainID, - WorkflowID: task.WorkflowID, - RunID: task.RunID, - ScheduleID: task.ScheduleID, + DomainID: task.info.DomainID, + WorkflowID: task.info.WorkflowID, + RunID: task.info.RunID, + ScheduleID: task.info.ScheduleID, ScheduleAttempt: historyResponse.GetAttempt(), } @@ -682,6 +673,64 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont return response } +func (e *matchingEngineImpl) recordDecisionTaskStarted( + ctx context.Context, + pollReq *workflow.PollForDecisionTaskRequest, + task *internalTask, +) (*h.RecordDecisionTaskStartedResponse, error) { + request := &h.RecordDecisionTaskStartedRequest{ + DomainUUID: &task.info.DomainID, + WorkflowExecution: &task.workflowExecution, + ScheduleId: &task.info.ScheduleID, + TaskId: &task.info.TaskID, + RequestId: common.StringPtr(uuid.New()), + PollRequest: pollReq, + } + var resp *h.RecordDecisionTaskStartedResponse + op := func() error { + var err error + resp, err = e.historyService.RecordDecisionTaskStarted(ctx, request) + return err + } + err := backoff.Retry(op, historyServiceOperationRetryPolicy, func(err error) bool { + switch err.(type) { + case *workflow.EntityNotExistsError, *h.EventAlreadyStartedError: + return false + } + return true + }) + return resp, err +} + +func (e *matchingEngineImpl) recordActivityTaskStarted( + ctx context.Context, + pollReq *workflow.PollForActivityTaskRequest, + task *internalTask, +) (*h.RecordActivityTaskStartedResponse, error) { + request := &h.RecordActivityTaskStartedRequest{ 
+ DomainUUID: &task.info.DomainID, + WorkflowExecution: &task.workflowExecution, + ScheduleId: &task.info.ScheduleID, + TaskId: &task.info.TaskID, + RequestId: common.StringPtr(uuid.New()), + PollRequest: pollReq, + } + var resp *h.RecordActivityTaskStartedResponse + op := func() error { + var err error + resp, err = e.historyService.RecordActivityTaskStarted(ctx, request) + return err + } + err := backoff.Retry(op, historyServiceOperationRetryPolicy, func(err error) bool { + switch err.(type) { + case *workflow.EntityNotExistsError, *h.EventAlreadyStartedError: + return false + } + return true + }) + return resp, err +} + func newTaskListID(domainID, taskListName string, taskType int) *taskListID { return &taskListID{domainID: domainID, taskListName: taskListName, taskType: taskType} } diff --git a/service/matching/matchingEngineInterfaces.go b/service/matching/matchingEngineInterfaces.go index 43ea5ff93e7..1942d78166b 100644 --- a/service/matching/matchingEngineInterfaces.go +++ b/service/matching/matchingEngineInterfaces.go @@ -31,8 +31,8 @@ type ( // Engine exposes interfaces for clients to poll for activity and decision tasks. Engine interface { Stop() - AddDecisionTask(addRequest *m.AddDecisionTaskRequest) (syncMatch bool, err error) - AddActivityTask(addRequest *m.AddActivityTaskRequest) (syncMatch bool, err error) + AddDecisionTask(ctx context.Context, addRequest *m.AddDecisionTaskRequest) (syncMatch bool, err error) + AddActivityTask(ctx context.Context, addRequest *m.AddActivityTaskRequest) (syncMatch bool, err error) PollForDecisionTask(ctx context.Context, request *m.PollForDecisionTaskRequest) (*m.PollForDecisionTaskResponse, error) PollForActivityTask(ctx context.Context, request *m.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error) QueryWorkflow(ctx context.Context, request *m.QueryWorkflowRequest) (*workflow.QueryWorkflowResponse, error) diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 18eb1a89af9..56ddcae91fc 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -263,7 +263,7 @@ func (s *matchingEngineSuite) PollForDecisionTasksResultTest() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddDecisionTask(&addRequest) + _, err := s.matchingEngine.AddDecisionTask(context.Background(), &addRequest) s.NoError(err) taskList := &workflow.TaskList{} @@ -394,7 +394,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType int) { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err = s.matchingEngine.AddActivityTask(&addRequest) + _, err = s.matchingEngine.AddActivityTask(context.Background(), &addRequest) } else { addRequest := matching.AddDecisionTaskRequest{ DomainUUID: common.StringPtr(domainID), @@ -404,7 +404,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType int) { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err = s.matchingEngine.AddDecisionTask(&addRequest) + _, err = s.matchingEngine.AddDecisionTask(context.Background(), &addRequest) } s.NoError(err) } @@ -453,12 +453,12 @@ func (s *matchingEngineSuite) TestTaskWriterShutdown() { // now attempt to add a task scheduleID := int64(5) addRequest.ScheduleId = &scheduleID - _, err = s.matchingEngine.AddActivityTask(&addRequest) + _, err = s.matchingEngine.AddActivityTask(context.Background(), &addRequest) s.Error(err) // test race tlmImpl.taskWriter.stopped = 0 - _, err = s.matchingEngine.AddActivityTask(&addRequest) + _, err = 
s.matchingEngine.AddActivityTask(context.Background(), &addRequest) s.Error(err) tlmImpl.taskWriter.stopped = 1 // reset it back to old value } @@ -495,7 +495,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddActivityTask(&addRequest) + _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest) s.NoError(err) } s.EqualValues(taskCount, s.taskManager.getTaskCount(tlID)) @@ -598,12 +598,14 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { dispatchTTL := time.Nanosecond dPtr := _defaultTaskDispatchRPS - tlConfig, err := newTaskListConfig(tlID, s.matchingEngine.config, s.domainCache) + + mgr, err := newTaskListManager(s.matchingEngine, tlID, tlKind, s.matchingEngine.config) s.NoError(err) - mgr := newTaskListManagerWithRateLimiter( - s.matchingEngine, tlID, tlKind, s.domainCache, tlConfig, - newRateLimiter(&dPtr, dispatchTTL, _minBurst), - ) + + mgrImpl, ok := mgr.(*taskListManagerImpl) + s.True(ok) + + mgrImpl.matcher.limiter = newRateLimiter(&dPtr, dispatchTTL, _minBurst) s.matchingEngine.updateTaskList(tlID, mgr) s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID s.NoError(mgr.Start()) @@ -637,11 +639,21 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { } }, nil) + pollFunc := func(maxDispatch float64) (*workflow.PollForActivityTaskResponse, error) { + return s.matchingEngine.PollForActivityTask(s.callContext, &matching.PollForActivityTaskRequest{ + DomainUUID: common.StringPtr(domainID), + PollRequest: &workflow.PollForActivityTaskRequest{ + TaskList: taskList, + Identity: &identity, + TaskListMetadata: &workflow.TaskListMetadata{MaxTasksPerSecond: &maxDispatch}, + }, + }) + } + for i := int64(0); i < taskCount; i++ { scheduleID := i * 3 var wg sync.WaitGroup - var result *workflow.PollForActivityTaskResponse var pollErr error maxDispatch := _defaultTaskDispatchRPS @@ -651,14 +663,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { wg.Add(1) go func() { defer wg.Done() - result, pollErr = s.matchingEngine.PollForActivityTask(s.callContext, &matching.PollForActivityTaskRequest{ - DomainUUID: common.StringPtr(domainID), - PollRequest: &workflow.PollForActivityTaskRequest{ - TaskList: taskList, - Identity: &identity, - TaskListMetadata: &workflow.TaskListMetadata{MaxTasksPerSecond: &maxDispatch}, - }, - }) + result, pollErr = pollFunc(maxDispatch) }() time.Sleep(20 * time.Millisecond) // Necessary for sync match to happen addRequest := matching.AddActivityTaskRequest{ @@ -669,15 +674,30 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { TaskList: taskList, ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddActivityTask(&addRequest) + _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest) wg.Wait() s.NoError(err) s.NoError(pollErr) s.NotNil(result) + if len(result.TaskToken) == 0 { + // when ratelimit is set to zero, poller is expected to return empty result + // reset ratelimit, poll again and make sure task is returned this time s.logger.Debug(fmt.Sprintf("empty poll returned")) - continue + s.Equal(float64(0), maxDispatch) + maxDispatch = _defaultTaskDispatchRPS + wg.Add(1) + go func() { + defer wg.Done() + result, pollErr = pollFunc(maxDispatch) + }() + wg.Wait() + s.NoError(err) + s.NoError(pollErr) + s.NotNil(result) + s.True(len(result.TaskToken) > 0) } + s.EqualValues(activityID, *result.ActivityId) s.EqualValues(activityType, result.ActivityType) 
s.EqualValues(activityInput, result.Input) @@ -773,12 +793,12 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities( dispatchTTL := time.Nanosecond s.matchingEngine.config.RangeSize = rangeSize // override to low number for the test dPtr := _defaultTaskDispatchRPS - tlConfig, err := newTaskListConfig(tlID, s.matchingEngine.config, s.domainCache) + + mgr, err := newTaskListManager(s.matchingEngine, tlID, tlKind, s.matchingEngine.config) s.NoError(err) - mgr := newTaskListManagerWithRateLimiter( - s.matchingEngine, tlID, tlKind, s.domainCache, tlConfig, - newRateLimiter(&dPtr, dispatchTTL, _minBurst), - ) + + mgrImpl := mgr.(*taskListManagerImpl) + mgrImpl.matcher.limiter = newRateLimiter(&dPtr, dispatchTTL, _minBurst) s.matchingEngine.updateTaskList(tlID, mgr) s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID s.NoError(mgr.Start()) @@ -801,7 +821,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities( ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddActivityTask(&addRequest) + _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest) if err != nil { s.logger.Info("Failure in AddActivityTask", tag.Error(err)) i-- @@ -940,7 +960,7 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeDecisions() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddDecisionTask(&addRequest) + _, err := s.matchingEngine.AddDecisionTask(context.Background(), &addRequest) if err != nil { panic(err) } @@ -1094,7 +1114,7 @@ func (s *matchingEngineSuite) TestMultipleEnginesActivitiesRangeStealing() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(600), } - _, err := engine.AddActivityTask(&addRequest) + _, err := engine.AddActivityTask(context.Background(), &addRequest) if err != nil { if _, ok := err.(*persistence.ConditionFailedError); ok { i-- // retry adding @@ -1248,7 +1268,7 @@ func (s *matchingEngineSuite) TestMultipleEnginesDecisionsRangeStealing() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(600), } - _, err := engine.AddDecisionTask(&addRequest) + _, err := engine.AddDecisionTask(context.Background(), &addRequest) if err != nil { if _, ok := err.(*persistence.ConditionFailedError); ok { i-- // retry adding @@ -1373,14 +1393,14 @@ func (s *matchingEngineSuite) TestAddTaskAfterStartFailure() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddActivityTask(&addRequest) + _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest) s.NoError(err) s.EqualValues(1, s.taskManager.getTaskCount(tlID)) ctx, err := s.matchingEngine.getTask(context.Background(), tlID, nil, tlKind) s.NoError(err) - ctx.completeTask(errors.New("test error")) + ctx.finish(errors.New("test error")) s.EqualValues(1, s.taskManager.getTaskCount(tlID)) ctx2, err := s.matchingEngine.getTask(context.Background(), tlID, nil, tlKind) s.NoError(err) @@ -1390,7 +1410,7 @@ func (s *matchingEngineSuite) TestAddTaskAfterStartFailure() { s.Equal(ctx.info.RunID, ctx2.info.RunID) s.Equal(ctx.info.ScheduleID, ctx2.info.ScheduleID) - ctx2.completeTask(nil) + ctx2.finish(nil) s.EqualValues(0, s.taskManager.getTaskCount(tlID)) } @@ -1422,7 +1442,7 @@ func (s *matchingEngineSuite) TestTaskListManagerGetTaskBatch() { ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), } - _, err := s.matchingEngine.AddActivityTask(&addRequest) + _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest) s.NoError(err) } @@ -1432,8 +1452,8 @@ func (s 
*matchingEngineSuite) TestTaskListManagerGetTaskBatch() { // wait until all tasks are read by the task pump and enqeued into the in-memory buffer // at the end of this step, ackManager readLevel will also be equal to the buffer size - expectedBufSize := common.MinInt(cap(tlMgr.taskBuffer), taskCount) - s.True(s.awaitCondition(func() bool { return len(tlMgr.taskBuffer) == expectedBufSize }, time.Second)) + expectedBufSize := common.MinInt(cap(tlMgr.taskReader.taskBuffer), taskCount) + s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) == expectedBufSize }, time.Second)) // stop all goroutines that read / write tasks in the background // remainder of this test works with the in-memory buffer @@ -1446,14 +1466,14 @@ func (s *matchingEngineSuite) TestTaskListManagerGetTaskBatch() { // setReadLevel should NEVER be called without updating ackManager.outstandingTasks // This is only for unit test purpose tlMgr.taskAckManager.setReadLevel(tlMgr.taskWriter.GetMaxReadLevel()) - tasks, readLevel, isReadBatchDone, err := tlMgr.getTaskBatch() + tasks, readLevel, isReadBatchDone, err := tlMgr.taskReader.getTaskBatch() s.Nil(err) s.EqualValues(0, len(tasks)) s.EqualValues(tlMgr.taskWriter.GetMaxReadLevel(), readLevel) s.True(isReadBatchDone) tlMgr.taskAckManager.setReadLevel(0) - tasks, readLevel, isReadBatchDone, err = tlMgr.getTaskBatch() + tasks, readLevel, isReadBatchDone, err = tlMgr.taskReader.getTaskBatch() s.Nil(err) s.EqualValues(rangeSize, len(tasks)) s.EqualValues(rangeSize, readLevel) @@ -1484,7 +1504,7 @@ func (s *matchingEngineSuite) TestTaskListManagerGetTaskBatch() { } } s.EqualValues(taskCount-rangeSize, s.taskManager.getTaskCount(tlID)) - tasks, readLevel, isReadBatchDone, err = tlMgr.getTaskBatch() + tasks, readLevel, isReadBatchDone, err = tlMgr.taskReader.getTaskBatch() s.Nil(err) s.True(0 < len(tasks) && len(tasks) <= rangeSize) s.True(isReadBatchDone) @@ -1504,19 +1524,20 @@ func (s *matchingEngineSuite) TestTaskListManagerGetTaskBatch_ReadBatchDone() { config.RangeSize = rangeSize tlMgr0, err := newTaskListManager(s.matchingEngine, tlID, &tlNormal, config) s.NoError(err) + tlMgr, ok := tlMgr0.(*taskListManagerImpl) - s.True(ok, "taskListManger doesn't implement taskListManager interface") + s.True(ok) tlMgr.taskAckManager.setReadLevel(0) atomic.StoreInt64(&tlMgr.taskWriter.maxReadLevel, maxReadLevel) - tasks, readLevel, isReadBatchDone, err := tlMgr.getTaskBatch() + tasks, readLevel, isReadBatchDone, err := tlMgr.taskReader.getTaskBatch() s.Empty(tasks) s.Equal(int64(rangeSize*10), readLevel) s.False(isReadBatchDone) s.NoError(err) tlMgr.taskAckManager.setReadLevel(readLevel) - tasks, readLevel, isReadBatchDone, err = tlMgr.getTaskBatch() + tasks, readLevel, isReadBatchDone, err = tlMgr.taskReader.getTaskBatch() s.Empty(tasks) s.Equal(maxReadLevel, readLevel) s.True(isReadBatchDone) @@ -1566,7 +1587,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() { // simulates creating a task whose scheduledToStartTimeout is already expired addRequest.ScheduleToStartTimeoutSeconds = common.Int32Ptr(-5) } - _, err := s.matchingEngine.AddActivityTask(&addRequest) + _, err := s.matchingEngine.AddActivityTask(context.Background(), &addRequest) s.NoError(err) } @@ -1576,7 +1597,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() { // wait until all tasks are loaded by into in-memory buffers by task list manager // the buffer size should be one less than expected because dispatcher will dequeue the head - s.True(s.awaitCondition(func() bool { 
return len(tlMgr.taskBuffer) >= (taskCount/2 - 1) }, time.Second)) + s.True(s.awaitCondition(func() bool { return len(tlMgr.taskReader.taskBuffer) >= (taskCount/2 - 1) }, time.Second)) maxTimeBetweenTaskDeletes = tc.maxTimeBtwnDeletes s.matchingEngine.config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(tc.batchSize) diff --git a/service/matching/ratelimiter.go b/service/matching/ratelimiter.go index a46f4b70116..0c98793fbd5 100644 --- a/service/matching/ratelimiter.go +++ b/service/matching/ratelimiter.go @@ -99,6 +99,11 @@ func (rl *rateLimiter) Reserve() *rate.Reservation { return limiter.Reserve() } +func (rl *rateLimiter) Allow() bool { + limiter := rl.globalLimiter.Load().(*rate.Limiter) + return limiter.Allow() +} + // Limit returns the current rate per second limit for this ratelimiter func (rl *rateLimiter) Limit() float64 { if rl.maxDispatchPerSecond != nil { diff --git a/service/matching/service.go b/service/matching/service.go index 3ab564db3e2..df608291657 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -21,8 +21,6 @@ package matching import ( - "time" - "github.com/uber/cadence/common" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/log/tag" @@ -31,50 +29,6 @@ import ( "github.com/uber/cadence/common/service/dynamicconfig" ) -// Config represents configuration for cadence-matching service -type Config struct { - PersistenceMaxQPS dynamicconfig.IntPropertyFn - EnableSyncMatch dynamicconfig.BoolPropertyFnWithTaskListInfoFilters - RPS dynamicconfig.IntPropertyFn - - // taskListManager configuration - RangeSize int64 - GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters - UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters - IdleTasklistCheckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters - MaxTasklistIdleTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters - // Time to hold a poll request before returning an empty response if there are no tasks - LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters - MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters - MaxTaskDeleteBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters - - // taskWriter configuration - OutstandingTaskAppendsThreshold dynamicconfig.IntPropertyFnWithTaskListInfoFilters - MaxTaskBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters - - ThrottledLogRPS dynamicconfig.IntPropertyFn -} - -// NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection) *Config { - return &Config{ - PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceMaxQPS, 3000), - EnableSyncMatch: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableSyncMatch, true), - RPS: dc.GetIntProperty(dynamicconfig.MatchingRPS, 1200), - RangeSize: 100000, - GetTasksBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingGetTasksBatchSize, 1000), - UpdateAckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingUpdateAckInterval, 1*time.Minute), - IdleTasklistCheckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingIdleTasklistCheckInterval, 5*time.Minute), - MaxTasklistIdleTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MaxTasklistIdleTime, 5*time.Minute), - LongPollExpirationInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingLongPollExpirationInterval, 
time.Minute), - MinTaskThrottlingBurstSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMinTaskThrottlingBurstSize, 1), - MaxTaskDeleteBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMaxTaskDeleteBatchSize, 100), - OutstandingTaskAppendsThreshold: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingOutstandingTaskAppendsThreshold, 250), - MaxTaskBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMaxTaskBatchSize, 100), - ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.MatchingThrottledLogRPS, 20), - } -} - // Service represents the cadence-matching service type Service struct { stopC chan struct{} diff --git a/service/matching/task.go b/service/matching/task.go new file mode 100644 index 00000000000..73697ccb672 --- /dev/null +++ b/service/matching/task.go @@ -0,0 +1,93 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package matching + +import ( + m "github.com/uber/cadence/.gen/go/matching" + s "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/persistence" +) + +type ( + queryTaskInfo struct { + taskID string + queryRequest *m.QueryWorkflowRequest + } + // internalTask represents an activity, decision or query task and + // holds task-specific info and additional metadata + internalTask struct { + info *persistence.TaskInfo + syncResponseCh chan error + workflowExecution s.WorkflowExecution + queryInfo *queryTaskInfo + backlogCountHint int64 + domainName string + completionFunc func(*internalTask, error) + } +) + +func newInternalTask( + info *persistence.TaskInfo, + completionFunc func(*internalTask, error), + forSyncMatch bool, +) *internalTask { + task := &internalTask{ + info: info, + completionFunc: completionFunc, + workflowExecution: s.WorkflowExecution{ + WorkflowId: &info.WorkflowID, + RunId: &info.RunID, + }, + } + if forSyncMatch { + task.syncResponseCh = make(chan error, 1) + } + return task +} + +func newInternalQueryTask( + queryInfo *queryTaskInfo, + completionFunc func(*internalTask, error), +) *internalTask { + return &internalTask{ + info: &persistence.TaskInfo{ + DomainID: queryInfo.queryRequest.GetDomainUUID(), + WorkflowID: queryInfo.queryRequest.QueryRequest.Execution.GetWorkflowId(), + RunID: queryInfo.queryRequest.QueryRequest.Execution.GetRunId(), + }, + completionFunc: completionFunc, + queryInfo: queryInfo, + workflowExecution: *queryInfo.queryRequest.QueryRequest.GetExecution(), + syncResponseCh: make(chan error, 1), + } +} + +// isQuery returns true if the underlying task is a query task +func (task *internalTask) isQuery() bool { + return task.queryInfo != nil +} + +// finish marks a task as finished. Should be called after a poller picks up a task +// and marks it as started. If the task cannot be marked as started, this +// method should be called with a non-nil error argument. +func (task *internalTask) finish(err error) { + task.completionFunc(task, err) +} diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index cf87b693ce2..264cb223733 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -23,14 +23,11 @@ package matching import ( "bytes" "context" - "errors" "fmt" "sync" "sync/atomic" "time" - h "github.com/uber/cadence/.gen/go/history" - m "github.com/uber/cadence/.gen/go/matching" s "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" @@ -42,110 +39,61 @@ import ( ) const ( - done time.Duration = -1 - // Time budget for empty task to propagate through the function stack and be returned to // pollForActivityTask or pollForDecisionTask handler. returnEmptyTaskTimeBudget time.Duration = time.Second ) -// NOTE: Is this good enough for stress tests? -const ( - _defaultTaskDispatchRPS = 100000.0 - _defaultTaskDispatchRPSTTL = 60 * time.Second -) - -var errAddTasklistThrottled = errors.New("cannot add to tasklist, limit exceeded") - type ( + addTaskParams struct { + execution *s.WorkflowExecution + taskInfo *persistence.TaskInfo + } + taskListManager interface { Start() error Stop() - AddTask(execution *s.WorkflowExecution, taskInfo *persistence.TaskInfo) (syncMatch bool, err error) - GetTaskContext(ctx context.Context, maxDispatchPerSecond *float64) (*taskContext, error) - SyncMatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error + // AddTask adds a task to the task list.
This method will first attempt a synchronous + // match with a poller. When that fails, the task will be written to the database and later + // asynchronously matched with a poller. + AddTask(ctx context.Context, params addTaskParams) (syncMatch bool, err error) + // GetTask blocks waiting for a task. Returns error when the context deadline is exceeded. + // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched + // from this task list to pollers + GetTask(ctx context.Context, maxDispatchPerSecond *float64) (*internalTask, error) + // DispatchTask dispatches a task to a poller. When there are no pollers to pick + // up the task, this method returns an error. The task will not be persisted to db. + DispatchTask(ctx context.Context, task *internalTask) error + // DispatchQueryTask dispatches a query task to a poller. When there are no pollers + // to pick up the task, this method returns an error. The task will not be persisted to + // db and no ratelimits are applied for this call. + DispatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error CancelPoller(pollerID string) GetAllPollerInfo() []*s.PollerInfo + // DescribeTaskList returns information about the target tasklist DescribeTaskList(includeTaskListStatus bool) *s.DescribeTaskListResponse String() string } - taskListConfig struct { - EnableSyncMatch func() bool - // Time to hold a poll request before returning an empty response if there are no tasks - LongPollExpirationInterval func() time.Duration - RangeSize int64 - GetTasksBatchSize func() int - UpdateAckInterval func() time.Duration - IdleTasklistCheckInterval func() time.Duration - MaxTasklistIdleTime func() time.Duration - MinTaskThrottlingBurstSize func() int - MaxTaskDeleteBatchSize func() int - // taskWriter configuration - OutstandingTaskAppendsThreshold func() int - MaxTaskBatchSize func() int - } - - // Contains information needed for current task transition from queue to Workflow execution history. - taskContext struct { - tlMgr *taskListManagerImpl - info *persistence.TaskInfo - syncResponseCh chan<- *syncMatchResponse - workflowExecution s.WorkflowExecution - queryTaskInfo *queryTaskInfo - backlogCountHint int64 - domainName string - } - - queryTaskInfo struct { - taskID string - queryRequest *m.QueryWorkflowRequest - } - - // Single task list in memory state taskListManagerImpl struct { - domainCache cache.DomainCache taskListID *taskListID + taskListKind int // sticky taskList has different process in persistence + config *taskListConfig + db *taskListDB + engine *matchingEngineImpl + taskWriter *taskWriter + taskReader *taskReader // reads tasks from db and async matches them with pollers + taskGC *taskGC + taskAckManager ackManager // tracks ackLevel for delivered messages + matcher *TaskMatcher // for matching a task producer with a poller + domainCache cache.DomainCache logger log.Logger metricsClient metrics.Client domainNameValue atomic.Value domainScopeValue atomic.Value // domain tagged metric scope - engine *matchingEngineImpl - config *taskListConfig - // pollerHistory stores poller which poll from this tasklist in last few minutes pollerHistory *pollerHistory - - taskWriter *taskWriter - taskBuffer chan *persistence.TaskInfo // tasks loaded from persistence - // tasksForPoll is used to deliver tasks to pollers. - // It must to be unbuffered. addTask publishes to it asynchronously and expects publish to succeed - // only if there is waiting poll that consumes from it.
Tasks in taskBuffer will blocking-add to - // this channel - tasksForPoll chan *getTaskResult - // queryTasksForPoll is used for delivering query tasks to pollers. - // It must be unbuffered as query tasks are always Sync Matched. We use a separate channel for query tasks because - // unlike activity/decision tasks, query tasks are enabled for dispatch on both active and standby clusters - queryTasksForPoll chan *getTaskResult - notifyCh chan struct{} // Used as signal to notify pump of new tasks - // Note: We need two shutdown channels so we can stop task pump independently of the deliverBuffer - // loop in getTasksPump in unit tests - shutdownCh chan struct{} // Delivers stop to the pump that populates taskBuffer - deliverBufferShutdownCh chan struct{} // Delivers stop to the pump that populates taskBuffer - startWG sync.WaitGroup // ensures that background processes do not start until setup is ready - stopped int32 - // The cancel objects are to cancel the ratelimiter Wait in deliverBufferTasksLoop. The ideal - // approach is to use request-scoped contexts and use a unique one for each call to Wait. However - // in order to cancel it on shutdown, we need a new goroutine for each call that would wait on - // the shutdown channel. To optimize on efficiency, we instead create one and tag it on the struct - // so the cancel can be called directly on shutdown. - cancelCtx context.Context - cancelFunc context.CancelFunc - - db *taskListDB - taskAckManager ackManager // tracks ackLevel for delivered messages - taskGC *taskGC - // outstandingPollsMap is needed to keep track of all outstanding pollers for a // particular tasklist. PollerID generated by frontend is used as the key and // CancelFunc is the value. This is used to cancel the context to unblock any @@ -153,130 +101,61 @@ type ( // prevent tasks being dispatched to zombie pollers. 
outstandingPollsLock sync.Mutex outstandingPollsMap map[string]context.CancelFunc - // Rate limiter for task dispatch - rateLimiter *rateLimiter - - taskListKind int // sticky taskList has different process in persistence - } - // getTaskResult contains task info and optional channel to notify createTask caller - // that task is successfully started and returned to a poller - getTaskResult struct { - task *persistence.TaskInfo - C chan *syncMatchResponse - queryTask *queryTaskInfo - syncMatch bool + shutdownCh chan struct{} // Delivers stop to the pump that populates taskBuffer + startWG sync.WaitGroup // ensures that background processes do not start until setup is ready + stopped int32 } +) - // syncMatchResponse result of sync match delivered to a createTask caller - syncMatchResponse struct { - response *persistence.CreateTasksResponse - err error - } +const ( + // maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen + maxSyncMatchWaitTime = 200 * time.Millisecond ) -func newTaskListConfig(id *taskListID, config *Config, domainCache cache.DomainCache) (*taskListConfig, error) { - domainEntry, err := domainCache.GetDomainByID(id.domainID) - if err != nil { - return nil, err - } - - domain := domainEntry.GetInfo().Name - taskListName := id.taskListName - taskType := id.taskType - return &taskListConfig{ - RangeSize: config.RangeSize, - GetTasksBatchSize: func() int { - return config.GetTasksBatchSize(domain, taskListName, taskType) - }, - UpdateAckInterval: func() time.Duration { - return config.UpdateAckInterval(domain, taskListName, taskType) - }, - IdleTasklistCheckInterval: func() time.Duration { - return config.IdleTasklistCheckInterval(domain, taskListName, taskType) - }, - MaxTasklistIdleTime: func() time.Duration { - return config.MaxTasklistIdleTime(domain, taskListName, taskType) - }, - MinTaskThrottlingBurstSize: func() int { - return config.MinTaskThrottlingBurstSize(domain, taskListName, taskType) - }, - EnableSyncMatch: func() bool { - return config.EnableSyncMatch(domain, taskListName, taskType) - }, - LongPollExpirationInterval: func() time.Duration { - return config.LongPollExpirationInterval(domain, taskListName, taskType) - }, - MaxTaskDeleteBatchSize: func() int { - return config.MaxTaskDeleteBatchSize(domain, taskListName, taskType) - }, - OutstandingTaskAppendsThreshold: func() int { - return config.OutstandingTaskAppendsThreshold(domain, taskListName, taskType) - }, - MaxTaskBatchSize: func() int { - return config.MaxTaskBatchSize(domain, taskListName, taskType) - }, - }, nil -} +var _ taskListManager = (*taskListManagerImpl)(nil) func newTaskListManager( - e *matchingEngineImpl, taskList *taskListID, taskListKind *s.TaskListKind, config *Config, + e *matchingEngineImpl, + taskList *taskListID, + taskListKind *s.TaskListKind, + config *Config, ) (taskListManager, error) { - dPtr := _defaultTaskDispatchRPS + taskListConfig, err := newTaskListConfig(taskList, config, e.domainCache) if err != nil { return nil, err } - rl := newRateLimiter( - &dPtr, _defaultTaskDispatchRPSTTL, taskListConfig.MinTaskThrottlingBurstSize(), - ) - return newTaskListManagerWithRateLimiter( - e, taskList, taskListKind, e.domainCache, taskListConfig, rl, - ), nil -} -func newTaskListManagerWithRateLimiter( - e *matchingEngineImpl, taskList *taskListID, taskListKind *s.TaskListKind, - domainCache cache.DomainCache, config *taskListConfig, rl *rateLimiter, -) taskListManager { - // To perform one db operation if there are no pollers - 
taskBufferSize := config.GetTasksBatchSize() - 1 - ctx, cancel := context.WithCancel(context.Background()) if taskListKind == nil { taskListKind = common.TaskListKindPtr(s.TaskListKindNormal) } db := newTaskListDB(e.taskManager, taskList.domainID, taskList.taskListName, taskList.taskType, int(*taskListKind), e.logger) tlMgr := &taskListManagerImpl{ - domainCache: domainCache, - metricsClient: e.metricsClient, - engine: e, - taskBuffer: make(chan *persistence.TaskInfo, taskBufferSize), - notifyCh: make(chan struct{}, 1), - shutdownCh: make(chan struct{}), - deliverBufferShutdownCh: make(chan struct{}), - cancelCtx: ctx, - cancelFunc: cancel, - taskListID: taskList, + domainCache: e.domainCache, + metricsClient: e.metricsClient, + engine: e, + shutdownCh: make(chan struct{}), + taskListID: taskList, logger: e.logger.WithTags(tag.WorkflowTaskListName(taskList.taskListName), tag.WorkflowTaskListType(taskList.taskType)), db: db, taskAckManager: newAckManager(e.logger), - taskGC: newTaskGC(db, config), - tasksForPoll: make(chan *getTaskResult), - queryTasksForPoll: make(chan *getTaskResult), - config: config, + taskGC: newTaskGC(db, taskListConfig), + config: taskListConfig, pollerHistory: newPollerHistory(), outstandingPollsMap: make(map[string]context.CancelFunc), - rateLimiter: rl, taskListKind: int(*taskListKind), } tlMgr.domainNameValue.Store("") tlMgr.domainScopeValue.Store(e.metricsClient.Scope(metrics.MatchingTaskListMgrScope, metrics.DomainUnknownTag())) tlMgr.tryInitDomainNameAndScope() tlMgr.taskWriter = newTaskWriter(tlMgr) + tlMgr.taskReader = newTaskReader(tlMgr) + tlMgr.matcher = newTaskMatcher(taskListConfig, tlMgr.domainScope) tlMgr.startWG.Add(1) - return tlMgr + return tlMgr, nil } // Starts reading pump for the given task list. @@ -293,8 +172,7 @@ func (c *taskListManagerImpl) Start() error { c.taskAckManager.setAckLevel(state.ackLevel) c.taskWriter.Start(c.rangeIDToTaskIDBlock(state.rangeID)) - c.signalNewTask() - go c.getTasksPump() + c.taskReader.Start() return nil } @@ -304,130 +182,91 @@ func (c *taskListManagerImpl) Stop() { if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { return } - close(c.deliverBufferShutdownCh) - c.cancelFunc() close(c.shutdownCh) c.taskWriter.Stop() + c.taskReader.Stop() c.engine.removeTaskListManager(c.taskListID) c.engine.removeTaskListManager(c.taskListID) c.logger.Info("", tag.LifeCycleStopped) } -func (c *taskListManagerImpl) AddTask(execution *s.WorkflowExecution, taskInfo *persistence.TaskInfo) (syncMatch bool, err error) { +// AddTask adds a task to the task list. This method will first attempt a synchronous +// match with a poller. 
When there are no pollers or if ratelimit is exceeded, task will +// be written to database and later asynchronously matched with a poller +func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams) (bool, error) { c.startWG.Wait() - _, err = c.executeWithRetry(func() (interface{}, error) { + var syncMatch bool + _, err := c.executeWithRetry(func() (interface{}, error) { - domainEntry, err := c.domainCache.GetDomainByID(taskInfo.DomainID) + domainEntry, err := c.domainCache.GetDomainByID(params.taskInfo.DomainID) if err != nil { return nil, err } + if domainEntry.GetDomainNotActiveErr() != nil { - // domain not active, do not do sync match - r, err := c.taskWriter.appendTask(execution, taskInfo) + r, err := c.taskWriter.appendTask(params.execution, params.taskInfo) syncMatch = false return r, err } - r, err := c.trySyncMatch(taskInfo) - if (err != nil && err != errAddTasklistThrottled) || r != nil { - syncMatch = true - return r, err + syncMatch, err = c.trySyncMatch(ctx, params) + if syncMatch { + return &persistence.CreateTasksResponse{}, err } - r, err = c.taskWriter.appendTask(execution, taskInfo) + + r, err := c.taskWriter.appendTask(params.execution, params.taskInfo) syncMatch = false return r, err }) if err == nil { - c.signalNewTask() + c.taskReader.Signal() } return syncMatch, err } -func (c *taskListManagerImpl) SyncMatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error { - c.startWG.Wait() - - domainID := queryTask.queryRequest.GetDomainUUID() - we := queryTask.queryRequest.QueryRequest.Execution - taskInfo := &persistence.TaskInfo{ - DomainID: domainID, - RunID: we.GetRunId(), - WorkflowID: we.GetWorkflowId(), - } +// DispatchTask dispatches a task to a poller. When there are no pollers to pick +// up the task or if rate limit is exceeded, this method will return error. Task +// *will not* be persisted to db +func (c *taskListManagerImpl) DispatchTask(ctx context.Context, task *internalTask) error { + return c.matcher.MustOffer(ctx, task) +} - request := &getTaskResult{task: taskInfo, C: make(chan *syncMatchResponse, 1), queryTask: queryTask} - select { - case c.queryTasksForPoll <- request: - <-request.C - return nil - case <-ctx.Done(): +// DispatchQueryTask dispatches a query task to a poller. When there are no pollers +// to pick up the task, this method will return error. Task will not be persisted to +// db and no ratelimits will be applied for this call +func (c *taskListManagerImpl) DispatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error { + c.startWG.Wait() + task := newInternalQueryTask(queryTask, c.completeTask) + _, err := c.matcher.Offer(ctx, task) + if err == context.DeadlineExceeded { return &s.QueryFailedError{Message: "timeout: no workflow worker polling for given tasklist"} } + return err } -// Loads a task from DB or from sync match and wraps it in a task context -func (c *taskListManagerImpl) GetTaskContext( +// GetTask blocks waiting for a task. 
+// Returns error when context deadline is exceeded +// maxDispatchPerSecond is the max rate at which tasks are allowed +// to be dispatched from this task list to pollers +func (c *taskListManagerImpl) GetTask( ctx context.Context, maxDispatchPerSecond *float64, -) (*taskContext, error) { - result, err := c.getTask(ctx, maxDispatchPerSecond) +) (*internalTask, error) { + task, err := c.getTask(ctx, maxDispatchPerSecond) if err != nil { return nil, err } - task := result.task - workflowExecution := s.WorkflowExecution{ - WorkflowId: common.StringPtr(task.WorkflowID), - RunId: common.StringPtr(task.RunID), - } - - tCtx := &taskContext{ - info: task, - workflowExecution: workflowExecution, - tlMgr: c, - syncResponseCh: result.C, // nil if task is loaded from persistence - queryTaskInfo: result.queryTask, // non-nil for query task - backlogCountHint: c.taskAckManager.getBacklogCountHint(), - domainName: c.domainName(), - } - return tCtx, nil -} - -func (c *taskListManagerImpl) persistAckLevel() error { - return c.db.UpdateState(c.taskAckManager.getAckLevel()) -} - -func (c *taskListManagerImpl) getAckLevel() (ackLevel int64) { - return c.taskAckManager.getAckLevel() -} - -func (c *taskListManagerImpl) getTaskListKind() int { - // there is no need to lock here, - // since c.taskListKind is assigned when taskListManager been created and never changed. - return c.taskListKind -} - -// completeTaskPoll should be called after task poll is done even if append has failed. -// There is no correspondent initiateTaskPoll as append is initiated in getTasksPump -func (c *taskListManagerImpl) completeTaskPoll(taskID int64) int64 { - ackLevel := c.taskAckManager.completeTask(taskID) - c.taskGC.Run(ackLevel) - return ackLevel + task.domainName = c.domainName() + task.backlogCountHint = c.taskAckManager.getBacklogCountHint() + return task, nil } -// Loads task from taskBuffer (which is populated from persistence) or from sync match to add task call -func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond *float64) (*getTaskResult, error) { - childCtxTimeout := c.config.LongPollExpirationInterval() - if deadline, ok := ctx.Deadline(); ok { - // We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is - // reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack, - // which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be - // returned to the handler before a context timeout error is generated. - shortenedCtxTimeout := deadline.Sub(time.Now()) - returnEmptyTaskTimeBudget - if shortenedCtxTimeout < childCtxTimeout { - childCtxTimeout = shortenedCtxTimeout - } - } - // ChildCtx timeout will be the shorter of longPollExpirationInterval and shortened parent context timeout. - childCtx, cancel := context.WithTimeout(ctx, childCtxTimeout) +func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond *float64) (*internalTask, error) { + // We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is + // reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack, + // which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be + // returned to the handler before a context timeout error is generated. 
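As a freestanding illustration of the deadline math this comment describes, the sketch below computes the child timeout as the smaller of the long-poll interval and the parent's remaining time minus a tailroom budget. The patch's actual helper is the newChildContext method added further down, which additionally short-circuits when the parent context is already done.

package main

import (
    "context"
    "fmt"
    "time"
)

// childContext returns a context whose timeout is capped at the parent's
// remaining time minus tailroom, so the handler can respond before the RPC
// stack converts the parent deadline into a timeout error.
func childContext(parent context.Context, timeout, tailroom time.Duration) (context.Context, context.CancelFunc) {
    deadline, ok := parent.Deadline()
    if !ok {
        return context.WithTimeout(parent, timeout)
    }
    remaining := time.Until(deadline) - tailroom
    if remaining < timeout {
        timeout = remaining
        if timeout < 0 {
            timeout = 0
        }
    }
    return context.WithTimeout(parent, timeout)
}

func main() {
    // A frontend call with a 10s deadline; the long-poll interval of 1 minute
    // gets capped to just under 10s so an emptyTask response can still go out.
    parent, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    child, childCancel := childContext(parent, time.Minute, 100*time.Millisecond)
    defer childCancel()

    d, _ := child.Deadline()
    fmt.Println("child expires in roughly", time.Until(d).Round(time.Second))
}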
+ childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget) defer cancel() pollerID, ok := ctx.Value(pollerIDKey).(string) @@ -449,40 +288,28 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond c.pollerHistory.updatePollerInfo(pollerIdentity(identity), maxDispatchPerSecond) } - var tasksForPoll chan *getTaskResult domainEntry, err := c.domainCache.GetDomainByID(c.taskListID.domainID) if err != nil { return nil, err } - if domainEntry.GetDomainNotActiveErr() == nil { - // domain active - tasksForPoll = c.tasksForPoll - } // the desired global rate limit for the task list comes from the // poller, which lives inside the client side worker. There is // one rateLimiter for this entire task list and as we get polls, // we update the ratelimiter rps if it has changed from the last // value. Last poller wins if different pollers provide different values - c.rateLimiter.UpdateMaxDispatch(maxDispatchPerSecond) + c.matcher.UpdateRatelimit(maxDispatchPerSecond) - select { - case result := <-tasksForPoll: - if result.syncMatch { - c.domainScope().IncCounter(metrics.PollSuccessWithSyncCounter) - } - c.domainScope().IncCounter(metrics.PollSuccessCounter) - return result, nil - case result := <-c.queryTasksForPoll: - if result.syncMatch { - c.domainScope().IncCounter(metrics.PollSuccessWithSyncCounter) - } - c.domainScope().IncCounter(metrics.PollSuccessCounter) - return result, nil - case <-childCtx.Done(): - c.domainScope().IncCounter(metrics.PollTimeoutCounter) - return nil, ErrNoTasks + if domainEntry.GetDomainNotActiveErr() != nil { + return c.matcher.PollForQuery(childCtx) } + + return c.matcher.Poll(childCtx) +} + +// GetAllPollerInfo returns all pollers that polled from this tasklist in last few minutes +func (c *taskListManagerImpl) GetAllPollerInfo() []*s.PollerInfo { + return c.pollerHistory.getAllPollerInfo() } func (c *taskListManagerImpl) CancelPoller(pollerID string) { @@ -495,6 +322,89 @@ func (c *taskListManagerImpl) CancelPoller(pollerID string) { } } +// DescribeTaskList returns information about the target tasklist, right now this API returns the +// pollers which polled this tasklist in last few minutes and status of tasklist's ackManager +// (readLevel, ackLevel, backlogCountHint and taskIDBlock). 
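The last-poller-wins rate update mentioned a few lines above can be pictured with golang.org/x/time/rate directly; the real code goes through this package's rateLimiter and TaskMatcher wrappers, so the names below are only illustrative.

package main

import (
    "fmt"

    "golang.org/x/time/rate"
)

// updateMaxDispatch applies the "last poller wins" rule: whenever a poll
// carries a different maxDispatchPerSecond, the single task-list limiter is
// updated in place.
func updateMaxDispatch(limiter *rate.Limiter, maxDispatchPerSecond *float64) {
    if maxDispatchPerSecond == nil {
        return
    }
    if limit := rate.Limit(*maxDispatchPerSecond); limit != limiter.Limit() {
        limiter.SetLimit(limit)
    }
}

func main() {
    limiter := rate.NewLimiter(rate.Limit(100000), 1) // effectively unthrottled until a poller says otherwise

    first, second := 200.0, 50.0
    updateMaxDispatch(limiter, &first)
    updateMaxDispatch(limiter, &second) // the most recent poller's value sticks
    fmt.Println("dispatch rate per second:", limiter.Limit())
}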
+func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *s.DescribeTaskListResponse { + response := &s.DescribeTaskListResponse{Pollers: c.GetAllPollerInfo()} + if !includeTaskListStatus { + return response + } + + taskIDBlock := c.rangeIDToTaskIDBlock(c.db.RangeID()) + response.TaskListStatus = &s.TaskListStatus{ + ReadLevel: common.Int64Ptr(c.taskAckManager.getReadLevel()), + AckLevel: common.Int64Ptr(c.taskAckManager.getAckLevel()), + BacklogCountHint: common.Int64Ptr(c.taskAckManager.getBacklogCountHint()), + RatePerSecond: common.Float64Ptr(c.matcher.Rate()), + TaskIDBlock: &s.TaskIDBlock{ + StartID: common.Int64Ptr(taskIDBlock.start), + EndID: common.Int64Ptr(taskIDBlock.end), + }, + } + + return response +} + +func (c *taskListManagerImpl) String() string { + buf := new(bytes.Buffer) + if c.taskListID.taskType == persistence.TaskListTypeActivity { + buf.WriteString("Activity") + } else { + buf.WriteString("Decision") + } + rangeID := c.db.RangeID() + fmt.Fprintf(buf, " task list %v\n", c.taskListID.taskListName) + fmt.Fprintf(buf, "RangeID=%v\n", rangeID) + fmt.Fprintf(buf, "TaskIDBlock=%+v\n", c.rangeIDToTaskIDBlock(rangeID)) + fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.ackLevel) + fmt.Fprintf(buf, "MaxReadLevel=%v\n", c.taskAckManager.getReadLevel()) + + return buf.String() +} + +// completeTask marks a task as processed. If this task was synchronously matched, a notification +// is sent in the syncMatch response channel to be picked by addTask goroutine. If this task was +// created by taskReader (i.e. backlog from db): +// - it is deleted from the database when err is nil +// - new task is created and current task is deleted when err is not nil +func (c *taskListManagerImpl) completeTask(task *internalTask, err error) { + if task.syncResponseCh != nil { + // It is OK to succeed task creation as it was already completed + task.syncResponseCh <- err + return + } + + if err != nil { + // failed to start the task. + // We cannot just remove it from persistence because then it will be lost. + // We handle this by writing the task back to persistence with a higher taskID. + // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up + // again the underlying reason for failing to start will be resolved. + // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be + // re-written to persistence frequently. + _, err = c.executeWithRetry(func() (interface{}, error) { + return c.taskWriter.appendTask(&task.workflowExecution, task.info) + }) + + if err != nil { + // OK, we also failed to write to persistence. + // This should only happen in very extreme cases where persistence is completely down. 
+ // We still can't lose the old task so we just unload the entire task list + c.logger.Error("Persistent store operation failure", + tag.StoreOperationStopTaskList, + tag.Error(err), + tag.WorkflowTaskListName(c.taskListID.taskListName), + tag.WorkflowTaskListType(c.taskListID.taskType)) + c.Stop() + return + } + c.taskReader.Signal() + } + ackLevel := c.taskAckManager.completeTask(task.info.TaskID) + c.taskGC.Run(ackLevel) +} + func (c *taskListManagerImpl) renewLeaseWithRetry() (taskListState, error) { var newState taskListState op := func() (err error) { @@ -531,82 +441,14 @@ func (c *taskListManagerImpl) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, return c.rangeIDToTaskIDBlock(state.rangeID), nil } -func (c *taskListManagerImpl) String() string { - buf := new(bytes.Buffer) - if c.taskListID.taskType == persistence.TaskListTypeActivity { - buf.WriteString("Activity") - } else { - buf.WriteString("Decision") - } - rangeID := c.db.RangeID() - fmt.Fprintf(buf, " task list %v\n", c.taskListID.taskListName) - fmt.Fprintf(buf, "RangeID=%v\n", rangeID) - fmt.Fprintf(buf, "TaskIDBlock=%+v\n", c.rangeIDToTaskIDBlock(rangeID)) - fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.ackLevel) - fmt.Fprintf(buf, "MaxReadLevel=%v\n", c.taskAckManager.getReadLevel()) - - return buf.String() -} - -// getAllPollerInfo return poller which poll from this tasklist in last few minutes -func (c *taskListManagerImpl) GetAllPollerInfo() []*s.PollerInfo { - return c.pollerHistory.getAllPollerInfo() -} - -// DescribeTaskList returns information about the target tasklist, right now this API returns the -// pollers which polled this tasklist in last few minutes and status of tasklist's ackManager -// (readLevel, ackLevel, backlogCountHint and taskIDBlock). -func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *s.DescribeTaskListResponse { - response := &s.DescribeTaskListResponse{Pollers: c.GetAllPollerInfo()} - if !includeTaskListStatus { - return response - } - - taskIDBlock := c.rangeIDToTaskIDBlock(c.db.RangeID()) - response.TaskListStatus = &s.TaskListStatus{ - ReadLevel: common.Int64Ptr(c.taskAckManager.getReadLevel()), - AckLevel: common.Int64Ptr(c.taskAckManager.getAckLevel()), - BacklogCountHint: common.Int64Ptr(c.taskAckManager.getBacklogCountHint()), - RatePerSecond: common.Float64Ptr(c.rateLimiter.Limit()), - TaskIDBlock: &s.TaskIDBlock{ - StartID: common.Int64Ptr(taskIDBlock.start), - EndID: common.Int64Ptr(taskIDBlock.end), - }, - } - - return response +func (c *taskListManagerImpl) getAckLevel() (ackLevel int64) { + return c.taskAckManager.getAckLevel() } -// Tries to match task to a poller that is already waiting on getTask. -// When this method returns non nil response without error it is guaranteed that the task is started -// and sent to a poller. So it not necessary to persist it. -// Returns (nil, nil) if there is no waiting poller which indicates that task has to be persisted. -func (c *taskListManagerImpl) trySyncMatch(task *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) { - if !c.config.EnableSyncMatch() { - return nil, nil - } - // Request from the point of view of Add(Activity|Decision)Task operation. - // But it is getTask result from the point of view of a poll operation. - request := &getTaskResult{task: task, C: make(chan *syncMatchResponse, 1), syncMatch: true} - - rsv := c.rateLimiter.Reserve() - // If we have to wait too long for reservation, better to store in task buffer and handle later. 
- if !rsv.OK() || rsv.Delay() > time.Second { - if rsv.OK() { // if we were indeed given a reservation, return it before we bail out - rsv.Cancel() - } - c.domainScope().IncCounter(metrics.SyncThrottleCounter) - return nil, errAddTasklistThrottled - } - time.Sleep(rsv.Delay()) - select { - case c.tasksForPoll <- request: // poller goroutine picked up the task - r := <-request.C - return r.response, r.err - default: // no poller waiting for tasks - rsv.Cancel() - return nil, nil - } +func (c *taskListManagerImpl) getTaskListKind() int { + // there is no need to lock here, + // since c.taskListKind is assigned when taskListManager been created and never changed. + return c.taskListKind } // Retry operation on transient error. On rangeID update by another process calls c.Stop(). @@ -635,99 +477,44 @@ func (c *taskListManagerImpl) executeWithRetry( return } -func (c *taskListManagerImpl) signalNewTask() { - var event struct{} +func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params addTaskParams) (bool, error) { + childCtx, cancel := c.newChildContext(ctx, maxSyncMatchWaitTime, time.Second) + matched, err := c.matcher.Offer(childCtx, newInternalTask(params.taskInfo, c.completeTask, true)) + cancel() + return matched, err +} + +// newChildContext creates a child context with desired timeout. +// if tailroom is non-zero, then child context timeout will be +// the minOf(parentCtx.Deadline()-tailroom, timeout). Use this +// method to create child context when childContext cannot use +// all of parent's deadline but instead there is a need to leave +// some time for parent to do some post-work +func (c *taskListManagerImpl) newChildContext( + parent context.Context, + timeout time.Duration, + tailroom time.Duration, +) (context.Context, context.CancelFunc) { select { - case c.notifyCh <- event: - default: // channel already has an event, don't block + case <-parent.Done(): + return parent, func() {} + default: } -} - -func (c *taskContext) RecordDecisionTaskStartedWithRetry(ctx context.Context, - request *h.RecordDecisionTaskStartedRequest) (resp *h.RecordDecisionTaskStartedResponse, err error) { - op := func() error { - var err error - resp, err = c.tlMgr.engine.historyService.RecordDecisionTaskStarted(ctx, request) - return err + deadline, ok := parent.Deadline() + if !ok { + return context.WithTimeout(parent, timeout) } - err = backoff.Retry(op, historyServiceOperationRetryPolicy, func(err error) bool { - switch err.(type) { - case *s.EntityNotExistsError, *h.EventAlreadyStartedError: - return false - } - return true - }) - return -} - -func (c *taskContext) RecordActivityTaskStartedWithRetry(ctx context.Context, - request *h.RecordActivityTaskStartedRequest) (resp *h.RecordActivityTaskStartedResponse, err error) { - op := func() error { - var err error - resp, err = c.tlMgr.engine.historyService.RecordActivityTaskStarted(ctx, request) - return err + remaining := deadline.Sub(time.Now()) - tailroom + if remaining < timeout { + timeout = time.Duration(common.MaxInt64(0, int64(remaining))) } - err = backoff.Retry(op, historyServiceOperationRetryPolicy, func(err error) bool { - switch err.(type) { - case *s.EntityNotExistsError, *h.EventAlreadyStartedError: - return false - } - return true - }) - return -} - -// If poll received task from addTask directly the addTask goroutine is notified about start task result. -// If poll received task from persistence then task is deleted from it if no error was reported. 
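The sync-match path above, where trySyncMatch offers the task to the matcher under the short maxSyncMatchWaitTime budget and AddTask falls back to the task writer when nothing accepts it, is essentially a bounded hand-off on an unbuffered channel. A minimal sketch of that shape, with made-up names:

package main

import (
    "context"
    "fmt"
    "time"
)

// offer tries to hand a task directly to a waiting poller within the context
// budget. It reports false when nobody picks the task up, in which case the
// caller would persist it for later asynchronous matching.
func offer(ctx context.Context, taskC chan<- string, task string) bool {
    select {
    case taskC <- task:
        return true
    case <-ctx.Done():
        return false
    }
}

func main() {
    taskC := make(chan string) // unbuffered: a send only succeeds if a poller is receiving

    // No poller yet: the offer times out and the task would go to the task writer.
    ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
    fmt.Println("sync matched:", offer(ctx, taskC, "task-1"))
    cancel()

    // With a poller blocked on the channel, the same offer succeeds immediately.
    go func() { fmt.Println("poller got:", <-taskC) }()
    ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond)
    fmt.Println("sync matched:", offer(ctx, taskC, "task-2"))
    cancel()
    time.Sleep(10 * time.Millisecond) // give the poller goroutine a chance to print
}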
-func (c *taskContext) completeTask(err error) { - tlMgr := c.tlMgr - tlMgr.logger.Debug(fmt.Sprintf("completeTask task taskList=%v, taskID=%v, err=%v", - tlMgr.taskListID.taskListName, c.info.TaskID, err)) - if c.syncResponseCh != nil { - // It is OK to succeed task creation as it was already completed - c.syncResponseCh <- &syncMatchResponse{ - response: &persistence.CreateTasksResponse{}, err: err} - return - } - - if err != nil { - // failed to start the task. - // We cannot just remove it from persistence because then it will be lost. - // We handle this by writing the task back to persistence with a higher taskID. - // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up - // again the underlying reason for failing to start will be resolved. - // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be - // re-written to persistence frequently. - _, err = tlMgr.executeWithRetry(func() (interface{}, error) { - return tlMgr.taskWriter.appendTask(&c.workflowExecution, c.info) - }) - - if err != nil { - // OK, we also failed to write to persistence. - // This should only happen in very extreme cases where persistence is completely down. - // We still can't lose the old task so we just unload the entire task list - tlMgr.logger.Error("Persistent store operation failure", - tag.StoreOperationStopTaskList, - tag.Error(err), - tag.WorkflowTaskListName(tlMgr.taskListID.taskListName), - tag.WorkflowTaskListType(tlMgr.taskListID.taskType)) - tlMgr.Stop() - return - } - tlMgr.signalNewTask() - } - - tlMgr.completeTaskPoll(c.info.TaskID) + return context.WithTimeout(parent, timeout) } func createServiceBusyError(msg string) *s.ServiceBusyError { return &s.ServiceBusyError{Message: msg} } -func (c *taskListManagerImpl) isTaskAddedRecently(lastAddTime time.Time) bool { - return time.Now().Sub(lastAddTime) <= c.config.MaxTasklistIdleTime() -} - func (c *taskListManagerImpl) domainScope() metrics.Scope { scope := c.domainScopeValue.Load().(metrics.Scope) if scope != nil { diff --git a/service/matching/taskListManager_test.go b/service/matching/taskListManager_test.go index 4a1f096a316..2d02e19069e 100644 --- a/service/matching/taskListManager_test.go +++ b/service/matching/taskListManager_test.go @@ -26,6 +26,8 @@ import ( "testing" "time" + "context" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -44,9 +46,15 @@ const _minBurst = 10000 func TestDeliverBufferTasks(t *testing.T) { tests := []func(tlm *taskListManagerImpl){ - func(tlm *taskListManagerImpl) { close(tlm.taskBuffer) }, - func(tlm *taskListManagerImpl) { close(tlm.deliverBufferShutdownCh) }, - func(tlm *taskListManagerImpl) { tlm.cancelFunc() }, + func(tlm *taskListManagerImpl) { close(tlm.taskReader.taskBuffer) }, + func(tlm *taskListManagerImpl) { close(tlm.taskReader.dispatcherShutdownC) }, + func(tlm *taskListManagerImpl) { + rps := 0.1 + tlm.matcher.UpdateRatelimit(&rps) + tlm.taskReader.taskBuffer <- &persistence.TaskInfo{} + tlm.matcher.ratelimit(context.Background()) // consume the token + tlm.taskReader.cancelFunc() + }, } for _, test := range tests { tlm := createTestTaskListManager() @@ -54,25 +62,25 @@ func TestDeliverBufferTasks(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - tlm.deliverBufferTasksForPoll() + tlm.taskReader.dispatchBufferedTasks() }() test(tlm) - // deliverBufferTasksForPoll should stop after invocation of the test function + // 
dispatchBufferedTasks should stop after invocation of the test function wg.Wait() } } func TestDeliverBufferTasks_NoPollers(t *testing.T) { tlm := createTestTaskListManager() - tlm.taskBuffer <- &persistence.TaskInfo{} + tlm.taskReader.taskBuffer <- &persistence.TaskInfo{} var wg sync.WaitGroup wg.Add(1) go func() { - tlm.deliverBufferTasksForPoll() + tlm.taskReader.dispatchBufferedTasks() wg.Done() }() time.Sleep(100 * time.Millisecond) // let go routine run first and block on tasksForPoll - close(tlm.deliverBufferShutdownCh) + tlm.taskReader.cancelFunc() wg.Wait() } @@ -111,10 +119,10 @@ func createTestTaskListManagerWithConfig(cfg *Config) *taskListManagerImpl { func TestIsTaskAddedRecently(t *testing.T) { tlm := createTestTaskListManager() - require.True(t, tlm.isTaskAddedRecently(time.Now())) - require.False(t, tlm.isTaskAddedRecently(time.Now().Add(-tlm.config.MaxTasklistIdleTime()))) - require.True(t, tlm.isTaskAddedRecently(time.Now().Add(1*time.Second))) - require.False(t, tlm.isTaskAddedRecently(time.Time{})) + require.True(t, tlm.taskReader.isTaskAddedRecently(time.Now())) + require.False(t, tlm.taskReader.isTaskAddedRecently(time.Now().Add(-tlm.config.MaxTasklistIdleTime()))) + require.True(t, tlm.taskReader.isTaskAddedRecently(time.Now().Add(1*time.Second))) + require.False(t, tlm.taskReader.isTaskAddedRecently(time.Time{})) } func TestDescribeTaskList(t *testing.T) { @@ -177,7 +185,8 @@ func TestDescribeTaskList(t *testing.T) { func tlMgrStartWithoutNotifyEvent(tlm *taskListManagerImpl) { // mimic tlm.Start() but avoid calling notifyEvent tlm.startWG.Done() - go tlm.getTasksPump() + go tlm.taskReader.dispatchBufferedTasks() + go tlm.taskReader.getTasksPump() } func TestCheckIdleTaskList(t *testing.T) { @@ -204,7 +213,7 @@ func TestCheckIdleTaskList(t *testing.T) { tlm = createTestTaskListManagerWithConfig(cfg) require.Equal(t, 0, len(tlm.GetAllPollerInfo())) tlMgrStartWithoutNotifyEvent(tlm) - tlm.signalNewTask() + tlm.taskReader.Signal() time.Sleep(20 * time.Millisecond) require.Equal(t, int32(0), tlm.stopped) tlm.Stop() diff --git a/service/matching/taskReader.go b/service/matching/taskReader.go index 498d0f33426..be2719b8469 100644 --- a/service/matching/taskReader.go +++ b/service/matching/taskReader.go @@ -22,10 +22,10 @@ package matching import ( "context" - "fmt" "runtime" "time" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" @@ -33,103 +33,147 @@ import ( var epochStartTime = time.Unix(0, 0) -func (c *taskListManagerImpl) deliverBufferTasksForPoll() { -deliverBufferTasksLoop: +type ( + taskReader struct { + taskBuffer chan *persistence.TaskInfo // tasks loaded from persistence + notifyC chan struct{} // Used as signal to notify pump of new tasks + tlMgr *taskListManagerImpl + // The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal + // approach is to use request-scoped contexts and use a unique one for each call to Wait. However + // in order to cancel it on shutdown, we need a new goroutine for each call that would wait on + // the shutdown channel. To optimize on efficiency, we instead create one and tag it on the struct + // so the cancel can be called directly on shutdown. 
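The shared cancellable context described in this comment can be sketched in isolation: one context/cancel pair is created up front, every rate-limiter Wait uses it, and shutdown is a single cancel call rather than a watcher goroutine per Wait. The names below (dispatcher and friends) are illustrative only.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

// dispatcher holds one cancellable context that is reused for every
// limiter.Wait call, so Stop only has to invoke a single cancel func.
type dispatcher struct {
    limiter    *rate.Limiter
    cancelCtx  context.Context
    cancelFunc context.CancelFunc
}

func newDispatcher(rps float64) *dispatcher {
    ctx, cancel := context.WithCancel(context.Background())
    return &dispatcher{
        limiter:    rate.NewLimiter(rate.Limit(rps), 1),
        cancelCtx:  ctx,
        cancelFunc: cancel,
    }
}

func (d *dispatcher) run(tasks <-chan string) {
    for task := range tasks {
        // Wait returns an error once Stop cancels the shared context,
        // which is the signal to exit the dispatch loop.
        if err := d.limiter.Wait(d.cancelCtx); err != nil {
            fmt.Println("dispatch loop stopping:", err)
            return
        }
        fmt.Println("dispatched", task)
    }
}

func (d *dispatcher) Stop() { d.cancelFunc() }

func main() {
    d := newDispatcher(0.001) // tiny refill rate: only the initial burst token is free
    tasks := make(chan string, 2)
    tasks <- "task-1"
    tasks <- "task-2"
    close(tasks)

    go func() {
        time.Sleep(50 * time.Millisecond)
        d.Stop() // unblocks the Wait that is pending on task-2
    }()
    d.run(tasks)
}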
+ cancelCtx context.Context + cancelFunc context.CancelFunc + // separate shutdownC needed for dispatchTasks go routine to allow + // getTasksPump to be stopped without stopping dispatchTasks in unit tests + dispatcherShutdownC chan struct{} + } +) + +func newTaskReader(tlMgr *taskListManagerImpl) *taskReader { + ctx, cancel := context.WithCancel(context.Background()) + return &taskReader{ + tlMgr: tlMgr, + cancelCtx: ctx, + cancelFunc: cancel, + notifyC: make(chan struct{}, 1), + dispatcherShutdownC: make(chan struct{}), + // we always dequeue the head of the buffer and try to dispatch it to a poller + // so allocate one less than desired target buffer size + taskBuffer: make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1), + } +} + +func (tr *taskReader) Start() { + tr.Signal() + go tr.dispatchBufferedTasks() + go tr.getTasksPump() +} + +func (tr *taskReader) Stop() { + tr.cancelFunc() + close(tr.dispatcherShutdownC) +} + +func (tr *taskReader) Signal() { + var event struct{} + select { + case tr.notifyC <- event: + default: // channel already has an event, don't block + } +} + +func (tr *taskReader) dispatchBufferedTasks() { +dispatchLoop: for { - err := c.rateLimiter.Wait(c.cancelCtx) - if err != nil { - if err == context.Canceled { - c.logger.Info("Tasklist manager context is cancelled, shutting down") - break deliverBufferTasksLoop - } - c.logger.Debug(fmt.Sprintf( - "Unable to add buffer task, rate limit failed, domainId: %s, tasklist: %s, error: %s", - c.taskListID.domainID, c.taskListID.taskListName, err.Error()), - ) - c.domainScope().IncCounter(metrics.BufferThrottleCounter) - // This is to prevent busy looping when throttling is set to 0 - runtime.Gosched() - continue - } select { - case task, ok := <-c.taskBuffer: + case taskInfo, ok := <-tr.taskBuffer: if !ok { // Task list getTasks pump is shutdown - break deliverBufferTasksLoop + break dispatchLoop } - select { - case c.tasksForPoll <- &getTaskResult{task: task}: - case <-c.deliverBufferShutdownCh: - break deliverBufferTasksLoop + task := newInternalTask(taskInfo, tr.tlMgr.completeTask, false) + for { + err := tr.tlMgr.DispatchTask(tr.cancelCtx, task) + if err == nil { + break + } + if err == context.Canceled { + tr.tlMgr.logger.Info("Tasklist manager context is cancelled, shutting down") + break dispatchLoop + } + // this should never happen unless there is a bug - don't drop the task + tr.scope().IncCounter(metrics.BufferThrottleCounter) + tr.logger().Error("taskReader: unexpected error dispatching task", tag.Error(err)) + runtime.Gosched() } - case <-c.deliverBufferShutdownCh: - break deliverBufferTasksLoop + case <-tr.dispatcherShutdownC: + break dispatchLoop } } } -func (c *taskListManagerImpl) getTasksPump() { - defer close(c.taskBuffer) - c.startWG.Wait() +func (tr *taskReader) getTasksPump() { + tr.tlMgr.startWG.Wait() + defer close(tr.taskBuffer) - go c.deliverBufferTasksForPoll() - updateAckTimer := time.NewTimer(c.config.UpdateAckInterval()) - checkIdleTaskListTimer := time.NewTimer(c.config.IdleTasklistCheckInterval()) + updateAckTimer := time.NewTimer(tr.tlMgr.config.UpdateAckInterval()) + checkIdleTaskListTimer := time.NewTimer(tr.tlMgr.config.IdleTasklistCheckInterval()) lastTimeWriteTask := time.Time{} getTasksPumpLoop: for { select { - case <-c.shutdownCh: + case <-tr.tlMgr.shutdownCh: break getTasksPumpLoop - case <-c.notifyCh: + case <-tr.notifyC: { lastTimeWriteTask = time.Now() - tasks, readLevel, isReadBatchDone, err := c.getTaskBatch() + tasks, readLevel, isReadBatchDone, err := 
tr.getTaskBatch() if err != nil { - c.signalNewTask() // re-enqueue the event + tr.Signal() // re-enqueue the event // TODO: Should we ever stop retrying on db errors? continue getTasksPumpLoop } if len(tasks) == 0 { - c.taskAckManager.setReadLevel(readLevel) + tr.tlMgr.taskAckManager.setReadLevel(readLevel) if !isReadBatchDone { - c.signalNewTask() + tr.Signal() } continue getTasksPumpLoop } - if !c.addTasksToBuffer(tasks, lastTimeWriteTask, checkIdleTaskListTimer) { + if !tr.addTasksToBuffer(tasks, lastTimeWriteTask, checkIdleTaskListTimer) { break getTasksPumpLoop } // There maybe more tasks. We yield now, but signal pump to check again later. - c.signalNewTask() + tr.Signal() } case <-updateAckTimer.C: { - err := c.persistAckLevel() - //var err error + err := tr.persistAckLevel() if err != nil { if _, ok := err.(*persistence.ConditionFailedError); ok { // This indicates the task list may have moved to another host. - c.Stop() + tr.tlMgr.Stop() } else { - c.logger.Error("Persistent store operation failure", + tr.logger().Error("Persistent store operation failure", tag.StoreOperationUpdateTaskList, tag.Error(err)) } // keep going as saving ack is not critical } - c.signalNewTask() // periodically signal pump to check persistence for tasks - updateAckTimer = time.NewTimer(c.config.UpdateAckInterval()) + tr.Signal() // periodically signal pump to check persistence for tasks + updateAckTimer = time.NewTimer(tr.tlMgr.config.UpdateAckInterval()) } case <-checkIdleTaskListTimer.C: { - if c.isIdle(lastTimeWriteTask) { - c.handleIdleTimeout() + if tr.isIdle(lastTimeWriteTask) { + tr.handleIdleTimeout() break getTasksPumpLoop } - checkIdleTaskListTimer = time.NewTimer(c.config.IdleTasklistCheckInterval()) + checkIdleTaskListTimer = time.NewTimer(tr.tlMgr.config.IdleTasklistCheckInterval()) } } } @@ -138,9 +182,9 @@ getTasksPumpLoop: checkIdleTaskListTimer.Stop() } -func (c *taskListManagerImpl) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) { - response, err := c.executeWithRetry(func() (interface{}, error) { - return c.db.GetTasks(readLevel, maxReadLevel, c.config.GetTasksBatchSize()) +func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) { + response, err := tr.tlMgr.executeWithRetry(func() (interface{}, error) { + return tr.tlMgr.db.GetTasks(readLevel, maxReadLevel, tr.tlMgr.config.GetTasksBatchSize()) }) if err != nil { return nil, err @@ -151,18 +195,18 @@ func (c *taskListManagerImpl) getTaskBatchWithRange(readLevel int64, maxReadLeve // Returns a batch of tasks from persistence starting form current read level. // Also return a number that can be used to update readLevel // Also return a bool to indicate whether read is finished -func (c *taskListManagerImpl) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) { +func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) { var tasks []*persistence.TaskInfo - readLevel := c.taskAckManager.getReadLevel() - maxReadLevel := c.taskWriter.GetMaxReadLevel() + readLevel := tr.tlMgr.taskAckManager.getReadLevel() + maxReadLevel := tr.tlMgr.taskWriter.GetMaxReadLevel() // counter i is used to break and let caller check whether tasklist is still alive and need resume read. 
for i := 0; i < 10 && readLevel < maxReadLevel; i++ { - upper := readLevel + c.config.RangeSize + upper := readLevel + tr.tlMgr.config.RangeSize if upper > maxReadLevel { upper = maxReadLevel } - tasks, err := c.getTaskBatchWithRange(readLevel, upper) + tasks, err := tr.getTaskBatchWithRange(readLevel, upper) if err != nil { return nil, readLevel, true, err } @@ -175,49 +219,65 @@ func (c *taskListManagerImpl) getTaskBatch() ([]*persistence.TaskInfo, int64, bo return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed } -func (c *taskListManagerImpl) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool { +func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool { return t.Expiry.After(epochStartTime) && time.Now().After(t.Expiry) } -func (c *taskListManagerImpl) isIdle(lastWriteTime time.Time) bool { - return !c.isTaskAddedRecently(lastWriteTime) && len(c.GetAllPollerInfo()) == 0 +func (tr *taskReader) isIdle(lastWriteTime time.Time) bool { + return !tr.isTaskAddedRecently(lastWriteTime) && len(tr.tlMgr.GetAllPollerInfo()) == 0 } -func (c *taskListManagerImpl) handleIdleTimeout() { - c.persistAckLevel() - c.taskGC.RunNow(c.taskAckManager.getAckLevel()) - c.Stop() +func (tr *taskReader) handleIdleTimeout() { + tr.persistAckLevel() + tr.tlMgr.taskGC.RunNow(tr.tlMgr.taskAckManager.getAckLevel()) + tr.tlMgr.Stop() } -func (c *taskListManagerImpl) addTasksToBuffer( +func (tr *taskReader) addTasksToBuffer( tasks []*persistence.TaskInfo, lastWriteTime time.Time, idleTimer *time.Timer) bool { now := time.Now() for _, t := range tasks { - if c.isTaskExpired(t, now) { - c.domainScope().IncCounter(metrics.ExpiredTasksCounter) + if tr.isTaskExpired(t, now) { + tr.scope().IncCounter(metrics.ExpiredTasksCounter) continue } - if !c.addSingleTaskToBuffer(t, lastWriteTime, idleTimer) { + if !tr.addSingleTaskToBuffer(t, lastWriteTime, idleTimer) { return false // we are shutting down the task list } } return true } -func (c *taskListManagerImpl) addSingleTaskToBuffer( +func (tr *taskReader) addSingleTaskToBuffer( task *persistence.TaskInfo, lastWriteTime time.Time, idleTimer *time.Timer) bool { - c.taskAckManager.addTask(task.TaskID) + tr.tlMgr.taskAckManager.addTask(task.TaskID) for { select { - case c.taskBuffer <- task: + case tr.taskBuffer <- task: return true case <-idleTimer.C: - if c.isIdle(lastWriteTime) { - c.handleIdleTimeout() + if tr.isIdle(lastWriteTime) { + tr.handleIdleTimeout() return false } - case <-c.shutdownCh: + case <-tr.tlMgr.shutdownCh: return false } } } + +func (tr *taskReader) persistAckLevel() error { + return tr.tlMgr.db.UpdateState(tr.tlMgr.taskAckManager.getAckLevel()) +} + +func (tr *taskReader) isTaskAddedRecently(lastAddTime time.Time) bool { + return time.Now().Sub(lastAddTime) <= tr.tlMgr.config.MaxTasklistIdleTime() +} + +func (tr *taskReader) logger() log.Logger { + return tr.tlMgr.logger +} + +func (tr *taskReader) scope() metrics.Scope { + return tr.tlMgr.domainScope() +}
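To make the bounded read-level paging in getTaskBatch above easier to follow, here is a simplified, self-contained rendering of the idea; it is not the patch's exact function (the fetch is faked and the returned read level is a deliberate simplification).

package main

import "fmt"

// fetchRange stands in for db.GetTasks: it returns task IDs in (readLevel, upper],
// capped at batchSize. The data here is fabricated for the example.
func fetchRange(readLevel, upper int64, batchSize int) []int64 {
    var ids []int64
    for id := readLevel + 1; id <= upper && len(ids) < batchSize; id++ {
        if id%7 == 0 { // pretend only some IDs actually have tasks
            ids = append(ids, id)
        }
    }
    return ids
}

// getTaskBatch walks the backlog one rangeSize window at a time, gives up after
// a fixed number of empty windows so the caller can re-check that the task list
// is still alive, and reports whether the read caught up to maxReadLevel.
func getTaskBatch(readLevel, maxReadLevel, rangeSize int64, batchSize int) ([]int64, int64, bool) {
    for i := 0; i < 10 && readLevel < maxReadLevel; i++ {
        upper := readLevel + rangeSize
        if upper > maxReadLevel {
            upper = maxReadLevel
        }
        if tasks := fetchRange(readLevel, upper, batchSize); len(tasks) > 0 {
            return tasks, upper, true // stop as soon as any window yields tasks
        }
        readLevel = upper // empty window: advance and try the next one
    }
    return nil, readLevel, readLevel == maxReadLevel
}

func main() {
    tasks, readLevel, done := getTaskBatch(0, 100000, 1000, 10)
    fmt.Printf("tasks=%v readLevel=%v done=%v\n", tasks, readLevel, done)
}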