Skip to content

Commit

Permalink
Adds further observability to matching and tasklist errors
Browse files Browse the repository at this point in the history
Debugging what was going on for a tasklist was quite difficult, so this commit adds a bunch of debug logging and some error-handling improvements. It wraps a number of errors and adds logging and metrics.
  • Loading branch information
davidporter-id-au authored Dec 22, 2023
1 parent a031fe6 commit d4d205f
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 16 deletions.
25 changes: 20 additions & 5 deletions service/matching/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ package matching
import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/types"
Expand All @@ -36,6 +39,7 @@ import (
// Producers are usually rpc calls from history or taskReader
// that drains backlog from db. Consumers are the task list pollers
type TaskMatcher struct {
log log.Logger
// synchronous task channel to match producer/consumer for any isolation group
// tasks having no isolation requirement are added to this channel
// and pollers from all isolation groups read from this channel
Expand Down Expand Up @@ -67,14 +71,15 @@ var ErrTasklistThrottled = errors.New("tasklist limit exceeded")
// newTaskMatcher returns a task matcher instance. The returned instance can be
// used by task producers and consumers to find a match. Both sync matches and non-sync
// matches should use this implementation
func newTaskMatcher(config *taskListConfig, fwdr *Forwarder, scope metrics.Scope, isolationGroups []string) *TaskMatcher {
func newTaskMatcher(config *taskListConfig, fwdr *Forwarder, scope metrics.Scope, isolationGroups []string, log log.Logger) *TaskMatcher {
dPtr := _defaultTaskDispatchRPS
limiter := quotas.NewRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize())
isolatedTaskC := make(map[string]chan *InternalTask)
for _, g := range isolationGroups {
isolatedTaskC[g] = make(chan *InternalTask)
}
return &TaskMatcher{
log: log,
limiter: limiter,
scope: scope,
fwdr: fwdr,
Expand Down Expand Up @@ -222,7 +227,7 @@ func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*typ
// Returns error only when context is canceled, expired or the ratelimit is set to zero (allow nothing)
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error {
if _, err := tm.ratelimit(ctx); err != nil {
return err
return fmt.Errorf("rate limit error dispatching: %w", err)
}

// attempt a match with local poller first. When that
Expand All @@ -232,7 +237,7 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error
case taskC <- task: // poller picked up the task
return nil
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err())
default:
}

Expand All @@ -246,6 +251,11 @@ forLoop:
err := tm.fwdr.ForwardTask(childCtx, task)
token.release("")
if err != nil {

tm.log.Debug("failed to forward task",
tag.Error(err),
tag.TaskID(task.event.TaskID),
)
// forwarder returns error only when the call is rate limited. To
// avoid a busy loop on such rate limiting events, we only attempt to make
// the next forwarded call after this childCtx expires. Till then, we block
Expand All @@ -257,7 +267,7 @@ forLoop:
case <-childCtx.Done():
case <-ctx.Done():
cancel()
return ctx.Err()
return fmt.Errorf("failed to dispatch after failing to forward task: %w", ctx.Err())
}
cancel()
continue forLoop
Expand All @@ -269,7 +279,7 @@ forLoop:
task.finish(nil)
return nil
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("failed to offer task: %w", ctx.Err())
}
}
}
Expand All @@ -291,6 +301,11 @@ func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*Intern
// there is no local poller available to pickup this task. Now block waiting
// either for a local poller or a forwarding token to be available. When a
// forwarding token becomes available, send this poll to a parent partition
tm.log.Debug("falling back to non-local polling",
tag.IsolationGroup(isolationGroup),
tag.Dynamic("isolated channel", len(isolatedTaskC)),
tag.Dynamic("fallback channel", len(tm.taskC)),
)
return tm.pollOrForward(ctx, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC)
}

Expand Down
5 changes: 3 additions & 2 deletions service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -72,12 +73,12 @@ func (t *MatcherTestSuite) SetupTest() {
t.cfg = tlCfg
t.isolationGroups = []string{"dca1", "dca2"}
t.fwdr = newForwarder(&t.cfg.forwarderConfig, t.taskList, types.TaskListKindNormal, t.client, []string{"dca1", "dca2"})
t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"})
t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger())

rootTaskList := newTestTaskListID(t.taskList.domainID, t.taskList.Parent(20), persistence.TaskListTypeDecision)
rootTasklistCfg, err := newTaskListConfig(rootTaskList, cfg, t.newDomainCache())
t.NoError(err)
t.rootMatcher = newTaskMatcher(rootTasklistCfg, nil, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"})
t.rootMatcher = newTaskMatcher(rootTasklistCfg, nil, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger())
}

