Skip to content

Commit

Permalink
tasklistManager: support for range deletes / skipping over expired ta…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Mar 13, 2019
1 parent 4c32d3e commit 7dfce22
Show file tree
Hide file tree
Showing 16 changed files with 747 additions and 388 deletions.
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ const (
// TagStoreOperation values
TagValueStoreOperationGetTasks = "get-tasks"
TagValueStoreOperationCompleteTask = "complete-task"
TagValueStoreOperationCompleteTasksLessThan = "complete-tasks-less-than"
TagValueStoreOperationCreateWorkflowExecution = "create-wf-execution"
TagValueStoreOperationGetWorkflowExecution = "get-wf-execution"
TagValueStoreOperationUpdateWorkflowExecution = "update-wf-execution"
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ const (
SyncThrottleCounter
BufferThrottleCounter
SyncMatchLatency
ExpiredTasksCounter

NumMatchingMetrics
)
Expand Down Expand Up @@ -1487,6 +1488,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
SyncThrottleCounter: {metricName: "sync.throttle.count"},
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
ExpiredTasksCounter: {metricName: "tasks.expired"},
SyncMatchLatency: {metricName: "syncmatch.latency", metricType: Timer},
},
Worker: {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ type (
TaskID int64
ScheduleID int64
ScheduleToStartTimeout int32
Expiry time.Time
}

// Task is the generic interface for workflow tasks
Expand Down
31 changes: 22 additions & 9 deletions common/persistence/persistence-tests/matchingPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,30 @@ func (s *MatchingPersistenceSuite) TestCreateTask() {
s.NotEmpty(t, "Expected non empty task identifier.")
}

tasks2, err2 := s.CreateActivityTasks(domainID, workflowExecution, map[int64]string{
20: "a5b38106793a",
30: "a5b38106793b",
40: "a5b38106793c",
50: "a5b38106793d",
60: "a5b38106793e",
})
tasks := map[int64]string{
20: uuid.New(),
30: uuid.New(),
40: uuid.New(),
50: uuid.New(),
60: uuid.New(),
}
tasks2, err2 := s.CreateActivityTasks(domainID, workflowExecution, tasks)
s.NoError(err2)
s.Equal(5, len(tasks2), "expected single valid task identifier.")
for _, t := range tasks2 {
s.NotEmpty(t, "Expected non empty task identifier.")

for sid, tlName := range tasks {
resp, err := s.GetTasks(domainID, tlName, p.TaskListTypeActivity, 100)
s.NoError(err)
s.Equal(1, len(resp.Tasks))
s.Equal(domainID, resp.Tasks[0].DomainID)
s.Equal(*workflowExecution.WorkflowId, resp.Tasks[0].WorkflowID)
s.Equal(*workflowExecution.RunId, resp.Tasks[0].RunID)
s.Equal(sid, resp.Tasks[0].ScheduleID)
if s.TaskMgr.GetName() != "cassandra" {
// cassandra uses TTL and expiry isn't stored as part of task state
s.True(time.Now().Before(resp.Tasks[0].Expiry))
s.True(resp.Tasks[0].Expiry.Before(time.Now().Add((defaultScheduleToStartTimeout + 1) * time.Second)))
}
}
}

Expand Down
15 changes: 10 additions & 5 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ type (
}
)

const (
defaultScheduleToStartTimeout = 111
)

