Skip to content

Commit

Permalink
Support shard in event v2 sql store (cadence-workflow#1689)
Browse files Browse the repository at this point in the history
* Support shard in event v2 SQL store
  • Loading branch information
yux0 authored Apr 15, 2019
1 parent 0c50d08 commit 623b921
Show file tree
Hide file tree
Showing 42 changed files with 348 additions and 78 deletions.
14 changes: 13 additions & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,8 @@ type (
TransactionID int64
// It is to suggest a binary encoding type to serialize history events
Encoding common.EncodingType
// The shard to get history node data
ShardID *int
}

// AppendHistoryNodesResponse is a response to AppendHistoryNodesRequest
Expand All @@ -1246,6 +1248,8 @@ type (
PageSize int
// Token to continue reading next page of history append transactions. Pass in empty slice for first page
NextPageToken []byte
// The shard to get history branch data
ShardID *int
}

// ReadHistoryBranchResponse is the response to ReadHistoryBranchRequest
Expand Down Expand Up @@ -1286,6 +1290,8 @@ type (
ForkNodeID int64
// the info for clean up data in background
Info string
// The shard to get history branch data
ShardID *int
}

// ForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
Expand All @@ -1300,18 +1306,24 @@ type (
BranchToken []byte
// true means the fork is success, will update the flag, otherwise will delete the new branch
Success bool
// The shard to update history branch data
ShardID *int
}

// DeleteHistoryBranchRequest is used to remove a history branch
DeleteHistoryBranchRequest struct {
// branch to be deleted
BranchToken []byte
// The shard to delete history branch data
ShardID *int
}

// GetHistoryTreeRequest is used to retrieve branch info of a history tree
GetHistoryTreeRequest struct {
// A UUID of a tree
TreeID string
// Get data from this shard
ShardID *int
// optional: can provide treeID via branchToken if treeID is empty
BranchToken []byte
}
Expand Down Expand Up @@ -1434,7 +1446,7 @@ type (
// V2 regards history events growing as a tree, decoupled from workflow concepts
// For Cadence, treeID is new runID, except for fork(reset), treeID will be the runID that it forks from.

// AppendHistoryNodes add(or override) a batach of nodes to a history branch
// AppendHistoryNodes add(or override) a batch of nodes to a history branch
AppendHistoryNodes(request *AppendHistoryNodesRequest) (*AppendHistoryNodesResponse, error)
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(request *ReadHistoryBranchRequest) (*ReadHistoryBranchResponse, error)
Expand Down
3 changes: 1 addition & 2 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExe
})
logger.Error("Unexpected event batch")
return nil, nil, nil, 0, 0, fmt.Errorf("corrupted history event batch")
} else {
token.LastEventID = historyBatch[0].GetEventId() - 1
}
token.LastEventID = historyBatch[0].GetEventId() - 1
}

