Skip to content

Commit

Permalink
TransferQueueProcessor can skip transfer tasks (cadence-workflow#151)
Browse files Browse the repository at this point in the history
TransferQueueProcessor uses Quorum level consistency to read tasks.
These reads are not isolated from in-flight LWT transactions running
in Cassandra, which could lead the read to observe partial results.
This can lead to tasks being skipped.

We can use Serial level consistency to deal with this but it is not
supported at the moment by the gocql client, and it might cause reads
to be too expensive. Instead, we'll use a MaxReadLevel to make sure
that the read requests from TransferQueueProcessor do not attempt to
read the results of LWT writes that are still in flight.

Issue cadence-workflow#150
  • Loading branch information
Tamer Eldeeb authored Apr 25, 2017
1 parent 143d2fc commit f983701
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 11 deletions.
4 changes: 3 additions & 1 deletion common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ const (
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and task_id > ? LIMIT ?`
`and task_id > ? ` +
`and task_id <= ? LIMIT ?`

templateCompleteTransferTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -888,6 +889,7 @@ func (d *cassandraPersistence) GetTransferTasks(request *GetTransferTasksRequest
rowTypeTransferWorkflowID,
rowTypeTransferRunID,
request.ReadLevel,
request.MaxReadLevel,
request.BatchSize)

iter := query.Iter()
Expand Down
5 changes: 3 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,9 @@ type (

// GetTransferTasksRequest is used to read tasks from the transfer task queue
GetTransferTasksRequest struct {
ReadLevel int64
BatchSize int
ReadLevel int64
MaxReadLevel int64
BatchSize int
}

// GetTransferTasksResponse is the response to GetTransferTasksRequest
Expand Down
11 changes: 8 additions & 3 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (s *testShardContext) GetNextTransferTaskID() (int64, error) {
return atomic.AddInt64(&s.transferSequenceNumber, 1), nil
}

func (s *testShardContext) GetTransferMaxReadLevel() int64 {
return atomic.LoadInt64(&s.transferSequenceNumber)
}

func (s *testShardContext) GetTransferAckLevel() int64 {
return atomic.LoadInt64(&s.shardInfo.TransferAckLevel)
}
Expand Down Expand Up @@ -252,7 +256,7 @@ func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) er

// CreateWorkflowExecution is a utility method to create workflow executions
func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList,
wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
decisionScheduleID int64, timerTasks []Task) (string, error) {
response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{
RequestID: uuid.New(),
Expand Down Expand Up @@ -489,8 +493,9 @@ func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error {
// GetTransferTasks is a utility method to get tasks from transfer task queue
func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error) {
response, err := s.WorkflowMgr.GetTransferTasks(&GetTransferTasksRequest{
ReadLevel: s.GetReadLevel(),
BatchSize: batchSize,
ReadLevel: s.GetReadLevel(),
MaxReadLevel: int64(math.MaxInt64),
BatchSize: batchSize,
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ Update_History_Loop:
}

completedID := completedEvent.GetEventId()
hasUnhandledEvents := ((completedID-startedID) > 1)
hasUnhandledEvents := ((completedID - startedID) > 1)
isComplete := false
transferTasks := []persistence.Task{}
timerTasks := []persistence.Task{}
Expand Down
23 changes: 23 additions & 0 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type (
GetHistoryManager() persistence.HistoryManager
GetNextTransferTaskID() (int64, error)
GetTransferSequenceNumber() int64
GetTransferMaxReadLevel() int64
GetTransferAckLevel() int64
UpdateAckLevel(ackLevel int64) error
GetTimerSequenceNumber() int64
Expand Down Expand Up @@ -49,6 +50,7 @@ type (
shardInfo *persistence.ShardInfo
transferSequenceNumber int64
maxTransferSequenceNumber int64
transferMaxReadLevel int64
}
)

Expand Down Expand Up @@ -83,6 +85,12 @@ func (s *shardContextImpl) GetTransferAckLevel() int64 {
return s.shardInfo.TransferAckLevel
}

func (s *shardContextImpl) GetTransferMaxReadLevel() int64 {
s.RLock()
defer s.RUnlock()
return s.transferMaxReadLevel
}

func (s *shardContextImpl) UpdateAckLevel(ackLevel int64) error {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -114,6 +122,7 @@ func (s *shardContextImpl) CreateWorkflowExecution(request *persistence.CreateWo
s.Lock()
defer s.Unlock()

transferMaxReadLevel := int64(0)
// assign IDs for the transfer tasks
// Must be done under the shard lock to ensure transfer tasks are written to persistence in increasing
// ID order
Expand All @@ -123,7 +132,9 @@ func (s *shardContextImpl) CreateWorkflowExecution(request *persistence.CreateWo
return nil, err
}
task.SetTaskID(id)
transferMaxReadLevel = id
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Create_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
Expand Down Expand Up @@ -170,6 +181,7 @@ func (s *shardContextImpl) UpdateWorkflowExecution(request *persistence.UpdateWo
s.Lock()
defer s.Unlock()

transferMaxReadLevel := int64(0)
// assign IDs for the transfer tasks
// Must be done under the shard lock to ensure transfer tasks are written to persistence in increasing
// ID order
Expand All @@ -179,6 +191,7 @@ func (s *shardContextImpl) UpdateWorkflowExecution(request *persistence.UpdateWo
return err
}
task.SetTaskID(id)
transferMaxReadLevel = id
}

if request.ContinueAsNew != nil {
Expand All @@ -188,8 +201,10 @@ func (s *shardContextImpl) UpdateWorkflowExecution(request *persistence.UpdateWo
return err
}
task.SetTaskID(id)
transferMaxReadLevel = id
}
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Update_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
Expand Down Expand Up @@ -314,6 +329,7 @@ func (s *shardContextImpl) renewRangeLocked(isStealing bool) error {
// Range is successfully updated in cassandra now update shard context to reflect new range
s.transferSequenceNumber = updatedShardInfo.RangeID << s.rangeSize
s.maxTransferSequenceNumber = (updatedShardInfo.RangeID + 1) << s.rangeSize
s.transferMaxReadLevel = s.transferSequenceNumber - 1
atomic.StoreInt64(&s.rangeID, updatedShardInfo.RangeID)
s.shardInfo = updatedShardInfo

Expand All @@ -323,6 +339,13 @@ func (s *shardContextImpl) renewRangeLocked(isStealing bool) error {
return nil
}

func (s *shardContextImpl) updateMaxReadLevelLocked(rl int64) {
if rl > s.transferMaxReadLevel {
s.logger.Debugf("Updating MaxReadLevel: %v", rl)
s.transferMaxReadLevel = rl
}
}

// TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter.
func acquireShard(shardID int, shardManager persistence.ShardManager, historyMgr persistence.HistoryManager,
executionMgr persistence.ExecutionManager, owner string, closeCh chan<- int, logger bark.Logger,
Expand Down
8 changes: 5 additions & 3 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type (
sync.RWMutex
outstandingTasks map[int64]bool
readLevel int64
maxReadLevel int64
ackLevel int64
}
)
Expand Down Expand Up @@ -305,7 +306,7 @@ ProcessRetryLoop:
ExternalInitiatedEventId: common.Int64Ptr(task.ScheduleID),
ExternalWorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
RunId: common.StringPtr(task.RunID),
},
}

Expand Down Expand Up @@ -362,8 +363,9 @@ func (a *ackManager) readTransferTasks() ([]*persistence.TransferTaskInfo, error
rLevel := a.readLevel
a.RUnlock()
response, err := a.executionMgr.GetTransferTasks(&persistence.GetTransferTasksRequest{
ReadLevel: rLevel,
BatchSize: transferTaskBatchSize,
ReadLevel: rLevel,
MaxReadLevel: a.shard.GetTransferMaxReadLevel(),
BatchSize: transferTaskBatchSize,
})

if err != nil {
Expand Down
1 change: 0 additions & 1 deletion service/history/transferQueueProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ workerPump:
s.mockHistoryClient.AssertExpectations(s.T())
}


func createAddRequestFromTask(task *persistence.TransferTaskInfo) interface{} {
var res interface{}
domainID := task.DomainID
Expand Down

0 comments on commit f983701

Please sign in to comment.