Skip to content

Commit

Permalink
Shorten context timeout for pollForDecision/ActivityTask at matching …
Browse files Browse the repository at this point in the history
…side (cadence-workflow#1548)

* When frontend gets a pollForDecision/ActivityTask call, it will first check
if the timeout is too short. If so, return an error directly without calling
matching service. Otherwise, at matching side, the context timeout will be
shorten by a small amount to make sure EmptyTask can be returned to frontend
before the context timeout error generated by rpc stack.

* Perform the same check at matching handler.
  • Loading branch information
yycptt authored Mar 20, 2019
1 parent 3b8ac2d commit 326c6bf
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 21 deletions.
9 changes: 9 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,12 @@ const (
// SystemDomainName is domain name for all cadence system workflows
SystemDomainName = "cadence-system"
)

const (
// MinLongPollTimeout is the minimum context timeout for long poll API, below which
// the request won't be processed
MinLongPollTimeout = time.Second * 2
// CriticalLongPollTimeout is a threshold for the context timeout passed into long poll API,
// below which a warning will be logged
CriticalLongPollTimeout = time.Second * 20
)
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
TagESKey = "es-mapping-key"
TagESField = "es-field"
TagContextTimeout = "context-timeout"
TagHandlerName = "handler-name"

// workflow logging tag values
// TagWorkflowComponent Values
Expand Down
37 changes: 37 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ const (
var (
// ErrBlobSizeExceedsLimit is error for event blob size exceeds limit
ErrBlobSizeExceedsLimit = &workflow.BadRequestError{Message: "Blob data size exceeds limit."}
// ErrContextTimeoutTooShort is error for setting a very short context timeout when calling a long poll API
ErrContextTimeoutTooShort = &workflow.BadRequestError{Message: "Context timeout is too short."}
// ErrContextTimeoutNotSet is error for not setting a context timeout when calling a long poll API
ErrContextTimeoutNotSet = &workflow.BadRequestError{Message: "Context timeout is not set."}
)

// AwaitWaitGroup calls Wait on the given wait
Expand Down Expand Up @@ -444,3 +448,36 @@ func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, wo
}
return nil
}

// ValidateLongPollContextTimeout check if the context timeout for a long poll handler is too short or below a normal value.
// If the timeout is not set or too short, it logs an error, and return ErrContextTimeoutNotSet or ErrContextTimeoutTooShort
// accordingly. If the timeout is only below a normal value, it just logs an info and return nil.
func ValidateLongPollContextTimeout(ctx context.Context, handlerName string, logger bark.Logger) error {
deadline, ok := ctx.Deadline()
if !ok {
err := ErrContextTimeoutNotSet
logger.WithFields(bark.Fields{
logging.TagHandlerName: handlerName,
logging.TagErr: err,
}).Error("Context timeout not set for long poll API.")
return err
}

timeout := deadline.Sub(time.Now())
if timeout < MinLongPollTimeout {
err := ErrContextTimeoutTooShort
logger.WithFields(bark.Fields{
logging.TagHandlerName: handlerName,
logging.TagContextTimeout: timeout,
logging.TagErr: err,
}).Error("Context timeout is too short for long poll API.")
return err
}
if timeout < CriticalLongPollTimeout {
logger.WithFields(bark.Fields{
logging.TagHandlerName: handlerName,
logging.TagContextTimeout: timeout,
}).Warn("Context timeout is lower than critical value for long poll API.")
}
return nil
}
8 changes: 8 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,10 @@ func (wh *WorkflowHandler) PollForActivityTask(
}

wh.Service.GetLogger().Debug("Received PollForActivityTask")
if err := common.ValidateLongPollContextTimeout(ctx, "PollForActivityTask", wh.Service.GetLogger()); err != nil {
return nil, wh.error(err, scope)
}

if pollRequest.Domain == nil || pollRequest.GetDomain() == "" {
return nil, wh.error(errDomainNotSet, scope)
}
Expand Down Expand Up @@ -839,6 +843,10 @@ func (wh *WorkflowHandler) PollForDecisionTask(
}

wh.Service.GetLogger().Debug("Received PollForDecisionTask")
if err := common.ValidateLongPollContextTimeout(ctx, "PollForDecisionTask", wh.Service.GetLogger()); err != nil {
return nil, wh.error(err, scope)
}

