Skip to content

Commit

Permalink
First batch of timer changes.
Browse files Browse the repository at this point in the history
Summary:
Merge branch 'master' of code.uber.internal:devexp/minions into timer

Add schema and APIs to data layer.

Add timeout to decision context.

Add Timer Processor.

Timer Processor tests.

Reviewers: maxim, nx, tamer, samar

Reviewed By: samar

Differential Revision: https://code.uberinternal.com/D681427
  • Loading branch information
sivakku committed Jan 4, 2017
1 parent 86b7088 commit 4f7ff90
Show file tree
Hide file tree
Showing 23 changed files with 1,458 additions and 186 deletions.
5 changes: 3 additions & 2 deletions common/metrics/simplereporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ const (
// NewSimpleReporter create an instance of Reporter which can be used for driver to emit metric to console
func NewSimpleReporter(scope tally.Scope, tags map[string]string, logger bark.Logger) Reporter {
reporter := &SimpleReporter{
scope: scope,
tags: make(map[string]string),
scope: scope,
tags: make(map[string]string),
logger: logger,
}

if tags != nil {
Expand Down
6 changes: 6 additions & 0 deletions common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@ func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
return false
}
}

// AddSecondsToBaseTime - Gets the UnixNano with given duration and base time.
func AddSecondsToBaseTime(baseTimeInNanoSec int64, durationInSeconds int64) int64 {
timeOut := time.Duration(durationInSeconds) * time.Second
return time.Unix(0, baseTimeInNanoSec).Add(timeOut).UnixNano()
}
168 changes: 157 additions & 11 deletions persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
cassandraProtoVersion = 4
defaultSessionTimeout = 10 * time.Second
rowTypeExecutionTaskID = int64(77)
permanentWorkflowID = "dcb940ac-0c63-ffff-ffea-a6c305881d71"
permanentRunID = "dcb940ac-0c63-ffa2-ffea-a6c305881d71"
rowTypeShardWorkflowID = "3fe89dad-8326-fac5-fd40-fe08cfa25dec"
rowTypeShardRunID = "228ce20b-af54-fe2f-ff17-be728a00f785"
Expand Down Expand Up @@ -63,6 +64,15 @@ const (
`delivery_count: ?` +
`}`

templateTimerTaskType = `{` +
`workflow_id: ?, ` +
`run_id: ?, ` +
`task_id: ?, ` +
`type: ?, ` +
`timeoutType: ?, ` +
`event_id: ?` +
`}`

templateCreateShardQuery = `INSERT INTO executions (` +
`shard_id, type, workflow_id, run_id, task_id, shard, range_id)` +
`VALUES(?, ?, ?, ?, ?, ` + templateShardType + `, ?) IF NOT EXISTS`
Expand Down Expand Up @@ -96,6 +106,10 @@ const (
`shard_id, type, workflow_id, run_id, transfer, task_id, lock_token) ` +
`VALUES(?, ?, ?, ?, ` + templateTaskType + `, ?, ?)`

templateCreateTimerTaskQuery = `INSERT INTO executions (` +
`shard_id, type, workflow_id, run_id, timer, task_id, lock_token) ` +
`VALUES(?, ?, ?, ?, ` + templateTimerTaskType + `, ?, ?)`

templateUpdateLeaseQuery = `UPDATE executions ` +
`SET range_id = ? ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -151,6 +165,22 @@ const (
`and task_id = ?` +
`IF lock_token = ?`

templateGetTimerTasksQuery = `SELECT timer ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ?` +
`and workflow_id = ?` +
`and run_id = ?` +
`and task_id >= ?` +
`and task_id < ?`

templateCompleteTimerTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and workflow_id = ?` +
`and run_id = ?` +
`and task_id = ?`

templateCreateTaskQuery = `INSERT INTO tasks (` +
`task_list, type, workflow_id, run_id, task_id, task, lock_token) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTaskType + `, ?)`
Expand Down Expand Up @@ -198,6 +228,7 @@ func NewCassandraWorkflowExecutionPersistence(hosts string, keyspace string, sha
if err != nil {
return nil, err
}

return &cassandraPersistence{shardID: shardID, session: session, lowConslevel: gocql.One}, nil
}

Expand Down Expand Up @@ -346,6 +377,7 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx

d.createTransferTasks(batch, request.TransferTasks, request.Execution.GetWorkflowId(), request.Execution.GetRunId(),
cqlNowTimestamp)
d.createTimerTasks(batch, request.TimerTasks, nil, request.Execution.GetWorkflowId(), request.Execution.GetRunId(), cqlNowTimestamp)

batch.Query(templateUpdateLeaseQuery,
request.RangeID,
Expand Down Expand Up @@ -433,6 +465,8 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx

d.createTransferTasks(batch, request.TransferTasks, executionInfo.WorkflowID, executionInfo.RunID, cqlNowTimestamp)

d.createTimerTasks(batch, request.TimerTasks, request.DeleteTimerTask, executionInfo.WorkflowID, executionInfo.RunID, cqlNowTimestamp)

previous := make(map[string]interface{})
applied, _, err := d.session.MapExecuteBatchCAS(batch, previous)
if err != nil {
Expand Down Expand Up @@ -598,8 +632,8 @@ func (d *cassandraPersistence) CompleteTransferTask(request *CompleteTransferTas
}

return &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Task not found. WorkflowId: %v, RunId: %v, TaskId: %v", execution.GetWorkflowId(),
execution.GetRunId(), request.TaskID),
Message: fmt.Sprintf("Task not found. WorkflowId: %v, RunId: %v, TaskId: %v, TaskToken: %s", execution.GetWorkflowId(),
execution.GetRunId(), request.TaskID, request.LockToken),
}
}

Expand All @@ -611,16 +645,15 @@ func (d *cassandraPersistence) CreateTask(request *CreateTaskRequest) (*CreateTa
taskUUID := uuid.New()
lockToken := uuid.New()

var taskID int64
var taskList string
var scheduleID int64

switch request.Data.GetType() {
case TaskTypeActivity:
taskID = request.Data.(*ActivityTask).TaskID
taskList = request.Data.(*ActivityTask).TaskList
scheduleID = request.Data.(*ActivityTask).ScheduleID

case TaskTypeDecision:
taskID = request.Data.(*DecisionTask).TaskID
taskList = request.Data.(*DecisionTask).TaskList
scheduleID = request.Data.(*DecisionTask).ScheduleID
}
Expand All @@ -633,7 +666,7 @@ func (d *cassandraPersistence) CreateTask(request *CreateTaskRequest) (*CreateTa
taskUUID,
request.Execution.GetWorkflowId(),
request.Execution.GetRunId(),
taskID,
request.Data.GetTaskID(),
taskList,
request.Data.GetType(),
scheduleID,
Expand Down Expand Up @@ -763,19 +796,64 @@ func (d *cassandraPersistence) CompleteTask(request *CompleteTaskRequest) error
return nil
}

func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
query := d.session.Query(templateGetTimerTasksQuery,
d.shardID,
rowTypeTimerTask,
permanentWorkflowID,
permanentRunID,
request.MinKey,
request.MaxKey).Consistency(d.lowConslevel)

iter := query.Iter()
if iter == nil {
return nil, &workflow.InternalServiceError{
Message: "GetTimerTasks operation failed. Not able to create query iterator.",
}
}

response := &GetTimerIndexTasksResponse{}
task := make(map[string]interface{})
PopulateTasks:
for iter.MapScan(task) {
t := createTimerInfo(task["timer"].(map[string]interface{}))
// Reset task map to get it ready for next scan
task = make(map[string]interface{})
// Skip the task if it is not in the bounds.
if t.TaskID < request.MinKey {
continue
}
if t.TaskID >= request.MaxKey {
break PopulateTasks
}

response.Timers = append(response.Timers, t)
if len(response.Timers) == request.BatchSize {
break PopulateTasks
}
}

if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetTimerTasks operation failed. Error: %v", err),
}
}

return response, nil
}

func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferTasks []Task, workflowID string,
runID string, cqlNowTimestamp int64) {
for _, task := range transferTasks {
var transferTaskID int64
var taskList string
var scheduleID int64

switch task.GetType() {
case TaskTypeActivity:
transferTaskID = task.(*ActivityTask).TaskID
taskList = task.(*ActivityTask).TaskList
scheduleID = task.(*ActivityTask).ScheduleID

case TaskTypeDecision:
transferTaskID = task.(*DecisionTask).TaskID
taskList = task.(*DecisionTask).TaskList
scheduleID = task.(*DecisionTask).ScheduleID
}
Expand All @@ -788,18 +866,64 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT
runID,
workflowID,
runID,
transferTaskID,
task.GetTaskID(),
taskList,
task.GetType(),
scheduleID,
cqlNowTimestamp,
lockToken,
0,
transferTaskID,
task.GetTaskID(),
lockToken)
}
}

func (d *cassandraPersistence) createTimerTasks(batch *gocql.Batch, timerTasks []Task, deleteTimerTask Task, workflowID string,
runID string, cqlNowTimestamp int64) {

for _, task := range timerTasks {
var eventID int64

lockToken := uuid.New()
timeoutType := 0

switch task.GetType() {
case TaskTypeDecisionTimeout:
eventID = task.(*DecisionTimeoutTask).EventID

case TaskTypeActivityTimeout:
eventID = task.(*ActivityTimeoutTask).EventID
timeoutType = task.(*ActivityTimeoutTask).TimeoutType

case TaskTypeUserTimer:
eventID = task.(*UserTimerTask).EventID
}

batch.Query(templateCreateTimerTaskQuery,
d.shardID,
rowTypeTimerTask,
permanentWorkflowID,
permanentRunID,
workflowID,
runID,
task.GetTaskID(),
task.GetType(),
timeoutType,
eventID,
task.GetTaskID(),
lockToken)
}

if deleteTimerTask != nil {
batch.Query(templateCompleteTimerTaskQuery,
d.shardID,
rowTypeTimerTask,
permanentWorkflowID,
permanentRunID,
deleteTimerTask.GetTaskID())
}
}

func createShardInfo(result map[string]interface{}) *ShardInfo {
info := &ShardInfo{}
for k, v := range result {
Expand Down Expand Up @@ -873,3 +997,25 @@ func createTaskInfo(result map[string]interface{}) *TaskInfo {

return info
}

func createTimerInfo(result map[string]interface{}) *TimerInfo {
info := &TimerInfo{}
for k, v := range result {
switch k {
case "workflow_id":
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "task_id":
info.TaskID = v.(int64)
case "type":
info.TaskType = v.(int)
case "timeoutType":
info.TimeoutType = v.(int)
case "event_id":
info.EventID = v.(int64)
}
}

return info
}
Loading

0 comments on commit 4f7ff90

Please sign in to comment.