Skip to content

Commit

Permalink
scanner: first pass at background job that cleans up tasks/tasklists (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Apr 8, 2019
1 parent f793ca1 commit 22f6827
Show file tree
Hide file tree
Showing 24 changed files with 1,770 additions and 20 deletions.
24 changes: 23 additions & 1 deletion common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,8 @@ const (
ArchiverArchivalWorkflowScope
// ArchiverClientScope is scope used by all metrics emitted by archiver.Client
ArchiverClientScope
// TaskListScavengerScope is scope used by all metrics emitted by worker.tasklist.Scavenger module
TaskListScavengerScope

NumWorkerScopes
)
Expand Down Expand Up @@ -1138,6 +1140,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ArchiverPumpScope: {operation: "ArchiverPump"},
ArchiverArchivalWorkflowScope: {operation: "ArchiverArchivalWorkflow"},
ArchiverClientScope: {operation: "ArchiverClient"},
TaskListScavengerScope: {operation: "tasklistscavenger"},
},
// Blobstore Scope Names
Blobstore: {
Expand Down Expand Up @@ -1378,7 +1381,16 @@ const (
ArchiverPumpedNotEqualHandledCount
ArchiverHandleAllRequestsLatency
ArchiverWorkflowStoppingCount

ArchiverClientSendSignalFailureCount
TaskProcessedCount
TaskDeletedCount
TaskListProcessedCount
TaskListDeletedCount
TaskListOutstandingCount
StartedCount
StoppedCount
ExecutorTasksDeferredCount
ExecutorTasksDroppedCount
NumWorkerMetrics
)

Expand Down Expand Up @@ -1593,6 +1605,16 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ArchiverPumpedNotEqualHandledCount: {metricName: "archiver_pumped_not_equal_handled", oldMetricName: "archiver.pumped-not-equal-handled"},
ArchiverHandleAllRequestsLatency: {metricName: "archiver_handle_all_requests_latency", oldMetricName: "archiver.handle-all-requests-latency"},
ArchiverWorkflowStoppingCount: {metricName: "archiver_workflow_stopping", oldMetricName: "archiver.workflow-stopping"},
ArchiverClientSendSignalFailureCount: {metricName: "archiver_client_send_signal_error", oldMetricName: "archiver.client-send-signal-error"},
TaskProcessedCount: {metricName: "task_processed", metricType: Gauge},
TaskDeletedCount: {metricName: "task_deleted", metricType: Gauge},
TaskListProcessedCount: {metricName: "tasklist_processed", metricType: Gauge},
TaskListDeletedCount: {metricName: "tasklist_deleted", metricType: Gauge},
TaskListOutstandingCount: {metricName: "tasklist_outstanding", metricType: Gauge},
StartedCount: {metricName: "started", metricType: Counter},
StoppedCount: {metricName: "stopped", metricType: Counter},
ExecutorTasksDeferredCount: {metricName: "executor_deferred", metricType: Counter},
ExecutorTasksDroppedCount: {metricName: "executor_dropped", metricType: Counter},
},
}

Expand Down
62 changes: 60 additions & 2 deletions common/mocks/TaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,64 @@ func (_m *TaskManager) CompleteTask(request *persistence.CompleteTaskRequest) er
return r0
}