func (t *MatcherTestSuite) TearDownTest() {
Expand Down
29 changes: 23 additions & 6 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ pollLoop:

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't create new decision tasklist %w", err)
}

// Add frontend generated pollerID to context so tasklistMgr can support cancellation of
Expand All @@ -449,16 +449,20 @@ pollLoop:
task, err := e.getTask(pollerCtx, taskList, nil, taskListKind)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
if errors.Is(err, ErrNoTasks) || errors.Is(err, errPumpClosed) {
e.logger.Debug("no decision tasks",
tag.WorkflowTaskListName(taskListName),
tag.WorkflowDomainID(domainID),
tag.Error(err),
)
return emptyPollForDecisionTaskResponse, nil
}
return nil, err
return nil, fmt.Errorf("couldn't get task: %w", err)
}

e.emitForwardedFromStats(hCtx.scope, task.isForwarded(), req.GetForwardedFrom())

if task.isStarted() {
// tasks received from remote are already started. So, simply forward the response
return task.pollForDecisionResponse(), nil
// TODO: Maybe add history expose here?
}
Expand Down Expand Up @@ -510,9 +514,17 @@ pollLoop:
tag.WorkflowTaskListName(taskListName),
tag.WorkflowScheduleID(task.event.ScheduleID),
tag.TaskID(task.event.TaskID),
tag.Error(err),
)
task.finish(nil)
default:
e.emitInfoOrDebugLog(
task.event.DomainID,
"unknown error recording task started",
tag.WorkflowDomainID(domainID),
tag.Error(err),
tag.WorkflowTaskListName(taskListName),
)
task.finish(err)
}

Expand Down Expand Up @@ -564,9 +576,14 @@ pollLoop:
task, err := e.getTask(pollerCtx, taskList, maxDispatch, taskListKind)
if err != nil {
// TODO: Is empty poll the best reply for errPumpClosed?
if err == ErrNoTasks || err == errPumpClosed {
if errors.Is(err, ErrNoTasks) || errors.Is(err, errPumpClosed) {
return emptyPollForActivityTaskResponse, nil
}
e.logger.Error("Received unexpected err while getting task",
tag.WorkflowTaskListName(taskListName),
tag.WorkflowDomainID(domainID),
tag.Error(err),
)
return nil, err
}

Expand Down Expand Up @@ -884,7 +901,7 @@ func (e *matchingEngineImpl) getAllPartitions(
func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64, taskListKind *types.TaskListKind) (*InternalTask, error) {
tlMgr, err := e.getTaskListManager(taskList, taskListKind)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't load tasklist namanger: %w", err)
}
return tlMgr.GetTask(ctx, maxDispatchPerSecond)
}
Expand Down
6 changes: 3 additions & 3 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func newTaskListManager(
if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient, isolationGroups)
}
tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups)
tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger)
tlMgr.taskWriter = newTaskWriter(tlMgr)
tlMgr.taskReader = newTaskReader(tlMgr, isolationGroups)
tlMgr.startWG.Add(1)
Expand Down Expand Up @@ -350,7 +350,7 @@ func (c *taskListManagerImpl) GetTask(
c.liveness.markAlive(time.Now())
task, err := c.getTask(ctx, maxDispatchPerSecond)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't get task: %w", err)
}
task.domainName = c.domainName
task.backlogCountHint = c.taskAckManager.GetBacklogCount()
Expand Down Expand Up @@ -391,7 +391,7 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond

domainEntry, err := c.domainCache.GetDomainByID(c.taskListID.domainID)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to fetch domain from cache: %w", err)
}

// the desired global rate limit for the task list comes from the
Expand Down
1 change: 1 addition & 0 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn
tag.WorkflowRunID(taskInfo.RunID),
tag.WorkflowID(taskInfo.WorkflowID),
tag.TaskID(taskInfo.TaskID),
tag.Error(err),
tag.WorkflowDomainID(taskInfo.DomainID),
)
tr.scope.IncCounter(metrics.AsyncMatchDispatchTimeoutCounterPerTaskList)
Expand Down
3 changes: 3 additions & 0 deletions service/matching/taskWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ writerLoop:

taskIDs, err := w.allocTaskIDs(batchSize)
if err != nil {
w.logger.Error("error allocating task ids",
tag.Error(err),
)
w.sendWriteResponse(reqs, err, nil)
continue writerLoop
}
Expand Down

0 comments on commit d4d205f

Please sign in to comment.