Skip to content

Commit

Permalink
Timer Processor to checkpoint ack level (cadence-workflow#233)
Browse files Browse the repository at this point in the history
* Scheme change to persist timer Ack level.

* Shard Context to allocate sequence numbers for timer task ID

* Timer Q Processor to maintain ack level.

* Update History Engine.

* Update ShardContext, Remove timer sequence number.

* Checkpoint timestamp up to which timer tasks are processed.

* TimerIDs never go below checkpoint ack level.

* Logs and fixes.

* Remove un-used variables.

* go format.

* Add unit test for shardContext.TimerAckLevel

* rename updateShardInfo -> updateShardInfoLocked

* remove shardContext from timerBuilder.

* linting.

* Add visibility_timestamp to executions schema

* Update schema to add visibility_ts

* rename visibility_ts

* Timer q processor to use timestamp, sequence.

* Notify new timer doesn't query until it is current.

* linting.

* Fix timer mock test.

* move Get/Set Visibility Time out of task to server methods.

* Move to use mock timer.

* linting.

* Fix the quick pending timer level.

* Adjust batch size.
  • Loading branch information
sivakku authored Jun 13, 2017
1 parent 89bae56 commit 4614c23
Show file tree
Hide file tree
Showing 19 changed files with 784 additions and 448 deletions.
118 changes: 103 additions & 15 deletions common/persistence/cassandraPersistence.go

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package persistence

import (
"math"
"os"
"testing"
"time"
Expand Down Expand Up @@ -641,18 +640,19 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
updatedInfo := copyWorkflowExecutionInfo(info0)
updatedInfo.NextEventID = int64(5)
updatedInfo.LastProcessedEvent = int64(2)
tasks := []Task{&DecisionTimeoutTask{1, 2}}
tasks := []Task{&DecisionTimeoutTask{time.Now(), 1, 2}}
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, err1 := s.GetTimerIndexTasks(-1, math.MaxInt64)
timerTasks, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")

err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, &DecisionTimeoutTask{TaskID: timerTasks[0].TaskID}, nil, nil, nil, nil)
deleteTimerTask := &DecisionTimeoutTask{VisibilityTimestamp: timerTasks[0].VisibilityTimestamp, TaskID: timerTasks[0].TaskID}
err2 = s.UpdateWorkflowExecution(updatedInfo, nil, nil, int64(5), nil, deleteTimerTask, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks2, err2 := s.GetTimerIndexTasks(-1, math.MaxInt64)
timerTasks2, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
}
Expand Down
72 changes: 54 additions & 18 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type (
StolenSinceRenew int
UpdatedAt time.Time
TransferAckLevel int64
TimerAckLevel time.Time
}

// WorkflowExecutionInfo describes a workflow execution
Expand Down Expand Up @@ -150,13 +151,14 @@ type (

// TimerTaskInfo describes a timer task.
TimerTaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
TaskType int
TimeoutType int
EventID int64
DomainID string
WorkflowID string
RunID string
VisibilityTimestamp time.Time
TaskID int64
TaskType int
TimeoutType int
EventID int64
}

// TaskListInfo describes a state of a task list implementation.
Expand Down Expand Up @@ -208,8 +210,9 @@ type (

// DecisionTimeoutTask identifies a timeout task.
DecisionTimeoutTask struct {
TaskID int64
EventID int64
VisibilityTimestamp time.Time
TaskID int64
EventID int64
}

// CancelExecutionTask identifies a transfer task for cancel of execution
Expand All @@ -231,15 +234,17 @@ type (

// ActivityTimeoutTask identifies a timeout task.
ActivityTimeoutTask struct {
TaskID int64
TimeoutType int
EventID int64
VisibilityTimestamp time.Time
TaskID int64
TimeoutType int
EventID int64
}

// UserTimerTask identifies a timeout task.
UserTimerTask struct {
TaskID int64
EventID int64
VisibilityTimestamp time.Time
TaskID int64
EventID int64
}

// WorkflowMutableState indicates workflow related state
Expand Down Expand Up @@ -400,7 +405,8 @@ type (

// CompleteTimerTaskRequest is used to complete a task in the timer task queue
CompleteTimerTaskRequest struct {
TaskID int64
VisibilityTimestamp time.Time
TaskID int64
}

// LeaseTaskListRequest is used to request lease of a task list
Expand Down Expand Up @@ -469,9 +475,9 @@ type (
// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
MinKey int64
MaxKey int64
BatchSize int
MinTimestamp time.Time
MaxTimestamp time.Time
BatchSize int
}

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
Expand Down Expand Up @@ -719,6 +725,16 @@ func (d *DecisionTimeoutTask) SetTaskID(id int64) {
d.TaskID = id
}

// GetVisibilityTimestamp gets the visibility time stamp
func (d *DecisionTimeoutTask) GetVisibilityTimestamp() time.Time {
return d.VisibilityTimestamp
}

// SetVisibilityTimestamp gets the visibility time stamp
func (d *DecisionTimeoutTask) SetVisibilityTimestamp(t time.Time) {
d.VisibilityTimestamp = t
}

// GetType returns the type of the timer task
func (a *ActivityTimeoutTask) GetType() int {
return TaskTypeActivityTimeout
Expand All @@ -734,6 +750,16 @@ func (a *ActivityTimeoutTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetVisibilityTimestamp gets the visibility time stamp
func (a *ActivityTimeoutTask) GetVisibilityTimestamp() time.Time {
return a.VisibilityTimestamp
}

// SetVisibilityTimestamp gets the visibility time stamp
func (a *ActivityTimeoutTask) SetVisibilityTimestamp(t time.Time) {
a.VisibilityTimestamp = t
}

// GetType returns the type of the timer task
func (u *UserTimerTask) GetType() int {
return TaskTypeUserTimer
Expand All @@ -749,6 +775,16 @@ func (u *UserTimerTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp gets the visibility time stamp
func (u *UserTimerTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp gets the visibility time stamp
func (u *UserTimerTask) SetVisibilityTimestamp(t time.Time) {
u.VisibilityTimestamp = t
}

// GetType returns the type of the cancel transfer task
func (u *CancelExecutionTask) GetType() int {
return TransferTaskTypeCancelExecution
Expand Down
103 changes: 76 additions & 27 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -65,7 +66,7 @@ type (
MetadataManager MetadataManager
VisibilityMgr VisibilityManager
ShardInfo *ShardInfo
ShardContext *testShardContext
ShardContext *TestShardContext
readLevel int64
CassandraTestCluster
}
Expand All @@ -77,10 +78,12 @@ type (
session *gocql.Session
}

testShardContext struct {
// TestShardContext shard context for testing.
// TODO: Cleanup, move this out of persistence
TestShardContext struct {
sync.RWMutex
shardInfo *ShardInfo
transferSequenceNumber int64
timerSequeceNumber int64
historyMgr HistoryManager
executionMgr ExecutionManager
logger bark.Logger
Expand All @@ -95,8 +98,8 @@ type (
)

func newTestShardContext(shardInfo *ShardInfo, transferSequenceNumber int64, historyMgr HistoryManager,
executionMgr ExecutionManager, logger bark.Logger) *testShardContext {
return &testShardContext{
executionMgr ExecutionManager, logger bark.Logger) *TestShardContext {
return &TestShardContext{
shardInfo: shardInfo,
transferSequenceNumber: transferSequenceNumber,
historyMgr: historyMgr,
Expand All @@ -106,66 +109,102 @@ func newTestShardContext(shardInfo *ShardInfo, transferSequenceNumber int64, his
}
}

func (s *testShardContext) GetExecutionManager() ExecutionManager {
// GetExecutionManager test implementation
func (s *TestShardContext) GetExecutionManager() ExecutionManager {
return s.executionMgr
}

func (s *testShardContext) GetHistoryManager() HistoryManager {
// GetHistoryManager test implementation
func (s *TestShardContext) GetHistoryManager() HistoryManager {
return s.historyMgr
}

func (s *testShardContext) GetNextTransferTaskID() (int64, error) {
// GetNextTransferTaskID test implementation
func (s *TestShardContext) GetNextTransferTaskID() (int64, error) {
return atomic.AddInt64(&s.transferSequenceNumber, 1), nil
}

func (s *testShardContext) GetTransferMaxReadLevel() int64 {
// GetTransferMaxReadLevel test implementation
func (s *TestShardContext) GetTransferMaxReadLevel() int64 {
return atomic.LoadInt64(&s.transferSequenceNumber)
}

func (s *testShardContext) GetTransferAckLevel() int64 {
// GetTransferAckLevel test implementation
func (s *TestShardContext) GetTransferAckLevel() int64 {
return atomic.LoadInt64(&s.shardInfo.TransferAckLevel)
}

func (s *testShardContext) GetTimerSequenceNumber() int64 {
return atomic.AddInt64(&s.timerSequeceNumber, 1)
}

func (s *testShardContext) UpdateAckLevel(ackLevel int64) error {
// UpdateTransferAckLevel test implementation
func (s *TestShardContext) UpdateTransferAckLevel(ackLevel int64) error {
atomic.StoreInt64(&s.shardInfo.TransferAckLevel, ackLevel)
return nil
}

func (s *testShardContext) GetTransferSequenceNumber() int64 {
// GetTransferSequenceNumber test implementation
func (s *TestShardContext) GetTransferSequenceNumber() int64 {
return atomic.LoadInt64(&s.transferSequenceNumber)
}

func (s *testShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (
// GetTimerAckLevel test implementation
func (s *TestShardContext) GetTimerAckLevel() time.Time {
s.RLock()
defer s.RLock()
return s.shardInfo.TimerAckLevel
}

// UpdateTimerAckLevel test implementation
func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error {
s.Lock()
defer s.Unlock()
s.shardInfo.TimerAckLevel = ackLevel
return nil
}

// CreateWorkflowExecution test implementation
func (s *TestShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (
*CreateWorkflowExecutionResponse, error) {
return s.executionMgr.CreateWorkflowExecution(request)
}

func (s *testShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error {
// UpdateWorkflowExecution test implementation
func (s *TestShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error {
// assign IDs for the timer tasks. They need to be assigned under shard lock.
// TODO: This needs to be moved out of persistence.
for _, task := range request.TimerTasks {
seqID, err := s.GetNextTransferTaskID()
if err != nil {
panic(err)
}
task.SetTaskID(seqID)
s.logger.Infof("%v: TestShardContext: Assigning timer (timestamp: %v, seq: %v)",
time.Now().UTC(), GetVisibilityTSFrom(task).UTC(), task.GetTaskID())
}
return s.executionMgr.UpdateWorkflowExecution(request)
}

func (s *testShardContext) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
// AppendHistoryEvents test implementation
func (s *TestShardContext) AppendHistoryEvents(request *AppendHistoryEventsRequest) error {
return s.historyMgr.AppendHistoryEvents(request)
}

func (s *testShardContext) GetLogger() bark.Logger {
// GetLogger test implementation
func (s *TestShardContext) GetLogger() bark.Logger {
return s.logger
}

func (s *testShardContext) GetMetricsClient() metrics.Client {
// GetMetricsClient test implementation
func (s *TestShardContext) GetMetricsClient() metrics.Client {
return s.metricsClient
}

func (s *testShardContext) Reset() {
// Reset test implementation
func (s *TestShardContext) Reset() {
atomic.StoreInt64(&s.shardInfo.RangeID, 0)
atomic.StoreInt64(&s.shardInfo.TransferAckLevel, 0)
}

func (s *testShardContext) GetRangeID() int64 {
// GetRangeID test implementation
func (s *TestShardContext) GetRangeID() int64 {
return atomic.LoadInt64(&s.shardInfo.RangeID)
}

Expand Down Expand Up @@ -598,9 +637,11 @@ func (s *TestBase) CompleteTransferTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskInfo, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinKey: minKey, MaxKey: maxKey, BatchSize: 10})
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, err
Expand All @@ -609,6 +650,14 @@ func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskI
return response.Timers, nil
}

// CompleteTimerTask is a utility method to complete a timer task
func (s *TestBase) CompleteTimerTask(ts time.Time, taskID int64) error {
return s.WorkflowMgr.CompleteTimerTask(&CompleteTimerTaskRequest{
VisibilityTimestamp: ts,
TaskID: taskID,
})
}

// CreateDecisionTask is a utility method to create a task
func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string,
decisionScheduleID int64) (int64, error) {
Expand All @@ -623,7 +672,7 @@ func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflo

taskID := s.GetNextSequenceNumber()
tasks := []*CreateTaskInfo{
&CreateTaskInfo{
{
TaskID: taskID,
Execution: workflowExecution,
Data: &TaskInfo{
Expand Down Expand Up @@ -667,7 +716,7 @@ func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workfl
}
taskID := s.GetNextSequenceNumber()
tasks := []*CreateTaskInfo{
&CreateTaskInfo{
{
TaskID: taskID,
Execution: workflowExecution,
Data: &TaskInfo{
Expand Down
Loading

0 comments on commit 4614c23

Please sign in to comment.