// NewTestBaseWithCassandra returns a persistence test base backed by cassandra datastore
func NewTestBaseWithCassandra(options *TestBaseOptions) TestBase {
if options.DBName == "" {
Expand Down Expand Up @@ -1079,11 +1083,12 @@ func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workfl
TaskID: taskID,
Execution: workflowExecution,
Data: &p.TaskInfo{
DomainID: domainID,
WorkflowID: *workflowExecution.WorkflowId,
RunID: *workflowExecution.RunId,
TaskID: taskID,
ScheduleID: activityScheduleID,
DomainID: domainID,
WorkflowID: *workflowExecution.WorkflowId,
RunID: *workflowExecution.RunId,
TaskID: taskID,
ScheduleID: activityScheduleID,
ScheduleToStartTimeout: defaultScheduleToStartTimeout,
},
},
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ func (m *sqlTaskManager) GetTasks(request *persistence.GetTasksRequest) (*persis
RunID: v.RunID.String(),
TaskID: v.TaskID,
ScheduleID: v.ScheduleID,
Expiry: v.ExpiryTs,
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/storage/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ task_type = :task_type
lockTaskListQry = `SELECT range_id FROM task_lists ` +
`WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ? FOR UPDATE`

getTaskQry = `SELECT workflow_id, run_id, schedule_id, task_id ` +
getTaskQry = `SELECT workflow_id, run_id, schedule_id, task_id, expiry_ts ` +
`FROM tasks ` +
`WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? LIMIT ?`

Expand Down
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var keys = map[Key]string{
MaxTasklistIdleTime: "matching.maxTasklistIdleTime",
MatchingOutstandingTaskAppendsThreshold: "matching.outstandingTaskAppendsThreshold",
MatchingMaxTaskBatchSize: "matching.maxTaskBatchSize",
MatchingMaxTaskDeleteBatchSize: "matching.maxTaskDeleteBatchSize",

// history settings
HistoryRPS: "history.rps",
Expand Down Expand Up @@ -265,6 +266,8 @@ const (
MatchingOutstandingTaskAppendsThreshold
// MatchingMaxTaskBatchSize is max batch size for task writer
MatchingMaxTaskBatchSize
// MatchingMaxTaskDeleteBatchSize is the max batch size for range deletion of tasks
MatchingMaxTaskDeleteBatchSize

// key for history

Expand Down
8 changes: 8 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,14 @@ func MinInt32(a, b int32) int32 {
return b
}

// MinInt returns the smaller of two given integers
func MinInt(a, b int) int {
if a < b {
return a
}
return b
}

// ValidateRetryPolicy validates a retry policy
func ValidateRetryPolicy(policy *workflow.RetryPolicy) error {
if policy == nil {
Expand Down
23 changes: 20 additions & 3 deletions service/matching/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package matching

import (
"fmt"
"sync"

"sync/atomic"

"github.com/uber-common/bark"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/persistence"
)

Expand All @@ -38,6 +41,7 @@ type (
rangeID int64
ackLevel int64
store persistence.TaskManager
logger bark.Logger
}
taskListState struct {
rangeID int64
Expand All @@ -55,13 +59,14 @@ type (
// - To provide the guarantee that there is only writer who updates taskList in persistence at any given point in time
// This guarantee makes some of the other code simpler and there is no impact to perf because updates to tasklist are
// spread out and happen in background routines
func newTaskListDB(store persistence.TaskManager, domainID string, name string, taskType int, kind int) *taskListDB {
func newTaskListDB(store persistence.TaskManager, domainID string, name string, taskType int, kind int, logger bark.Logger) *taskListDB {
return &taskListDB{
domainID: domainID,
taskListName: name,
taskListKind: kind,
taskType: taskType,
store: store,
logger: logger,
}
}

Expand Down Expand Up @@ -143,25 +148,37 @@ func (db *taskListDB) GetTasks(minTaskID int64, maxTaskID int64, batchSize int)

// CompleteTask deletes a single task from this task list
func (db *taskListDB) CompleteTask(taskID int64) error {
return db.store.CompleteTask(&persistence.CompleteTaskRequest{
err := db.store.CompleteTask(&persistence.CompleteTaskRequest{
TaskList: &persistence.TaskListInfo{
DomainID: db.domainID,
Name: db.taskListName,
TaskType: db.taskType,
},
TaskID: taskID,
})
if err != nil {
logging.LogPersistantStoreErrorEvent(db.logger, logging.TagValueStoreOperationCompleteTask, err,
fmt.Sprintf("{taskID: %v, taskType: %v, taskList: %v}",
taskID, db.taskType, db.taskListName))
}
return err
}

// CompleteTasksLessThan deletes of tasks less than the given taskID. Limit is
// the upper bound of number of tasks that can be deleted by this method. It may
// or may not be honored
func (db *taskListDB) CompleteTasksLessThan(taskID int64, limit int) (int, error) {
return db.store.CompleteTasksLessThan(&persistence.CompleteTasksLessThanRequest{
n, err := db.store.CompleteTasksLessThan(&persistence.CompleteTasksLessThanRequest{
DomainID: db.domainID,
TaskListName: db.taskListName,
TaskType: db.taskType,
TaskID: taskID,
Limit: limit,
})
if err != nil {
logging.LogPersistantStoreErrorEvent(db.logger, logging.TagValueStoreOperationCompleteTasksLessThan, err,
fmt.Sprintf("{taskID: %v, taskType: %v, taskList: %v}",
taskID, db.taskType, db.taskListName))
}
return n, err
}
Loading

0 comments on commit 7dfce22

Please sign in to comment.