Skip to content

Commit

Permalink
Serialize task list persistence writes (cadence-workflow#285)
Browse files Browse the repository at this point in the history
Concurrent Cassandra LWT to the same partition are known to cause timeout errors.
This change ensures that all LWT to a task list are serialized by using a mutex.
  • Loading branch information
Tamer Eldeeb authored Jul 24, 2017
1 parent cc82d24 commit b77af1e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
10 changes: 8 additions & 2 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ type taskListManagerImpl struct {
logger bark.Logger
metricsClient metrics.Client
engine *matchingEngineImpl
taskWriter *taskWriter
taskBuffer chan *persistence.TaskInfo // tasks loaded from persistence
// serializes all writes to persistence
// This is needed because of a known Cassandra issue where concurrent LWT to the same partition
// cause timeout errors.
persistenceLock sync.Mutex
taskWriter *taskWriter
taskBuffer chan *persistence.TaskInfo // tasks loaded from persistence
// Sync channel used to perform sync matching.
// It must to be unbuffered. addTask publishes to it asynchronously and expects publish to succeed
// only if there is waiting poll that consumes from it.
Expand Down Expand Up @@ -204,6 +208,8 @@ func (c *taskListManagerImpl) persistAckLevel() error {
},
}
c.Unlock()
c.persistenceLock.Lock()
defer c.persistenceLock.Unlock()
_, err := c.engine.taskManager.UpdateTaskList(updateTaskListRequest)
return err
}
Expand Down
2 changes: 2 additions & 0 deletions service/matching/taskWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ writerLoop:
maxReadLevel = taskIDs[i]
}

w.tlMgr.persistenceLock.Lock()
r, err := w.taskManager.CreateTasks(&persistence.CreateTasksRequest{
DomainID: w.taskListID.domainID,
TaskList: w.taskListID.taskListName,
Expand All @@ -143,6 +144,7 @@ writerLoop:
// might be out of sync. This is OK as caller can just retry.
RangeID: rangeID,
})
w.tlMgr.persistenceLock.Unlock()

if err != nil {
logging.LogPersistantStoreErrorEvent(w.logger, logging.TagValueStoreOperationCreateTask, err,
Expand Down

0 comments on commit b77af1e

Please sign in to comment.