Skip to content

Commit

Permalink
Add taskList throttling to allow users to limit activities executed per second (cadence-workflow#432)
Browse files Browse the repository at this point in the history
  • Loading branch information
madhuravi authored Jan 5, 2018
1 parent aa29ebb commit 58a3ad2
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 83 deletions.
6 changes: 4 additions & 2 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,13 @@ const (
MatchingFailures = iota + NumCommonMetrics
PollSuccessCounter
PollTimeoutCounter
PollErrorsCounter
PollSuccessWithSyncCounter
LeaseRequestCounter
LeaseFailureCounter
ConditionFailedErrorCounter
RespondQueryTaskFailedCounter
SyncThrottleCounter
BufferThrottleCounter
)

// MetricDefs record the metrics for all services
Expand Down Expand Up @@ -673,12 +674,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
MatchingFailures: {metricName: "matching.errors", metricType: Counter},
PollSuccessCounter: {metricName: "poll.success"},
PollTimeoutCounter: {metricName: "poll.timeouts"},
PollErrorsCounter: {metricName: "poll.errors"},
PollSuccessWithSyncCounter: {metricName: "poll.success.sync"},
LeaseRequestCounter: {metricName: "lease.requests"},
LeaseFailureCounter: {metricName: "lease.failures"},
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
SyncThrottleCounter: {metricName: "sync.throttle.count"},
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
},
}

Expand Down
33 changes: 25 additions & 8 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func (e *matchingEngineImpl) Stop() {
}

func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager) {
e.taskListsLock.Lock()
e.taskListsLock.RLock()
defer e.taskListsLock.RUnlock()
lists = make([]taskListManager, 0, len(e.taskLists))
count := 0
for _, tlMgr := range e.taskLists {
Expand All @@ -139,7 +140,6 @@ func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager
break
}
}
e.taskListsLock.Unlock()
return
}

Expand All @@ -153,20 +153,24 @@ func (e *matchingEngineImpl) String() string {
return r
}

// Returns taskListManager for a task list. If not already cached gets new range from DB and if successful creates one.
// Returns taskListManager for a task list. If not already cached gets new range from DB and
// if successful creates one.
func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListManager, error) {
// Fast path: almost every request finds an existing task list manager here
// and returns without ever taking the write lock.
e.taskListsLock.RLock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.RUnlock()
return result, nil
}
e.taskListsLock.RUnlock()
mgr := newTaskListManager(e, taskList, e.config)
// If it gets here, write lock and check again in case a task list is created between the two locks
e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.Unlock()
return result, nil
}
mgr := newTaskListManager(e, taskList, e.config)
e.taskLists[*taskList] = mgr
e.taskListsLock.Unlock()
logging.LogTaskListLoadingEvent(e.logger, taskList.taskListName, taskList.taskType)
Expand All @@ -179,6 +183,13 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListM
return mgr, nil
}

// updateTaskList stores mgr as the manager for the given task list in
// e.taskLists, overwriting any entry already present under that key.
// The map write is performed while holding the taskListsLock write lock.
// Intended for use in tests only.
func (e *matchingEngineImpl) updateTaskList(taskList *taskListID, mgr taskListManager) {
	key := *taskList

	e.taskListsLock.Lock()
	defer e.taskListsLock.Unlock()

	e.taskLists[key] = mgr
}

func (e *matchingEngineImpl) removeTaskListManager(id *taskListID) {
e.taskListsLock.Lock()
defer e.taskListsLock.Unlock()
Expand Down Expand Up @@ -247,7 +258,7 @@ pollLoop:
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
tCtx, err := e.getTask(pollerCtx, taskList)
tCtx, err := e.getTask(pollerCtx, taskList, nil)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
Expand Down Expand Up @@ -341,10 +352,14 @@ pollLoop:
}

taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
var maxDispatch *float64
if request.TaskListMetadata != nil {
maxDispatch = request.TaskListMetadata.MaxTasksPerSecond
}
// Add frontend generated pollerID to context so tasklistMgr can support cancellation of
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
tCtx, err := e.getTask(pollerCtx, taskList)
tCtx, err := e.getTask(pollerCtx, taskList, maxDispatch)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
Expand Down Expand Up @@ -450,12 +465,14 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(ctx context.Context, request
}

// Loads a task from persistence and wraps it in a task context
func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID) (*taskContext, error) {
func (e *matchingEngineImpl) getTask(
ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64,
) (*taskContext, error) {
tlMgr, err := e.getTaskListManager(taskList)
if err != nil {
return nil, err
}
return tlMgr.GetTaskContext(ctx)
return tlMgr.GetTaskContext(ctx, maxDispatchPerSecond)
}

func (e *matchingEngineImpl) unloadTaskList(id *taskListID) {
Expand Down
Loading

0 comments on commit 58a3ad2

Please sign in to comment.