Skip to content

Commit

Permalink
matching: fix bug in ratelimiter, increase visibility (cadence-workfl…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Apr 12, 2019
1 parent e1eae28 commit f6bd09d
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 78 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

120 changes: 104 additions & 16 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,7 @@ struct TaskListStatus {
10: optional i64 (js.type = "Long") backlogCountHint
20: optional i64 (js.type = "Long") readLevel
30: optional i64 (js.type = "Long") ackLevel
35: optional double ratePerSecond
40: optional TaskIDBlock taskIDBlock
}

Expand Down Expand Up @@ -1294,6 +1295,7 @@ struct PollerInfo {
// Unix Nano
10: optional i64 (js.type = "Long") lastAccessTime
20: optional string identity
30: optional double ratePerSecond
}

struct RetryPolicy {
Expand Down
24 changes: 21 additions & 3 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
}

time.Sleep(20 * time.Millisecond) // So any buffer tasks from 0 rps get picked up
syncCtr := scope.Snapshot().Counters()["test.sync.throttle.count+operation=TaskListMgr"]
syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count+domain=domainName,operation=TaskListMgr"]
s.Equal(1, int(syncCtr.Value())) // Check times zero rps is set = throttle counter
s.EqualValues(1, s.taskManager.getCreateTaskCount(tlID)) // Check times zero rps is set = Tasks stored in persistence
s.EqualValues(0, s.taskManager.getTaskCount(tlID))
Expand All @@ -711,6 +711,24 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
}
// Due to conflicts some ids are skipped and more real ranges are used.
s.True(expectedRange <= s.taskManager.getTaskListManager(tlID).rangeID)

// check the poller information
tlType := workflow.TaskListTypeActivity
descResp, err := s.matchingEngine.DescribeTaskList(s.callContext, &matching.DescribeTaskListRequest{
DomainUUID: common.StringPtr(domainID),
DescRequest: &workflow.DescribeTaskListRequest{
TaskList: taskList,
TaskListType: &tlType,
IncludeTaskListStatus: common.BoolPtr(true),
},
})
s.NoError(err)
s.Equal(1, len(descResp.Pollers))
s.Equal(identity, descResp.Pollers[0].GetIdentity())
s.NotEmpty(descResp.Pollers[0].GetLastAccessTime())
s.Equal(_defaultTaskDispatchRPS, descResp.Pollers[0].GetRatePerSecond())
s.NotNil(descResp.GetTaskListStatus())
s.True(descResp.GetTaskListStatus().GetRatePerSecond() >= (_defaultTaskDispatchRPS - 1))
}

func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivities() {
Expand Down Expand Up @@ -875,8 +893,8 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
s.True(expectedRange <= s.taskManager.getTaskListManager(tlID).rangeID)
s.EqualValues(0, s.taskManager.getTaskCount(tlID))

syncCtr := scope.Snapshot().Counters()["test.sync.throttle.count+operation=TaskListMgr"]
bufCtr := scope.Snapshot().Counters()["test.buffer.throttle.count+operation=TaskListMgr"]
syncCtr := scope.Snapshot().Counters()["test.sync_throttle_count+domain=domainName,operation=TaskListMgr"]
bufCtr := scope.Snapshot().Counters()["test.buffer_throttle_count+domain=domainName,operation=TaskListMgr"]
total := int64(0)
if syncCtr != nil {
total += syncCtr.Value()
Expand Down
37 changes: 20 additions & 17 deletions service/matching/pollerHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package matching
import (
"time"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
)

Expand All @@ -33,21 +35,16 @@ const (
)

type (
pollerIdentity struct {
identity string
// TODO add IP, T1396795
}
pollerIdentity string

pollerInfo struct {
identity string
// TODO add IP, T1396795
lastAccessTime time.Time
ratePerSecond float64
}
)

type pollerHistory struct {
// poller ID -> last access time
// pollers map[pollerID]time.Time
// poller ID -> pollerInfo
// pollers map[pollerID]pollerInfo
history cache.Cache
}

Expand All @@ -63,23 +60,29 @@ func newPollerHistory() *pollerHistory {
}
}

func (pollers *pollerHistory) updatePollerInfo(id pollerIdentity) {
pollers.history.Put(id, nil)
func (pollers *pollerHistory) updatePollerInfo(id pollerIdentity, ratePerSecond *float64) {
rps := _defaultTaskDispatchRPS
if ratePerSecond != nil {
rps = *ratePerSecond
}
pollers.history.Put(id, &pollerInfo{ratePerSecond: rps})
}

func (pollers *pollerHistory) getAllPollerInfo() []*pollerInfo {
result := []*pollerInfo{}
func (pollers *pollerHistory) getAllPollerInfo() []*shared.PollerInfo {
var result []*shared.PollerInfo

ite := pollers.history.Iterator()
defer ite.Close()
for ite.HasNext() {
entry := ite.Next()
key := entry.Key().(pollerIdentity)
value := entry.Value().(*pollerInfo)
// TODO add IP, T1396795
lastAccessTime := entry.CreateTime()
result = append(result, &pollerInfo{
identity: key.identity,
// TODO add IP, T1396795
lastAccessTime: lastAccessTime,
result = append(result, &shared.PollerInfo{
Identity: common.StringPtr(string(key)),
LastAccessTime: common.Int64Ptr(lastAccessTime.UnixNano()),
RatePerSecond: common.Float64Ptr(value.ratePerSecond),
})
}

Expand Down
Loading

0 comments on commit f6bd09d

Please sign in to comment.