Skip to content

Commit

Permalink
Delete CompleteFork API from History Persistence (cadence-workflow#2675)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Oct 16, 2019
1 parent cbf1d4a commit 961f8b3
Show file tree
Hide file tree
Showing 26 changed files with 32 additions and 520 deletions.
70 changes: 7 additions & 63 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ const (

// below are templates for history_tree table
v2templateInsertTree = `INSERT INTO history_tree (` +
`tree_id, branch_id, ancestors, in_progress, fork_time, info) ` +
`VALUES (?, ?, ?, ?, ?, ?) `
`tree_id, branch_id, ancestors, fork_time, info) ` +
`VALUES (?, ?, ?, ?, ?) `

v2templateReadAllBranches = `SELECT branch_id, ancestors, in_progress, fork_time, info FROM history_tree WHERE tree_id = ? `
v2templateReadAllBranches = `SELECT branch_id, ancestors, fork_time, info FROM history_tree WHERE tree_id = ? `

v2templateDeleteBranch = `DELETE FROM history_tree WHERE tree_id = ? AND branch_id = ? `

v2templateUpdateBranch = `UPDATE history_tree set in_progress = ? WHERE tree_id = ? AND branch_id = ? `

v2templateScanAllTreeBranches = `SELECT tree_id, branch_id, fork_time, info FROM history_tree `
)

Expand Down Expand Up @@ -146,7 +144,7 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())
batch := h.session.NewBatch(gocql.LoggedBatch)
batch.Query(v2templateInsertTree,
branchInfo.TreeID, branchInfo.BranchID, ancs, false, cqlNowTimestamp, request.Info)
branchInfo.TreeID, branchInfo.BranchID, ancs, cqlNowTimestamp, request.Info)
batch.Query(v2templateUpsertData,
branchInfo.TreeID, branchInfo.BranchID, request.NodeID, request.TransactionID, request.Events.Data, request.Events.Encoding)
err = h.session.ExecuteBatch(batch)
Expand Down Expand Up @@ -330,9 +328,8 @@ func (h *cassandraHistoryV2Persistence) ForkHistoryBranch(
}
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())

// NOTE: To prevent leaking event data caused by forking, we introduce this in_progress flag.
query := h.session.Query(v2templateInsertTree,
treeID, request.NewBranchID, ancs, true, cqlNowTimestamp, request.Info)
treeID, request.NewBranchID, ancs, cqlNowTimestamp, request.Info)

