Skip to content

Commit

Permalink
Do not blindly retry timer task read (temporalio#2500)
Browse files Browse the repository at this point in the history
* Do not blindly retry timer task read, rely on timer controller rate limiter instead
  • Loading branch information
wxing1292 authored Feb 13, 2022
1 parent b99986f commit be1f908
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 39 deletions.
22 changes: 0 additions & 22 deletions common/persistence/nosql/nosqlplugin/cassandra/gocql/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,6 @@ func ConvertError(
}
}

func IsTimeoutError(err error) bool {
if err == context.DeadlineExceeded {
return true
}
if err == gocql.ErrTimeoutNoResponse {
return true
}
if err == gocql.ErrConnectionClosed {
return true
}
_, ok := err.(*gocql.RequestErrWriteTimeout)
return ok
}

func IsNotFoundError(err error) bool {
return err == gocql.ErrNotFound
}

func IsThrottlingError(err error) bool {
if req, ok := err.(gocql.RequestError); ok {
// gocql does not expose the constant errOverloaded = 0x1001
return req.Code() == 0x1001
}
return false
}
17 changes: 2 additions & 15 deletions service/history/timerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"go.temporal.io/server/common/backoff"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"

Expand All @@ -44,8 +43,6 @@ import (

var (
maximumTime = time.Unix(0, math.MaxInt64).UTC()

timerRetryPolicy = createTimerRetryPolicy()
)

const (
Expand Down Expand Up @@ -415,19 +412,9 @@ func (t *timerQueueAckMgrImpl) getTimerTasks(minTimestamp time.Time, maxTimestam
BatchSize: batchSize,
NextPageToken: pageToken,
}

var response *persistence.GetTimerTasksResponse
var err error
op := func() error {
response, err = t.executionMgr.GetTimerTasks(request)
return err
}

err = backoff.Retry(op, timerRetryPolicy, func(err error) bool {
return true
})
response, err := t.executionMgr.GetTimerTasks(request)
if err != nil {
return nil, nil, consts.ErrMaxAttemptsExceeded
return nil, nil, err
}
return response.Tasks, response.NextPageToken, nil
}
Expand Down
2 changes: 0 additions & 2 deletions service/history/transferQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,9 @@ func (t *transferQueueProcessorBase) readTasks(
MaxReadLevel: t.maxReadAckLevel(),
BatchSize: t.options.BatchSize(),
})

if err != nil {
return nil, false, err
}

return response.Tasks, len(response.NextPageToken) != 0, nil
}

Expand Down

0 comments on commit be1f908

Please sign in to comment.