Skip to content

Commit

Permalink
Add Scan API to HistoryV2 to get all history tree branches (cadence-w…
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Aug 27, 2019
1 parent 4023da4 commit a55628f
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 6 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ const (
PersistenceCompleteForkBranchScope
// PersistenceGetHistoryTreeScope tracks GetHistoryTree calls made by service to persistence layer
PersistenceGetHistoryTreeScope
// PersistenceGetAllHistoryTreeBranchesScope tracks GetHistoryTree calls made by service to persistence layer
PersistenceGetAllHistoryTreeBranchesScope

// ClusterMetadataArchivalConfigScope tracks ArchivalConfig calls to ClusterMetadata
ClusterMetadataArchivalConfigScope
Expand Down Expand Up @@ -932,6 +934,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceDeleteHistoryBranchScope: {operation: "DeleteHistoryBranch"},
PersistenceCompleteForkBranchScope: {operation: "CompleteForkBranch"},
PersistenceGetHistoryTreeScope: {operation: "GetHistoryTree"},
PersistenceGetAllHistoryTreeBranchesScope: {operation: "GetAllHistoryTreeBranches"},

ClusterMetadataArchivalConfigScope: {operation: "ArchivalConfig"},

Expand Down
19 changes: 19 additions & 0 deletions common/mocks/HistoryV2Manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,25 @@ func (_m *HistoryV2Manager) GetHistoryTree(request *persistence.GetHistoryTreeRe
return r0, r1
}

func (_m *HistoryV2Manager) GetAllHistoryTreeBranches(request *persistence.GetAllHistoryTreeBranchesRequest) (*persistence.GetAllHistoryTreeBranchesResponse, error) {
ret := _m.Called(request)
var r0 *persistence.GetAllHistoryTreeBranchesResponse
if rf, ok := ret.Get(0).(func(*persistence.GetAllHistoryTreeBranchesRequest) *persistence.GetAllHistoryTreeBranchesResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetAllHistoryTreeBranchesResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetAllHistoryTreeBranchesRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

// Close provides a mock function with given fields:
func (_m *HistoryV2Manager) Close() {
_m.Called()
Expand Down
48 changes: 46 additions & 2 deletions common/persistence/cassandra/cassandraHistoryV2Persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ const (
v2templateDeleteBranch = `DELETE FROM history_tree WHERE tree_id = ? AND branch_id = ? `

v2templateUpdateBranch = `UPDATE history_tree set in_progress = ? WHERE tree_id = ? AND branch_id = ? `

v2templateScanAllTreeBranches = `SELECT tree_id, branch_id, fork_time, info FROM history_tree `
)

type (
Expand Down Expand Up @@ -398,6 +400,47 @@ func (h *cassandraHistoryV2Persistence) deleteBranchRangeNodes(batch *gocql.Batc
beginNodeID)
}

func (h *cassandraHistoryV2Persistence) GetAllHistoryTreeBranches(request *p.GetAllHistoryTreeBranchesRequest) (*p.GetAllHistoryTreeBranchesResponse, error) {
query := h.session.Query(v2templateScanAllTreeBranches)

iter := query.PageSize(int(request.PageSize)).PageState(request.NextPageToken).Iter()
if iter == nil {
return nil, &workflow.InternalServiceError{
Message: "GetAllHistoryTreeBranches operation failed. Not able to create query iterator.",
}
}
pagingToken := iter.PageState()

branches := make([]p.HistoryBranchDetail, 0, int(request.PageSize))
treeUUID := gocql.UUID{}
branchUUID := gocql.UUID{}
forkTime := time.Time{}
info := ""

for iter.Scan(&treeUUID, &branchUUID, &forkTime, &info) {
branchDetail := p.HistoryBranchDetail{
TreeID: treeUUID.String(),
BranchID: branchUUID.String(),
ForkTime: forkTime,
Info: info,
}
branches = append(branches, branchDetail)
}

if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetAllHistoryTreeBranches. Close operation failed. Error: %v", err),
}
}

response := &p.GetAllHistoryTreeBranchesResponse{
Branches: branches,
NextPageToken: pagingToken,
}

return response, nil
}

// GetHistoryTree returns all branch information of a tree
func (h *cassandraHistoryV2Persistence) GetHistoryTree(request *p.GetHistoryTreeRequest) (*p.GetHistoryTreeResponse, error) {
treeID := request.TreeID
Expand All @@ -406,7 +449,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(request *p.GetHistoryTree

pagingToken := []byte{}
branches := make([]*workflow.HistoryBranch, 0)
forkingBranches := make([]p.ForkingInProgressBranch, 0)
forkingBranches := make([]p.HistoryBranchDetail, 0)

var iter *gocql.Iter
for {
Expand All @@ -426,7 +469,8 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(request *p.GetHistoryTree

for iter.Scan(&branchUUID, &ancsResult, &forkingInProgress, &forkTime, &info) {
if forkingInProgress {
br := p.ForkingInProgressBranch{
br := p.HistoryBranchDetail{
TreeID: treeID,
BranchID: branchUUID.String(),
ForkTime: forkTime,
Info: info,
Expand Down
23 changes: 21 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,8 @@ type (
}

// ForkingInProgressBranch is part of GetHistoryTreeResponse
ForkingInProgressBranch struct {
HistoryBranchDetail struct {
TreeID string
BranchID string
ForkTime time.Time
Info string
Expand All @@ -1355,7 +1356,23 @@ type (
GetHistoryTreeResponse struct {
// all branches of a tree
Branches []*workflow.HistoryBranch
ForkingInProgressBranches []ForkingInProgressBranch
ForkingInProgressBranches []HistoryBranchDetail
}

// GetAllHistoryTreeBranchesRequest is a request of GetAllHistoryTreeBranches
GetAllHistoryTreeBranchesRequest struct {
// pagination token
NextPageToken []byte
// maximum number of branches returned per page
PageSize int
}

// GetAllHistoryTreeBranchesResponse is a response to GetAllHistoryTreeBranches
GetAllHistoryTreeBranchesResponse struct {
// pagination token
NextPageToken []byte
// all branches of all trees
Branches []HistoryBranchDetail
}

// AppendHistoryEventsResponse is response for AppendHistoryEventsRequest
Expand Down Expand Up @@ -1478,6 +1495,8 @@ type (
DeleteHistoryBranch(request *DeleteHistoryBranchRequest) error
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
}

// MetadataManager is used to manage metadata CRUD for domain entities
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/historyV2Store.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ func (m *historyV2ManagerImpl) ReadHistoryBranch(request *ReadHistoryBranchReque
return resp, nil
}

func (m *historyV2ManagerImpl) GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) {
return m.persistence.GetAllHistoryTreeBranches(request)
}

func (m *historyV2ManagerImpl) readHistoryBranch(byBatch bool, request *ReadHistoryBranchRequest) ([]*workflow.HistoryEvent, []*workflow.History, []byte, int, int64, error) {
var branch workflow.HistoryBranch
err := m.thriftEncoder.Decode(request.BranchToken, &branch)
Expand Down
64 changes: 64 additions & 0 deletions common/persistence/persistence-tests/historyV2PersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,62 @@ func (s *HistoryV2PersistenceSuite) TestGenUUIDs() {
s.Equal(concurrency, cnt)
}

//TestReadBranchByPagination test
func (s *HistoryV2PersistenceSuite) TestScanAllTrees() {
//TODO https://github.com/uber/cadence/issues/2458
if s.HistoryV2Mgr.GetName() != "cassandra" {
return
}

resp, err := s.HistoryV2Mgr.GetAllHistoryTreeBranches(&p.GetAllHistoryTreeBranchesRequest{
PageSize: 1,
})
s.Nil(err)
s.Equal(0, len(resp.Branches), "some trees were leaked in other tests")

trees := map[string]bool{}
totalTrees := 1002
pgSize := 100

for i := 0; i < totalTrees; i++ {
treeID := uuid.New()
bi, err := s.newHistoryBranch(treeID)
s.Nil(err)

events := s.genRandomEvents([]int64{1, 2, 3}, 1)
err = s.appendNewBranchAndFirstNode(bi, events, 1, "branchInfo")
s.Nil(err)
trees[treeID] = true
}

var pgToken []byte
for {
resp, err := s.HistoryV2Mgr.GetAllHistoryTreeBranches(&p.GetAllHistoryTreeBranchesRequest{
PageSize: pgSize,
NextPageToken: pgToken,
})
s.Nil(err)
for _, br := range resp.Branches {
if trees[br.TreeID] == true {
delete(trees, br.TreeID)

s.True(br.ForkTime.UnixNano() > 0)
s.True(len(br.BranchID) > 0)
s.Equal("branchInfo", br.Info)
} else {
s.Fail("treeID not found", br.TreeID)
}
}

if resp.NextPageToken == nil {
break
}
pgToken = resp.NextPageToken
}

s.Equal(0, len(trees))
}

//TestReadBranchByPagination test
func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
treeID := uuid.New()
Expand Down Expand Up @@ -192,6 +248,7 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
// fork from here
bi2, err := s.fork(bi, 13)
s.Nil(err)
s.completeFork(bi2, true)

events = s.genRandomEvents([]int64{13}, 1)
err = s.appendNewNode(bi2, events, 1)
Expand Down Expand Up @@ -284,6 +341,13 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
req.NextPageToken = nil
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.IsType(&gen.EntityNotExistsError{}, err)

err = s.deleteHistoryBranch(bi2)
s.Nil(err)
err = s.deleteHistoryBranch(bi)
s.Nil(err)
branches := s.descTree(treeID)
s.Equal(0, len(branches))
}

//TestConcurrentlyCreateAndAppendBranches test
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type (
CompleteForkBranch(request *InternalCompleteForkBranchRequest) error
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
}

// VisibilityStore is the store interface for visibility
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,17 @@ func (p *historyV2PersistenceClient) CompleteForkBranch(request *CompleteForkBra
return err
}

func (p *historyV2PersistenceClient) GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetAllHistoryTreeBranchesScope, metrics.PersistenceRequests)
sw := p.metricClient.StartTimer(metrics.PersistenceGetAllHistoryTreeBranchesScope, metrics.PersistenceLatency)
response, err := p.persistence.GetAllHistoryTreeBranches(request)
sw.Stop()
if err != nil {
p.updateErrorMetric(metrics.PersistenceGetAllHistoryTreeBranchesScope, err)
}
return response, err
}

// GetHistoryTree returns all branch information of a tree
func (p *historyV2PersistenceClient) GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetHistoryTreeScope, metrics.PersistenceRequests)
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,3 +732,11 @@ func (p *historyV2RateLimitedPersistenceClient) GetHistoryTree(request *GetHisto
response, err := p.persistence.GetHistoryTree(request)
return response, err
}

func (p *historyV2RateLimitedPersistenceClient) GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}
response, err := p.persistence.GetAllHistoryTreeBranches(request)
return response, err
}
11 changes: 9 additions & 2 deletions common/persistence/sql/sqlHistoryV2Manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,17 @@ func (m *sqlHistoryV2Manager) CompleteForkBranch(request *p.InternalCompleteFork
})
}

func (h *sqlHistoryV2Manager) GetAllHistoryTreeBranches(request *p.GetAllHistoryTreeBranchesRequest) (*p.GetAllHistoryTreeBranchesResponse, error) {
// TODO https://github.com/uber/cadence/issues/2458
// Implement it when we need
panic("not implemented yet")
}

// GetHistoryTree returns all branch information of a tree
func (m *sqlHistoryV2Manager) GetHistoryTree(request *p.GetHistoryTreeRequest) (*p.GetHistoryTreeResponse, error) {
treeID := sqldb.MustParseUUID(request.TreeID)
branches := make([]*shared.HistoryBranch, 0)
forkingBranches := make([]p.ForkingInProgressBranch, 0)
forkingBranches := make([]p.HistoryBranchDetail, 0)

treeFilter := &sqldb.HistoryTreeFilter{
TreeID: treeID,
Expand All @@ -497,7 +503,8 @@ func (m *sqlHistoryV2Manager) GetHistoryTree(request *p.GetHistoryTreeRequest) (
return nil, err
}
if row.InProgress {
br := p.ForkingInProgressBranch{
br := p.HistoryBranchDetail{
TreeID: request.TreeID,
BranchID: row.BranchID.String(),
ForkTime: time.Unix(0, treeInfo.GetCreatedTimeNanos()),
Info: treeInfo.GetInfo(),
Expand Down

0 comments on commit a55628f

Please sign in to comment.