err := query.Exec()
if err != nil {
Expand All @@ -341,37 +338,6 @@ func (h *cassandraHistoryV2Persistence) ForkHistoryBranch(
return resp, nil
}

// UpdateHistoryBranch update a branch
func (h *cassandraHistoryV2Persistence) CompleteForkBranch(
request *p.InternalCompleteForkBranchRequest,
) error {

branch := request.BranchInfo
treeID := *branch.TreeID
branchID := *branch.BranchID

var query *gocql.Query
if request.Success {
query = h.session.Query(v2templateUpdateBranch,
false, treeID, branchID)
err := query.Exec()
if err != nil {
return convertCommonErrors("CompleteForkBranch", err)
}
} else {
batch := h.session.NewBatch(gocql.LoggedBatch)
batch.Query(v2templateDeleteBranch, treeID, branchID)
batch.Query(v2templateRangeDeleteData,
treeID, branchID, 1)
err := h.session.ExecuteBatch(batch)
if err != nil {
return convertCommonErrors("CompleteForkBranch", err)
}
}

return nil
}

// DeleteHistoryBranch removes a branch
func (h *cassandraHistoryV2Persistence) DeleteHistoryBranch(
request *p.InternalDeleteHistoryBranchRequest,
Expand All @@ -392,15 +358,6 @@ func (h *cassandraHistoryV2Persistence) DeleteHistoryBranch(
if err != nil {
return err
}
// We won't delete the branch if there is any branch forking in progress. We will return error.
if len(rsp.ForkingInProgressBranches) > 0 {
return &p.ConditionFailedError{
Msg: fmt.Sprintf("Some branch is in progress of forking"),
}
}

// If there is no branch forking in progress we see here, it means that we are safe to calculate the deleting ranges based on the current result,
// because all the forking branches in the future would fail.

batch := h.session.NewBatch(gocql.LoggedBatch)
batch.Query(v2templateDeleteBranch, treeID, branch.BranchID)
Expand Down Expand Up @@ -505,7 +462,6 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(

pagingToken := []byte{}
branches := make([]*workflow.HistoryBranch, 0)
forkingBranches := make([]p.HistoryBranchDetail, 0)

var iter *gocql.Iter
for {
Expand All @@ -519,20 +475,10 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(

branchUUID := gocql.UUID{}
ancsResult := []map[string]interface{}{}
forkingInProgress := false
forkTime := time.Time{}
info := ""

for iter.Scan(&branchUUID, &ancsResult, &forkingInProgress, &forkTime, &info) {
if forkingInProgress {
br := p.HistoryBranchDetail{
TreeID: treeID,
BranchID: branchUUID.String(),
ForkTime: forkTime,
Info: info,
}
forkingBranches = append(forkingBranches, br)
}
for iter.Scan(&branchUUID, &ancsResult, &forkTime, &info) {
ancs := h.parseBranchAncestors(ancsResult)
br := &workflow.HistoryBranch{
TreeID: &treeID,
Expand All @@ -543,7 +489,6 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(

branchUUID = gocql.UUID{}
ancsResult = []map[string]interface{}{}
forkingInProgress = false
forkTime = time.Time{}
info = ""
}
Expand All @@ -560,8 +505,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(
}

return &p.GetHistoryTreeResponse{
Branches: branches,
ForkingInProgressBranches: forkingBranches,
Branches: branches,
}, nil
}

Expand Down
5 changes: 1 addition & 4 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,8 +1375,7 @@ type (
// GetHistoryTreeResponse is a response to GetHistoryTreeRequest
GetHistoryTreeResponse struct {
// all branches of a tree
Branches []*workflow.HistoryBranch
ForkingInProgressBranches []HistoryBranchDetail
Branches []*workflow.HistoryBranch
}

// GetAllHistoryTreeBranchesRequest is a request of GetAllHistoryTreeBranches
Expand Down Expand Up @@ -1491,8 +1490,6 @@ type (
ReadRawHistoryBranch(request *ReadHistoryBranchRequest) (*ReadRawHistoryBranchResponse, error)
// ForkHistoryBranch forks a new branch from a old branch
ForkHistoryBranch(request *ForkHistoryBranchRequest) (*ForkHistoryBranchResponse, error)
// CompleteForkBranch will complete the forking process after update mutableState, this is to help preventing data leakage
CompleteForkBranch(request *CompleteForkBranchRequest) error
// DeleteHistoryBranch removes a branch
// If this is the last branch to delete, it will also remove the root node
DeleteHistoryBranch(request *DeleteHistoryBranchRequest) error
Expand Down
27 changes: 0 additions & 27 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,33 +144,6 @@ func (m *historyV2ManagerImpl) DeleteHistoryBranch(
return m.persistence.DeleteHistoryBranch(req)
}

// CompleteForkBranch complete the forking process
func (m *historyV2ManagerImpl) CompleteForkBranch(
request *CompleteForkBranchRequest,
) error {

var branch workflow.HistoryBranch
err := m.thriftEncoder.Decode(request.BranchToken, &branch)
if err != nil {
return err
}

shardID, err := getShardID(request.ShardID)
if err != nil {
m.logger.Error("shardID is not set in complete fork branch operation", tag.Error(err))
return &workflow.InternalServiceError{
Message: err.Error(),
}
}
req := &InternalCompleteForkBranchRequest{
BranchInfo: branch,
Success: request.Success,
ShardID: shardID,
}

return m.persistence.CompleteForkBranch(req)
}

// GetHistoryTree returns all branch information of a tree
func (m *historyV2ManagerImpl) GetHistoryTree(
request *GetHistoryTreeRequest,
Expand Down
101 changes: 0 additions & 101 deletions common/persistence/historyStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,110 +22,9 @@ package persistence

import (
"fmt"
"time"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"

"github.com/uber/cadence/.gen/go/shared"
)

/*
DeleteWorkflowExecutionHistoryV2 is used to delete workflow execution history from historyV2.
Histories in historyV2 are represented as a tree rather than a linear list of history events.
The tree based history structure introduces complexities for deletions. The following concepts
must be understood in order to understand how histories get deleted:
- Branch token is used to identify a single path from root to leaf on the history tree
- Creation of child branches from parent branches are not atomic, their creation can be in progress during this method
- When a child branch has been forked from the parent branch it will call CompleteForkBranch
- Due to failure cases it is possible the child branch started to fork but never called CompleteForkBranch
- When deleting a branch, only the section of the branch which is not relied upon by other branches can be safely deleted
Given the above understanding this method can now be explained as the following protocol
1. Attempt to delete branch identified by branch token. The persistence APIs which get invoked are smart enough
to only delete the section of branch identified by branch token which are not depended on by other branches.
2. If the delete was successful then return nil.
3. If delete failed then check if it failed due to a child branch fork creation being in progress (error type for
child branch fork in progress is ConditionFailedError).
4. If error was not caused by child branch fork in progress then return error.
5. Otherwise history will attempt to be deleted while a child branch fork is ongoing. In order to do this
it is checked if the child branch fork was created less than one minute ago. If it was created less than one minute ago
return retryable error type indicating that branch cannot be deleted currently but client can retry deletion. If child
branch fork has been in progress for longer than one minute then it is assumed the child branch fork will be successful
and only the section of history which is not common to the child being created will get deleted.
This protocol is safe in that it will never delete history which is relied upon. But it is not complete in the sense
that zombie history segments can remain under some rare failure cases. Consider the following sequence of events
1. Child branch fork started and is in progress.
2. Forking has been in progress for longer than one minute, therefore forking is assumed to be successful.
3. History section of parent branch is deleted. But section of parent which is common to child is not deleted.
4. Our assumption of child branch fork being successful is actually wrong and the child never successfully forked.
Under this rare case the section of parent history which was assumed to be common to child will be a zombie history section.
*/
func DeleteWorkflowExecutionHistoryV2(historyV2Mgr HistoryManager, branchToken []byte, shardID *int, logger log.Logger) error {
err := historyV2Mgr.DeleteHistoryBranch(&DeleteHistoryBranchRequest{
BranchToken: branchToken,
ShardID: shardID,
})
if err == nil {
return nil
}

_, ok := err.(*ConditionFailedError)
if !ok {
return err
}

// we believe this is very rare case to see: DeleteHistoryBranch returns ConditionFailedError means there are some incomplete branches

resp, err := historyV2Mgr.GetHistoryTree(&GetHistoryTreeRequest{
BranchToken: branchToken,
ShardID: shardID,
})
if err != nil {
return err
}

if len(resp.ForkingInProgressBranches) > 0 {
logInfo := ""
defer func() {
logger.Warn("seeing incomplete forking branches when deleting branch", tag.DetailInfo(logInfo))
}()
for _, br := range resp.ForkingInProgressBranches {
if time.Now().After(br.ForkTime.Add(time.Minute)) {
logInfo += ";" + br.Info
// this can be case of goroutine crash the API call doesn't call CompleteForkBranch() to clean up the new branch
bt, err := NewHistoryBranchTokenFromAnother(br.BranchID, branchToken)
if err != nil {
return err
}
err = historyV2Mgr.CompleteForkBranch(&CompleteForkBranchRequest{
// actually we don't know it is success or fail. but use true for safety
// the worst case is we may leak some data that will never deleted
Success: true,
BranchToken: bt,
ShardID: shardID,
})
if err != nil {
return err
}
} else {
// in case of the forking is in progress within a short time period
return &shared.ServiceBusyError{
Message: "waiting for forking to complete",
}
}
}
}
err = historyV2Mgr.DeleteHistoryBranch(&DeleteHistoryBranchRequest{
BranchToken: branchToken,
ShardID: shardID,
})
return err
}

// ReadFullPageV2Events reads a full page of history events from HistoryManager. Due to storage format of V2 History
// it is not guaranteed that pageSize amount of data is returned. Function returns the list of history events, the size
// of data read, the next page token, and an error if present.
Expand Down
32 changes: 2 additions & 30 deletions common/persistence/persistence-tests/historyV2PersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
// fork from here
bi2, err := s.fork(bi, 13)
s.Nil(err)
s.completeFork(bi2, true)

events = s.genRandomEvents([]int64{13}, 4)
err = s.appendNewNode(bi2, events, 12)
Expand Down Expand Up @@ -561,11 +560,9 @@ func (s *HistoryV2PersistenceSuite) TestConcurrentlyForkAndAppendBranches() {
s.Nil(err)
s.Equal((concurrency)*2+1, len(events))

s.descInProgress(treeID)
if idx == 0 {
s.completeFork(bi, false)
} else {
s.completeFork(bi, true)
err = s.deleteHistoryBranch(bi)
s.Nil(err)
}

}(i)
Expand Down Expand Up @@ -597,7 +594,6 @@ func (s *HistoryV2PersistenceSuite) TestConcurrentlyForkAndAppendBranches() {
s.Nil(err)
level2Br.Store(idx, bi)

s.completeFork(bi, true)
// append second batch to second level
eids := make([]int64, 0)
for i := forkNodeID; i <= int64(concurrency)*3+1; i++ {
Expand Down Expand Up @@ -737,23 +733,9 @@ func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*workflow.HistoryB
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
s.True(len(resp.ForkingInProgressBranches) == 0)
return resp.Branches
}

// persistence helper
func (s *HistoryV2PersistenceSuite) descInProgress(treeID string) {
resp, err := s.HistoryV2Mgr.GetHistoryTree(&p.GetHistoryTreeRequest{
TreeID: treeID,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
s.True(len(resp.ForkingInProgressBranches) > 0)
forkTime := p.UnixNanoToDBTimestamp(resp.ForkingInProgressBranches[0].ForkTime.UnixNano())
s.True(forkTime > defaultVisibilityTimestamp)
s.Equal(testForkRunID, resp.ForkingInProgressBranches[0].Info)
}

// persistence helper
func (s *HistoryV2PersistenceSuite) read(branch []byte, minID, maxID int64) []*workflow.HistoryEvent {
res, err := s.readWithError(branch, minID, maxID)
Expand Down Expand Up @@ -860,13 +842,3 @@ func (s *HistoryV2PersistenceSuite) fork(forkBranch []byte, forkNodeID int64) ([
err := backoff.Retry(op, historyTestRetryPolicy, isConditionFail)
return bi, err
}

// persistence helper
func (s *HistoryV2PersistenceSuite) completeFork(forkBranch []byte, succ bool) {
err := s.HistoryV2Mgr.CompleteForkBranch(&p.CompleteForkBranchRequest{
BranchToken: forkBranch,
Success: succ,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
}
2 changes: 0 additions & 2 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ type (
ForkHistoryBranch(request *InternalForkHistoryBranchRequest) (*InternalForkHistoryBranchResponse, error)
// DeleteHistoryBranch removes a branch
DeleteHistoryBranch(request *InternalDeleteHistoryBranchRequest) error
// UpdateHistoryBranch update a branch
CompleteForkBranch(request *InternalCompleteForkBranchRequest) error
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
Expand Down
Loading

0 comments on commit 961f8b3

Please sign in to comment.