Skip to content

Commit

Permalink
Add DescribeTaskList command to admin client (cadence-workflow#1542)
Browse files Browse the repository at this point in the history
A new flag includeTaskListStatus is introduced into DescribeTaskListRequest, 
when it's set, in addition to poller info, status information of the requested taskList 
will also be returned. For now, it includes ackLevel, readLevel, backlogCountHint 
and DB lease start/end TaskID.
  • Loading branch information
yycptt authored Mar 18, 2019
1 parent 643fea8 commit bf7fd3d
Show file tree
Hide file tree
Showing 13 changed files with 756 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

524 changes: 516 additions & 8 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1230,10 +1230,24 @@ struct DescribeTaskListRequest {
10: optional string domain
20: optional TaskList taskList
30: optional TaskListType taskListType
40: optional bool includeTaskListStatus
}

struct DescribeTaskListResponse {
10: optional list<PollerInfo> pollers
20: optional TaskListStatus taskListStatus
}

struct TaskListStatus {
10: optional i64 (js.type = "Long") backlogCountHint
20: optional i64 (js.type = "Long") readLevel
30: optional i64 (js.type = "Long") ackLevel
40: optional TaskIDBlock taskIDBlock
}

struct TaskIDBlock {
10: optional i64 (js.type = "Long") startID
20: optional i64 (js.type = "Long") endID
}

//At least one of the parameters needs to be provided
Expand Down
1 change: 1 addition & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package frontend

import (
"fmt"

"github.com/uber/cadence/.gen/go/cadence/workflowserviceserver"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/messaging"
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2713,7 +2713,8 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques
}

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.DescribeTaskListRequest) (*gen.DescribeTaskListResponse, error) {

scope := metrics.FrontendDescribeTaskListScope
Expand Down
3 changes: 2 additions & 1 deletion service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ func (h *Handler) CancelOutstandingPoll(ctx context.Context,
}

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes.
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (h *Handler) DescribeTaskList(ctx context.Context, request *m.DescribeTaskListRequest) (*gen.DescribeTaskListResponse, error) {
scope := metrics.MatchingDescribeTaskListScope
sw := h.startRequestProfile("DescribeTaskList", scope)
Expand Down
9 changes: 1 addition & 8 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,7 @@ func (e *matchingEngineImpl) DescribeTaskList(ctx context.Context, request *m.De
return nil, err
}

pollers := []*workflow.PollerInfo{}
for _, poller := range tlMgr.GetAllPollerInfo() {
pollers = append(pollers, &workflow.PollerInfo{
Identity: common.StringPtr(poller.identity),
LastAccessTime: common.Int64Ptr(poller.lastAccessTime.UnixNano()),
})
}
return &workflow.DescribeTaskListResponse{Pollers: pollers}, nil
return tlMgr.DescribeTaskList(request.DescRequest.GetIncludeTaskListStatus()), nil
}

// Loads a task from persistence and wraps it in a task context
Expand Down
6 changes: 4 additions & 2 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,16 @@ func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) {
descResp, err := s.matchingEngine.DescribeTaskList(s.callContext, &matching.DescribeTaskListRequest{
DomainUUID: common.StringPtr(domainID),
DescRequest: &workflow.DescribeTaskListRequest{
TaskList: taskList,
TaskListType: &taskListType,
TaskList: taskList,
TaskListType: &taskListType,
IncludeTaskListStatus: common.BoolPtr(false),
},
})
s.NoError(err)
s.Equal(1, len(descResp.Pollers))
s.Equal(identity, descResp.Pollers[0].GetIdentity())
s.NotEmpty(descResp.Pollers[0].GetLastAccessTime())
s.Nil(descResp.GetTaskListStatus())
}
s.EqualValues(1, s.taskManager.taskLists[*tlID].rangeID)
}
Expand Down
32 changes: 32 additions & 0 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type (
SyncMatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error
CancelPoller(pollerID string)
GetAllPollerInfo() []*pollerInfo
DescribeTaskList(includeTaskListStatus bool) *s.DescribeTaskListResponse
String() string
}

Expand Down Expand Up @@ -542,6 +543,37 @@ func (c *taskListManagerImpl) GetAllPollerInfo() []*pollerInfo {
return c.pollerHistory.getAllPollerInfo()
}

// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes and status of tasklist's ackManager
// (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *s.DescribeTaskListResponse {
pollers := []*s.PollerInfo{}
for _, poller := range c.GetAllPollerInfo() {
pollers = append(pollers, &s.PollerInfo{
Identity: common.StringPtr(poller.identity),
LastAccessTime: common.Int64Ptr(poller.lastAccessTime.UnixNano()),
})
}

response := &s.DescribeTaskListResponse{Pollers: pollers}
if !includeTaskListStatus {
return response
}

taskIDBlock := c.rangeIDToTaskIDBlock(c.db.RangeID())
response.TaskListStatus = &s.TaskListStatus{
ReadLevel: common.Int64Ptr(c.taskAckManager.getReadLevel()),
AckLevel: common.Int64Ptr(c.taskAckManager.getAckLevel()),
BacklogCountHint: common.Int64Ptr(c.taskAckManager.getBacklogCountHint()),
TaskIDBlock: &s.TaskIDBlock{
StartID: common.Int64Ptr(taskIDBlock.start),
EndID: common.Int64Ptr(taskIDBlock.end),
},
}

return response
}

// Tries to match task to a poller that is already waiting on getTask.
// When this method returns non nil response without error it is guaranteed that the task is started
// and sent to a poller. So it not necessary to persist it.
Expand Down
47 changes: 47 additions & 0 deletions service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,53 @@ func TestIsTaskAddedRecently(t *testing.T) {
require.False(t, tlm.isTaskAddedRecently(time.Time{}))
}

func TestDescribeTaskList(t *testing.T) {
startTaskID := int64(1)
taskCount := int64(3)
PollerIdentity := "test-poll"

// Create taskList Manager and set taskList state
tlm := createTestTaskListManager()
tlm.db.rangeID = int64(1)
tlm.db.ackLevel = int64(0)
tlm.taskAckManager.setAckLevel(tlm.db.ackLevel)

for i := int64(0); i < taskCount; i++ {
tlm.taskAckManager.addTask(startTaskID + i)
}

includeTaskStatus := false
descResp := tlm.DescribeTaskList(includeTaskStatus)
require.Equal(t, 0, len(descResp.GetPollers()))
require.Nil(t, descResp.GetTaskListStatus())

includeTaskStatus = true
taskListStatus := tlm.DescribeTaskList(includeTaskStatus).GetTaskListStatus()
require.NotNil(t, taskListStatus)
require.Zero(t, taskListStatus.GetAckLevel())
require.Equal(t, taskCount, taskListStatus.GetReadLevel())
require.Equal(t, taskCount, taskListStatus.GetBacklogCountHint())
taskIDBlock := taskListStatus.GetTaskIDBlock()
require.Equal(t, int64(1), taskIDBlock.GetStartID())
require.Equal(t, tlm.config.RangeSize, taskIDBlock.GetEndID())

// Add a poller and complete all tasks
tlm.updatePollerInfo(pollerIdentity{identity: PollerIdentity})
for i := int64(0); i < taskCount; i++ {
tlm.taskAckManager.completeTask(startTaskID + i)
}

descResp = tlm.DescribeTaskList(includeTaskStatus)
require.Equal(t, 1, len(descResp.GetPollers()))
require.Equal(t, PollerIdentity, descResp.Pollers[0].GetIdentity())
require.NotEmpty(t, descResp.Pollers[0].GetLastAccessTime())

taskListStatus = descResp.GetTaskListStatus()
require.NotNil(t, taskListStatus)
require.Equal(t, taskCount, taskListStatus.GetAckLevel())
require.Zero(t, taskListStatus.GetBacklogCountHint())
}

func tlMgrStartWithoutNotifyEvent(tlm *taskListManagerImpl) {
// mimic tlm.Start() but avoid calling notifyEvent
tlm.startWG.Done()
Expand Down
24 changes: 24 additions & 0 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,27 @@ func newAdminElasticSearchCommands() []cli.Command {
},
}
}