// CompleteTasksLessThan provides a mock function with given fields: request
func (_m *TaskManager) CompleteTasksLessThan(request *persistence.CompleteTasksLessThanRequest) (int, error) {
	ret := _m.Called(request)

	// First return value: either computed by a registered func or taken verbatim.
	var r0 int
	if fn, ok := ret.Get(0).(func(*persistence.CompleteTasksLessThanRequest) int); ok {
		r0 = fn(request)
	} else if v := ret.Get(0); v != nil {
		r0 = v.(int)
	}

	// Second return value: the error, possibly derived from the request.
	var r1 error
	if fn, ok := ret.Get(1).(func(*persistence.CompleteTasksLessThanRequest) error); ok {
		r1 = fn(request)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// ListTaskList provides a mock function with given fields: request
func (_m *TaskManager) ListTaskList(request *persistence.ListTaskListRequest) (*persistence.ListTaskListResponse, error) {
	ret := _m.Called(request)

	// Response: either computed by a registered func or returned verbatim.
	var r0 *persistence.ListTaskListResponse
	if fn, ok := ret.Get(0).(func(*persistence.ListTaskListRequest) *persistence.ListTaskListResponse); ok {
		r0 = fn(request)
	} else if v := ret.Get(0); v != nil {
		r0 = v.(*persistence.ListTaskListResponse)
	}

	// Error: possibly derived from the request by a registered func.
	var r1 error
	if fn, ok := ret.Get(1).(func(*persistence.ListTaskListRequest) error); ok {
		r1 = fn(request)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// DeleteTaskList provides a mock function with given fields: request
func (_m *TaskManager) DeleteTaskList(request *persistence.DeleteTaskListRequest) error {
	ret := _m.Called(request)

	// Prefer a registered func so the stubbed error can depend on the request.
	if fn, ok := ret.Get(0).(func(*persistence.DeleteTaskListRequest) error); ok {
		return fn(request)
	}
	return ret.Error(0)
}

// CreateTasks provides a mock function with given fields: request
func (_m *TaskManager) CreateTasks(request *persistence.CreateTasksRequest) (*persistence.CreateTasksResponse, error) {
ret := _m.Called(request)
Expand Down Expand Up @@ -133,8 +191,8 @@ func (_m *TaskManager) GetTasks(request *persistence.GetTasksRequest) (*persiste
ret := _m.Called(request)

var r0 *persistence.GetTasksResponse
if rf, ok := ret.Get(0).(func(*persistence.GetTasksRequest) (*persistence.GetTasksResponse, error)); ok {
return rf(request)
if rf, ok := ret.Get(0).(func(*persistence.GetTasksRequest) *persistence.GetTasksResponse); ok {
r0 = rf(request)
} else if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetTasksResponse)
}
Expand Down
9 changes: 7 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2853,7 +2853,12 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr

// From TaskManager interface
func (d *cassandraPersistence) GetTasks(request *p.GetTasksRequest) (*p.GetTasksResponse, error) {
if request.ReadLevel > request.MaxReadLevel {
if request.MaxReadLevel == nil {
return nil, &workflow.InternalServiceError{
Message: "getTasks: both readLevel and maxReadLevel MUST be specified for cassandra persistence",
}
}
if request.ReadLevel > *request.MaxReadLevel {
return &p.GetTasksResponse{}, nil
}

Expand All @@ -2864,7 +2869,7 @@ func (d *cassandraPersistence) GetTasks(request *p.GetTasksRequest) (*p.GetTasks
request.TaskType,
rowTypeTask,
request.ReadLevel,
request.MaxReadLevel,
*request.MaxReadLevel,
).PageSize(request.BatchSize)

iter := query.Iter()
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,8 +939,8 @@ type (
DomainID string
TaskList string
TaskType int
ReadLevel int64
MaxReadLevel int64 // inclusive
ReadLevel int64 // range exclusive
MaxReadLevel *int64 // optional: range inclusive when specified
BatchSize int
}

Expand Down
49 changes: 49 additions & 0 deletions common/persistence/persistence-tests/matchingPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,55 @@ func (s *MatchingPersistenceSuite) TestGetDecisionTasks() {
s.Equal(int64(5), tasks1Response.Tasks[0].ScheduleID)
}

// TestGetTasksWithNoMaxReadLevel verifies that GetTasks pages through tasks
// in task-id order when the request omits MaxReadLevel. Skipped for cassandra,
// where MaxReadLevel is mandatory.
func (s *MatchingPersistenceSuite) TestGetTasksWithNoMaxReadLevel() {
	if s.TaskMgr.GetName() == "cassandra" {
		s.T().Skip("this test is not applicable for cassandra persistence")
	}
	domainID := "f1116985-d1f1-40e0-aba9-83344db915bc"
	execution := gen.WorkflowExecution{WorkflowId: common.StringPtr("complete-decision-task-test"),
		RunId: common.StringPtr("2aa0a74e-16ee-4f27-983d-48b07ec1915d")}
	taskList := "48b07ec1915d"

	// Seed five activity tasks, all on the same task list.
	scheduleToTaskList := make(map[int64]string)
	for _, scheduleID := range []int64{10, 20, 30, 40, 50} {
		scheduleToTaskList[scheduleID] = taskList
	}
	_, err0 := s.CreateActivityTasks(domainID, execution, scheduleToTaskList)
	s.NoError(err0)

	taskCount := 5
	minTaskID := s.GetNextSequenceNumber() - int64(taskCount)

	testCases := []struct {
		batchSz   int
		readLevel int64
		taskIDs   []int64
	}{
		{1, -1, []int64{minTaskID}},
		{2, minTaskID, []int64{minTaskID + 1, minTaskID + 2}},
		{5, minTaskID + 2, []int64{minTaskID + 3, minTaskID + 4}},
	}

	for _, tc := range testCases {
		tc := tc
		s.Run(fmt.Sprintf("tc_%v_%v", tc.batchSz, tc.readLevel), func() {
			// No MaxReadLevel: results are bounded only by ReadLevel and BatchSize.
			response, err := s.TaskMgr.GetTasks(&p.GetTasksRequest{
				DomainID:  domainID,
				TaskList:  taskList,
				TaskType:  p.TaskListTypeActivity,
				BatchSize: tc.batchSz,
				ReadLevel: tc.readLevel,
			})
			s.NoError(err)
			s.Equal(len(tc.taskIDs), len(response.Tasks), "wrong number of tasks")
			for i, want := range tc.taskIDs {
				s.Equal(want, response.Tasks[i].TaskID, "wrong set of tasks")
			}
		})
	}
}

// TestCompleteDecisionTask test
func (s *MatchingPersistenceSuite) TestCompleteDecisionTask() {
domainID := "f1116985-d1f1-40e0-aba9-83344db915bc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize i
TaskList: taskList,
TaskType: taskType,
BatchSize: batchSize,
MaxReadLevel: math.MaxInt64,
MaxReadLevel: common.Int64Ptr(math.MaxInt64),
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (m *sqlTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persis
TaskListName: request.TaskList,
TaskType: int64(request.TaskType),
MinTaskID: &request.ReadLevel,
MaxTaskID: &request.MaxReadLevel,
MaxTaskID: request.MaxReadLevel,
PageSize: &request.BatchSize,
})
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions common/persistence/sql/storage/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ task_type = :task_type
lockTaskListQry = `SELECT range_id FROM task_lists ` +
`WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ? FOR UPDATE`

getTaskQry = `SELECT workflow_id, run_id, schedule_id, task_id, expiry_ts ` +
getTaskMinMaxQry = `SELECT workflow_id, run_id, schedule_id, task_id, expiry_ts ` +
`FROM tasks ` +
`WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? LIMIT ?`
`WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? ` +
` ORDER BY task_id LIMIT ?`

getTaskMinQry = `SELECT workflow_id, run_id, schedule_id, task_id, expiry_ts ` +
`FROM tasks ` +
`WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? ORDER BY task_id LIMIT ?`

createTaskQry = `INSERT INTO ` +
`tasks(domain_id, workflow_id, run_id, schedule_id, task_list_name, task_type, task_id, expiry_ts) ` +
Expand All @@ -92,8 +97,14 @@ func (mdb *DB) InsertIntoTasks(rows []sqldb.TasksRow) (sql.Result, error) {
func (mdb *DB) SelectFromTasks(filter *sqldb.TasksFilter) ([]sqldb.TasksRow, error) {
var err error
var rows []sqldb.TasksRow
err = mdb.conn.Select(&rows, getTaskQry, filter.DomainID,
filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
switch {
case filter.MaxTaskID != nil:
err = mdb.conn.Select(&rows, getTaskMinMaxQry, filter.DomainID,
filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
default:
err = mdb.conn.Select(&rows, getTaskMinQry, filter.DomainID,
filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize)
}
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ var keys = map[Key]string{
WorkerArchivalsPerIteration: "worker.ArchivalsPerIteration",
WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
WorkerThrottledLogRPS: "worker.throttledLogRPS",
ScannerPersistenceMaxQPS: "worker.scannerPersistenceMaxQPS",
}

const (
Expand Down Expand Up @@ -464,6 +465,8 @@ const (
WorkerDeterministicConstructionCheckProbability
// WorkerThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
WorkerThrottledLogRPS
// ScannerPersistenceMaxQPS is the maximum rate of persistence calls from worker.Scanner
ScannerPersistenceMaxQPS

// lastKeyForTest must be the last one in this const group for testing purpose
lastKeyForTest
Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (c *cadenceImpl) startWorker(rpHosts []string, startWG *sync.WaitGroup) {

func (c *cadenceImpl) startWorkerReplicator(params *service.BootstrapParams, service service.Service, domainCache cache.DomainCache) {
metadataManager := persistence.NewMetadataPersistenceMetricsClient(c.metadataMgrV2, service.GetMetricsClient(), c.logger)
workerConfig := worker.NewConfig(dynamicconfig.NewNopCollection())
workerConfig := worker.NewConfig(params)
workerConfig.ReplicationCfg.ReplicatorMessageConcurrency = dynamicconfig.GetIntPropertyFn(10)
c.replicator = replicator.NewReplicator(
c.clusterMetadata,
Expand All @@ -505,7 +505,7 @@ func (c *cadenceImpl) startWorkerClientWorker(params *service.BootstrapParams, s
blobstore.NewMetricClient(c.blobstoreClient, service.GetMetricsClient()),
c.blobstoreClient.GetRetryPolicy(),
c.blobstoreClient.IsRetryableError)
workerConfig := worker.NewConfig(dynamicconfig.NewNopCollection())
workerConfig := worker.NewConfig(params)
workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10)
bc := &archiver.BootstrapContainer{
PublicClient: publicClient,
Expand Down
4 changes: 2 additions & 2 deletions service/matching/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (db *taskListDB) GetTasks(minTaskID int64, maxTaskID int64, batchSize int)
TaskList: db.taskListName,
TaskType: db.taskType,
BatchSize: batchSize,
ReadLevel: minTaskID, // exclusive
MaxReadLevel: maxTaskID, // inclusive
ReadLevel: minTaskID, // exclusive
MaxReadLevel: &maxTaskID, // inclusive
})
}

Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1867,7 +1867,7 @@ func (m *testTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persi
if taskID <= request.ReadLevel {
continue
}
if taskID > request.MaxReadLevel {
if taskID > *request.MaxReadLevel {
break
}
tasks = append(tasks, it.Value().(*persistence.TaskInfo))
Expand Down
Loading

0 comments on commit 22f6827

Please sign in to comment.