if pollRequest.Domain == nil || pollRequest.GetDomain() == "" {
return nil, wh.error(errDomainNotSet, scope)
}
Expand Down
29 changes: 28 additions & 1 deletion service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"context"
"encoding/json"
"errors"
"github.com/uber/cadence/common/cluster"
"log"
"os"
"testing"
Expand All @@ -43,6 +42,7 @@ import (
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/blobstore/blob"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
Expand Down Expand Up @@ -254,6 +254,33 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() {
assert.Equal(s.T(), errNoPermission, err)
}

func (s *workflowHandlerSuite) TestPollForTask_Failed_ContextTimeoutTooShort() {
config := s.newConfig()
wh := s.getWorkflowHandler(config)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()

bgCtx := context.Background()
_, err := wh.PollForDecisionTask(bgCtx, &shared.PollForDecisionTaskRequest{})
assert.Error(s.T(), err)
assert.Equal(s.T(), common.ErrContextTimeoutNotSet, err)

_, err = wh.PollForActivityTask(bgCtx, &shared.PollForActivityTaskRequest{})
assert.Error(s.T(), err)
assert.Equal(s.T(), common.ErrContextTimeoutNotSet, err)

shortCtx, cancel := context.WithTimeout(bgCtx, common.MinLongPollTimeout-time.Millisecond)
defer cancel()

_, err = wh.PollForDecisionTask(shortCtx, &shared.PollForDecisionTaskRequest{})
assert.Error(s.T(), err)
assert.Equal(s.T(), common.ErrContextTimeoutTooShort, err)

_, err = wh.PollForActivityTask(shortCtx, &shared.PollForActivityTaskRequest{})
assert.Error(s.T(), err)
assert.Equal(s.T(), common.ErrContextTimeoutTooShort, err)
}

func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet() {
config := s.newConfig()
config.RPS = dc.GetIntPropertyFn(10)
Expand Down
8 changes: 8 additions & 0 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (h *Handler) PollForActivityTask(ctx context.Context,
return nil, h.handleErr(errMatchingHostThrottle, scope)
}

if err := common.ValidateLongPollContextTimeout(ctx, "PollForActivityTask", h.Service.GetLogger()); err != nil {
return nil, h.handleErr(err, scope)
}

response, err := h.engine.PollForActivityTask(ctx, pollRequest)
return response, h.handleErr(err, scope)
}
Expand All @@ -178,6 +182,10 @@ func (h *Handler) PollForDecisionTask(ctx context.Context,
return nil, h.handleErr(errMatchingHostThrottle, scope)
}

if err := common.ValidateLongPollContextTimeout(ctx, "PollForDecisionTask", h.Service.GetLogger()); err != nil {
return nil, h.handleErr(err, scope)
}

response, err := h.engine.PollForDecisionTask(ctx, pollRequest)
return response, h.handleErr(err, scope)
}
Expand Down
33 changes: 27 additions & 6 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,25 @@ func (s *matchingEngineSuite) TestAckManager() {
}

func (s *matchingEngineSuite) TestPollForActivityTasksEmptyResult() {
s.PollForTasksEmptyResultTest(persistence.TaskListTypeActivity)
s.PollForTasksEmptyResultTest(s.callContext, persistence.TaskListTypeActivity)
}

func (s *matchingEngineSuite) TestPollForDecisionTasksEmptyResult() {
s.PollForTasksEmptyResultTest(persistence.TaskListTypeDecision)
s.PollForTasksEmptyResultTest(s.callContext, persistence.TaskListTypeDecision)
}

func (s *matchingEngineSuite) TestPollForActivityTasksEmptyResultWithShortContext() {
shortContextTimeout := returnEmptyTaskTimeBudget + 10*time.Millisecond
callContext, cancel := context.WithTimeout(s.callContext, shortContextTimeout)
defer cancel()
s.PollForTasksEmptyResultTest(callContext, persistence.TaskListTypeActivity)
}

func (s *matchingEngineSuite) TestPollForDecisionTasksEmptyResultWithShortContext() {
shortContextTimeout := returnEmptyTaskTimeBudget + 10*time.Millisecond
callContext, cancel := context.WithTimeout(s.callContext, shortContextTimeout)
defer cancel()
s.PollForTasksEmptyResultTest(callContext, persistence.TaskListTypeDecision)
}