if historyBatch[0].GetEventId() != token.LastEventID+1 {
Expand Down
40 changes: 39 additions & 1 deletion common/persistence/historyV2Store.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,18 @@ func (m *historyV2ManagerImpl) ForkHistoryBranch(request *ForkHistoryBranchReque
if err != nil {
return nil, err
}

shardID, err := getShardID(request.ShardID)
if err != nil {
return nil, &workflow.InternalServiceError{
Message: err.Error(),
}
}
req := &InternalForkHistoryBranchRequest{
ForkBranchInfo: forkBranch,
ForkNodeID: request.ForkNodeID,
NewBranchID: uuid.New(),
Info: request.Info,
ShardID: shardID,
}

resp, err := m.persistence.ForkHistoryBranch(req)
Expand All @@ -104,8 +110,16 @@ func (m *historyV2ManagerImpl) DeleteHistoryBranch(request *DeleteHistoryBranchR
return err
}

shardID, err := getShardID(request.ShardID)
if err != nil {
m.logger.WithError(err).Error("shardID is not set in delete history operation")
return &workflow.InternalServiceError{
Message: err.Error(),
}
}
req := &InternalDeleteHistoryBranchRequest{
BranchInfo: branch,
ShardID: shardID,
}

return m.persistence.DeleteHistoryBranch(req)
Expand All @@ -119,9 +133,17 @@ func (m *historyV2ManagerImpl) CompleteForkBranch(request *CompleteForkBranchReq
return err
}

shardID, err := getShardID(request.ShardID)
if err != nil {
m.logger.WithError(err).Error("shardID is not set in complete fork branch operation")
return &workflow.InternalServiceError{
Message: err.Error(),
}
}
req := &InternalCompleteForkBranchRequest{
BranchInfo: branch,
Success: request.Success,
ShardID: shardID,
}

return m.persistence.CompleteForkBranch(req)
Expand Down Expand Up @@ -179,13 +201,21 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(request *AppendHistoryNodesReq
blob, err := m.historySerializer.SerializeBatchEvents(request.Events, request.Encoding)
size := len(blob.Data)

shardID, err := getShardID(request.ShardID)
if err != nil {
m.logger.WithError(err).Error("shardID is not set in append history nodes operation")
return nil, &workflow.InternalServiceError{
Message: err.Error(),
}
}
req := &InternalAppendHistoryNodesRequest{
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
NodeID: nodeID,
Events: blob,
TransactionID: request.TransactionID,
ShardID: shardID,
}

err = m.persistence.AppendHistoryNodes(req)
Expand Down Expand Up @@ -281,13 +311,21 @@ func (m *historyV2ManagerImpl) readHistoryBranch(byBatch bool, request *ReadHist
maxNodeID = request.MaxEventID
}

shardID, err := getShardID(request.ShardID)
if err != nil {
m.logger.WithError(err).Error("shardID is not set in read history branch operation")
return nil, nil, nil, 0, 0, &workflow.InternalServiceError{
Message: err.Error(),
}
}
req := &InternalReadHistoryBranchRequest{
TreeID: treeID,
BranchID: *allBRs[token.CurrentRangeIndex].BranchID,
MinNodeID: request.MinEventID,
MaxNodeID: maxNodeID,
PageSize: request.PageSize,
NextPageToken: token.StoreToken,
ShardID: shardID,
}

resp, err := m.persistence.ReadHistoryBranch(req)
Expand Down
15 changes: 14 additions & 1 deletion common/persistence/historyV2StoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package persistence

import (
"fmt"
"time"

"github.com/uber-common/bark"
Expand Down Expand Up @@ -61,9 +62,10 @@ that zombie history segments can remain under some rare failure cases. Consider
Under this rare case the section of parent history which was assumed to be common to child will be a zombie history section.
*/
func DeleteWorkflowExecutionHistoryV2(historyV2Mgr HistoryV2Manager, branchToken []byte, logger bark.Logger) error {
func DeleteWorkflowExecutionHistoryV2(historyV2Mgr HistoryV2Manager, branchToken []byte, shardID *int, logger bark.Logger) error {
err := historyV2Mgr.DeleteHistoryBranch(&DeleteHistoryBranchRequest{
BranchToken: branchToken,
ShardID: shardID,
})
if err == nil {
return nil
Expand All @@ -78,6 +80,7 @@ func DeleteWorkflowExecutionHistoryV2(historyV2Mgr HistoryV2Manager, branchToken

resp, err := historyV2Mgr.GetHistoryTree(&GetHistoryTreeRequest{
BranchToken: branchToken,
ShardID: shardID,
})
if err != nil {
return err
Expand All @@ -101,6 +104,7 @@ func DeleteWorkflowExecutionHistoryV2(historyV2Mgr HistoryV2Manager, branchToken
// the worst case is we may leak some data that will never deleted
Success: true,
BranchToken: bt,
ShardID: shardID,
})
if err != nil {
return err
Expand All @@ -115,6 +119,7 @@ func DeleteWorkflowExecutionHistoryV2(historyV2Mgr HistoryV2Manager, branchToken
}
err = historyV2Mgr.DeleteHistoryBranch(&DeleteHistoryBranchRequest{
BranchToken: branchToken,
ShardID: shardID,
})
return err
}
Expand All @@ -139,6 +144,7 @@ func ReadFullPageV2Events(historyV2Mgr HistoryV2Manager, req *ReadHistoryBranchR
}
}

// GetBeginNodeID gets node id from last ancestor
func GetBeginNodeID(bi shared.HistoryBranch) int64 {
if len(bi.Ancestors) == 0 {
// root branch
Expand All @@ -147,3 +153,10 @@ func GetBeginNodeID(bi shared.HistoryBranch) int64 {
idx := len(bi.Ancestors) - 1
return *bi.Ancestors[idx].EndNodeID
}

func getShardID(shardID *int) (int, error) {
if shardID == nil {
return 0, fmt.Errorf("shardID is not set for persistence operation")
}
return *shardID, nil
}
9 changes: 5 additions & 4 deletions common/persistence/persistence-tests/historyPerfTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ PASS
*/
func (s *HistoryPerfSuite) TestPerf() {
treeID := s.genRandomUUIDString()

shardID := 1
//for v1
domainID := treeID

Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *HistoryPerfSuite) TestPerf() {
lastID := firstIDV2 + int64(batchSize)
events := s.genRandomEvents(firstIDV2, lastID)

err := s.appendV2(brs[idx], events, 0)
err := s.appendV2(brs[idx], events, 0, shardID)

s.Nil(err)
firstIDV2 = lastID
Expand Down Expand Up @@ -237,13 +237,13 @@ func (s *HistoryPerfSuite) newHistoryBranch(treeID string) ([]byte, error) {

// persistence helper
func (s *HistoryPerfSuite) readv2(branch []byte, minID, maxID int64, pageSize int, token []byte) ([]*workflow.HistoryEvent, []byte, error) {

resp, err := s.HistoryV2Mgr.ReadHistoryBranch(&p.ReadHistoryBranchRequest{
BranchToken: branch,
MinEventID: minID,
MaxEventID: maxID,
PageSize: pageSize,
NextPageToken: token,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
if err != nil {
return nil, nil, err
Expand All @@ -255,7 +255,7 @@ func (s *HistoryPerfSuite) readv2(branch []byte, minID, maxID int64, pageSize in
}

// persistence helper
func (s *HistoryPerfSuite) appendV2(br []byte, events []*workflow.HistoryEvent, txnID int64) error {
func (s *HistoryPerfSuite) appendV2(br []byte, events []*workflow.HistoryEvent, txnID int64, shardID int) error {

var resp *p.AppendHistoryNodesResponse
var err error
Expand All @@ -265,6 +265,7 @@ func (s *HistoryPerfSuite) appendV2(br []byte, events []*workflow.HistoryEvent,
Events: events,
TransactionID: txnID,
Encoding: common.EncodingTypeThriftRW,
ShardID: common.IntPtr(shardID),
})
if err != nil {
s.True(resp.Size > 0)
Expand Down
16 changes: 12 additions & 4 deletions common/persistence/persistence-tests/historyV2PersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
MaxEventID: 10,
PageSize: 4,
NextPageToken: nil,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
}
// first page
resp, err := s.HistoryV2Mgr.ReadHistoryBranch(req)
Expand Down Expand Up @@ -225,6 +226,7 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
MaxEventID: 21,
PageSize: 3,
NextPageToken: nil,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
}
// first page
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
Expand Down Expand Up @@ -291,7 +293,6 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
//TestConcurrentlyCreateAndAppendBranches test
func (s *HistoryV2PersistenceSuite) TestConcurrentlyCreateAndAppendBranches() {
treeID := uuid.New()

wg := sync.WaitGroup{}
concurrency := 20
m := sync.Map{}
Expand Down Expand Up @@ -411,7 +412,6 @@ func (s *HistoryV2PersistenceSuite) TestConcurrentlyCreateAndAppendBranches() {
// TestConcurrentlyForkAndAppendBranches test
func (s *HistoryV2PersistenceSuite) TestConcurrentlyForkAndAppendBranches() {
treeID := uuid.New()

wg := sync.WaitGroup{}
concurrency := 10
masterBr, err := s.newHistoryBranch(treeID)
Expand Down Expand Up @@ -631,6 +631,7 @@ func (s *HistoryV2PersistenceSuite) deleteHistoryBranch(branch []byte) error {
var err error
err = s.HistoryV2Mgr.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{
BranchToken: branch,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
return err
}
Expand All @@ -642,14 +643,16 @@ func (s *HistoryV2PersistenceSuite) deleteHistoryBranch(branch []byte) error {
func (s *HistoryV2PersistenceSuite) descTreeByToken(br []byte) []*workflow.HistoryBranch {
resp, err := s.HistoryV2Mgr.GetHistoryTree(&p.GetHistoryTreeRequest{
BranchToken: br,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
return resp.Branches
}

func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*workflow.HistoryBranch {
resp, err := s.HistoryV2Mgr.GetHistoryTree(&p.GetHistoryTreeRequest{
TreeID: treeID,
TreeID: treeID,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
s.True(len(resp.ForkingInProgressBranches) == 0)
Expand All @@ -659,7 +662,8 @@ func (s *HistoryV2PersistenceSuite) descTree(treeID string) []*workflow.HistoryB
// persistence helper
func (s *HistoryV2PersistenceSuite) descInProgress(treeID string) {
resp, err := s.HistoryV2Mgr.GetHistoryTree(&p.GetHistoryTreeRequest{
TreeID: treeID,
TreeID: treeID,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
s.True(len(resp.ForkingInProgressBranches) > 0)
Expand Down Expand Up @@ -688,6 +692,7 @@ func (s *HistoryV2PersistenceSuite) readWithError(branch []byte, minID, maxID in
MaxEventID: maxID,
PageSize: randPageSize,
NextPageToken: token,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -737,6 +742,7 @@ func (s *HistoryV2PersistenceSuite) append(branch []byte, events []*workflow.His
Events: events,
TransactionID: txnID,
Encoding: pickRandomEncoding(),
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
return err
}
Expand All @@ -761,6 +767,7 @@ func (s *HistoryV2PersistenceSuite) fork(forkBranch []byte, forkNodeID int64) ([
ForkBranchToken: forkBranch,
ForkNodeID: forkNodeID,
Info: testForkRunID,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
if resp != nil {
bi = resp.NewBranchToken
Expand All @@ -777,6 +784,7 @@ func (s *HistoryV2PersistenceSuite) completeFork(forkBranch []byte, succ bool) {
err := s.HistoryV2Mgr.CompleteForkBranch(&p.CompleteForkBranchRequest{
BranchToken: forkBranch,
Success: succ,
ShardID: common.IntPtr(s.ShardInfo.ShardID),
})
s.Nil(err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *TestBase) Config() config.Persistence {
// Setup sets up the test base, must be called as part of SetupSuite
func (s *TestBase) Setup() {
var err error
shardID := 0
shardID := 10
clusterName := s.ClusterMetadata.GetCurrentClusterName()
log := bark.NewLoggerFromLogrus(log.New())

Expand Down
Loading

0 comments on commit 623b921

Please sign in to comment.