func newAdminTaskListCommands() []cli.Command {
return []cli.Command{
{
Name: "describe",
Aliases: []string{"desc"},
Usage: "Describe pollers and status information of tasklist",
Flags: []cli.Flag{
cli.StringFlag{
Name: FlagTaskListWithAlias,
Usage: "TaskList description",
},
cli.StringFlag{
Name: FlagTaskListTypeWithAlias,
Value: "decision",
Usage: "Optional TaskList type [decision|activity]",
},
},
Action: func(c *cli.Context) {
AdminDescribeTaskList(c)
},
},
}
}
105 changes: 105 additions & 0 deletions tools/cli/adminTaskListCommands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cli

import (
"fmt"
"os"
"strconv"
"strings"

"github.com/olekukonko/tablewriter"
s "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/urfave/cli"
)

// AdminDescribeTaskList displays poller and status information of task list.
func AdminDescribeTaskList(c *cli.Context) {
frontendClient := cFactory.ServerFrontendClient(c)
domain := getRequiredGlobalOption(c, FlagDomain)
taskList := getRequiredOption(c, FlagTaskList)
taskListType := s.TaskListTypeDecision
if strings.ToLower(c.String(FlagTaskListType)) == "activity" {
taskListType = s.TaskListTypeActivity
}

ctx, cancel := newContext()
defer cancel()
request := &s.DescribeTaskListRequest{
Domain: common.StringPtr(domain),
TaskList: &s.TaskList{Name: common.StringPtr(taskList)},
TaskListType: &taskListType,
IncludeTaskListStatus: common.BoolPtr(true),
}

response, err := frontendClient.DescribeTaskList(ctx, request)
if err != nil {
ErrorAndExit("Operation DescribeTaskList failed.", err)
}

taskListStatus := response.GetTaskListStatus()
if taskListStatus == nil {
ErrorAndExit(colorMagenta("No tasklist status information."), nil)
}
printTaskListStatus(taskListStatus)
fmt.Printf("\n")

pollers := response.Pollers
if len(pollers) == 0 {
ErrorAndExit(colorMagenta("No poller for tasklist: "+taskList), nil)
}
printPollerInfo(pollers, taskListType)
}

func printTaskListStatus(taskListStatus *s.TaskListStatus) {
taskIDBlock := taskListStatus.GetTaskIDBlock()

table := tablewriter.NewWriter(os.Stdout)
table.SetBorder(false)
table.SetColumnSeparator("|")
table.SetHeader([]string{"Read Level", "Ack Level", "Backlog", "Lease Start TaskID", "Lease End TaskID"})
table.SetHeaderLine(false)
table.SetHeaderColor(tableHeaderBlue, tableHeaderBlue, tableHeaderBlue, tableHeaderBlue, tableHeaderBlue)
table.Append([]string{strconv.FormatInt(taskListStatus.GetReadLevel(), 10),
strconv.FormatInt(taskListStatus.GetAckLevel(), 10),
strconv.FormatInt(taskListStatus.GetBacklogCountHint(), 10),
strconv.FormatInt(taskIDBlock.GetStartID(), 10),
strconv.FormatInt(taskIDBlock.GetEndID(), 10)})
table.Render()
}

func printPollerInfo(pollers []*s.PollerInfo, taskListType s.TaskListType) {
table := tablewriter.NewWriter(os.Stdout)
table.SetBorder(false)
table.SetColumnSeparator("|")
if taskListType == s.TaskListTypeActivity {
table.SetHeader([]string{"Activity Poller Identity", "Last Access Time"})
} else {
table.SetHeader([]string{"Decision Poller Identity", "Last Access Time"})
}
table.SetHeaderLine(false)
table.SetHeaderColor(tableHeaderBlue, tableHeaderBlue)
for _, poller := range pollers {
table.Append([]string{poller.GetIdentity(), convertTime(poller.GetLastAccessTime(), false)})
}
table.Render()
}
6 changes: 6 additions & 0 deletions tools/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func NewCliApp() *cli.App {
Usage: "Run admin operation on ElasticSearch",
Subcommands: newAdminElasticSearchCommands(),
},
{
Name: "tasklist",
Aliases: []string{"tl"},
Usage: "Run admin operation on taskList",
Subcommands: newAdminTaskListCommands(),
},
},
},
}
Expand Down

0 comments on commit bf7fd3d

Please sign in to comment.