func (s *matchingEngineSuite) TestPollForDecisionTasks() {
Expand Down Expand Up @@ -285,9 +299,11 @@ func (s *matchingEngineSuite) PollForDecisionTasksResultTest() {
s.Equal(expectedResp, resp)
}

func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) {
func (s *matchingEngineSuite) PollForTasksEmptyResultTest(callContext context.Context, taskType int) {
s.matchingEngine.config.RangeSize = 2 // to test that range is not updated without tasks
s.matchingEngine.config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
if _, ok := callContext.Deadline(); !ok {
s.matchingEngine.config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
}

domainID := "domainId"
tl := "makeToast"
Expand All @@ -301,7 +317,7 @@ func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) {
const pollCount = 10
for i := 0; i < pollCount; i++ {
if taskType == persistence.TaskListTypeActivity {
pollResp, err := s.matchingEngine.PollForActivityTask(s.callContext, &matching.PollForActivityTaskRequest{
pollResp, err := s.matchingEngine.PollForActivityTask(callContext, &matching.PollForActivityTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollRequest: &workflow.PollForActivityTaskRequest{
TaskList: taskList,
Expand All @@ -313,7 +329,7 @@ func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) {

taskListType = workflow.TaskListTypeActivity
} else {
resp, err := s.matchingEngine.PollForDecisionTask(s.callContext, &matching.PollForDecisionTaskRequest{
resp, err := s.matchingEngine.PollForDecisionTask(callContext, &matching.PollForDecisionTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollRequest: &workflow.PollForDecisionTaskRequest{
TaskList: taskList,
Expand All @@ -324,6 +340,11 @@ func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) {

taskListType = workflow.TaskListTypeDecision
}
select {
case <-callContext.Done():
s.FailNow("Call context has expired.")
default:
}
// check the poller information
descResp, err := s.matchingEngine.DescribeTaskList(s.callContext, &matching.DescribeTaskListRequest{
DomainUUID: common.StringPtr(domainID),
Expand Down
35 changes: 21 additions & 14 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ import (

const (
done time.Duration = -1

// Time budget for empty task to propagate through the function stack and be returned to
// pollForActivityTask or pollForDecisionTask handler.
returnEmptyTaskTimeBudget time.Duration = time.Second
)

// NOTE: Is this good enough for stress tests?
Expand Down Expand Up @@ -406,24 +410,34 @@ func (c *taskListManagerImpl) completeTaskPoll(taskID int64) int64 {
// Loads task from taskBuffer (which is populated from persistence) or from sync match to add task call
func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, error) {
scope := metrics.MatchingTaskListMgrScope
timer := time.NewTimer(c.config.LongPollExpirationInterval())
defer timer.Stop()

pollerID, ok := ctx.Value(pollerIDKey).(string)
childCtx := ctx

childCtxTimeout := c.config.LongPollExpirationInterval()
if deadline, ok := ctx.Deadline(); ok {
// We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
shortenedCtxTimeout := deadline.Sub(time.Now()) - returnEmptyTaskTimeBudget
if shortenedCtxTimeout < childCtxTimeout {
childCtxTimeout = shortenedCtxTimeout
}
}
// ChildCtx timeout will be the shorter of longPollExpirationInterval and shortened parent context timeout.
childCtx, cancel := context.WithTimeout(ctx, childCtxTimeout)
defer cancel()

if ok && pollerID != "" {
// Found pollerID on context, add it to the map to allow it to be canceled in
// response to CancelPoller call
var cancel context.CancelFunc
childCtx, cancel = context.WithCancel(ctx)
c.outstandingPollsLock.Lock()
c.outstandingPollsMap[pollerID] = cancel
c.outstandingPollsLock.Unlock()
defer func() {
c.outstandingPollsLock.Lock()
delete(c.outstandingPollsMap, pollerID)
c.outstandingPollsLock.Unlock()
cancel()
}()
}

Expand Down Expand Up @@ -457,16 +471,9 @@ func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, erro
}
c.metricsClient.IncCounter(scope, metrics.PollSuccessCounter)
return result, nil
case <-timer.C:
c.metricsClient.IncCounter(scope, metrics.PollTimeoutCounter)
return nil, ErrNoTasks
case <-childCtx.Done():
err := childCtx.Err()
if err == context.DeadlineExceeded || err == context.Canceled {
err = ErrNoTasks
}
c.metricsClient.IncCounter(scope, metrics.PollTimeoutCounter)
return nil, err
return nil, ErrNoTasks
}
}

Expand Down

0 comments on commit 326c6bf

Please sign in to comment.