From a55628f2941e1c04dd62e2fee55d569d36b7581d Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Tue, 27 Aug 2019 16:13:20 -0700 Subject: [PATCH] Add Scan API to HistoryV2 to get all history tree branches (#2459) --- common/metrics/defs.go | 3 + common/mocks/HistoryV2Manager.go | 19 ++++++ .../cassandraHistoryV2Persistence.go | 48 +++++++++++++- common/persistence/dataInterfaces.go | 23 ++++++- common/persistence/historyV2Store.go | 4 ++ .../historyV2PersistenceTest.go | 64 +++++++++++++++++++ common/persistence/persistenceInterface.go | 2 + .../persistence/persistenceMetricClients.go | 11 ++++ .../persistenceRateLimitedClients.go | 8 +++ common/persistence/sql/sqlHistoryV2Manager.go | 11 +++- 10 files changed, 187 insertions(+), 6 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 748495b7389..56e776c59d3 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -478,6 +478,8 @@ const ( PersistenceCompleteForkBranchScope // PersistenceGetHistoryTreeScope tracks GetHistoryTree calls made by service to persistence layer PersistenceGetHistoryTreeScope + // PersistenceGetAllHistoryTreeBranchesScope tracks GetHistoryTree calls made by service to persistence layer + PersistenceGetAllHistoryTreeBranchesScope // ClusterMetadataArchivalConfigScope tracks ArchivalConfig calls to ClusterMetadata ClusterMetadataArchivalConfigScope @@ -932,6 +934,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceDeleteHistoryBranchScope: {operation: "DeleteHistoryBranch"}, PersistenceCompleteForkBranchScope: {operation: "CompleteForkBranch"}, PersistenceGetHistoryTreeScope: {operation: "GetHistoryTree"}, + PersistenceGetAllHistoryTreeBranchesScope: {operation: "GetAllHistoryTreeBranches"}, ClusterMetadataArchivalConfigScope: {operation: "ArchivalConfig"}, diff --git a/common/mocks/HistoryV2Manager.go b/common/mocks/HistoryV2Manager.go index 6e5bef8e6f4..a3490a5b96c 100644 --- a/common/mocks/HistoryV2Manager.go +++ b/common/mocks/HistoryV2Manager.go @@ -167,6 +167,25 @@ func (_m *HistoryV2Manager) GetHistoryTree(request *persistence.GetHistoryTreeRe return r0, r1 } +func (_m *HistoryV2Manager) GetAllHistoryTreeBranches(request *persistence.GetAllHistoryTreeBranchesRequest) (*persistence.GetAllHistoryTreeBranchesResponse, error) { + ret := _m.Called(request) + var r0 *persistence.GetAllHistoryTreeBranchesResponse + if rf, ok := ret.Get(0).(func(*persistence.GetAllHistoryTreeBranchesRequest) *persistence.GetAllHistoryTreeBranchesResponse); ok { + r0 = rf(request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*persistence.GetAllHistoryTreeBranchesResponse) + } + } + var r1 error + if rf, ok := ret.Get(1).(func(*persistence.GetAllHistoryTreeBranchesRequest) error); ok { + r1 = rf(request) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + // Close provides a mock function with given fields: func (_m *HistoryV2Manager) Close() { _m.Called() diff --git a/common/persistence/cassandra/cassandraHistoryV2Persistence.go b/common/persistence/cassandra/cassandraHistoryV2Persistence.go index 0ae6145f2fa..275d7c7d0b7 100644 --- a/common/persistence/cassandra/cassandraHistoryV2Persistence.go +++ b/common/persistence/cassandra/cassandraHistoryV2Persistence.go @@ -54,6 +54,8 @@ const ( v2templateDeleteBranch = `DELETE FROM history_tree WHERE tree_id = ? AND branch_id = ? ` v2templateUpdateBranch = `UPDATE history_tree set in_progress = ? WHERE tree_id = ? AND branch_id = ? ` + + v2templateScanAllTreeBranches = `SELECT tree_id, branch_id, fork_time, info FROM history_tree ` ) type ( @@ -398,6 +400,47 @@ func (h *cassandraHistoryV2Persistence) deleteBranchRangeNodes(batch *gocql.Batc beginNodeID) } +func (h *cassandraHistoryV2Persistence) GetAllHistoryTreeBranches(request *p.GetAllHistoryTreeBranchesRequest) (*p.GetAllHistoryTreeBranchesResponse, error) { + query := h.session.Query(v2templateScanAllTreeBranches) + + iter := query.PageSize(int(request.PageSize)).PageState(request.NextPageToken).Iter() + if iter == nil { + return nil, &workflow.InternalServiceError{ + Message: "GetAllHistoryTreeBranches operation failed. Not able to create query iterator.", + } + } + pagingToken := iter.PageState() + + branches := make([]p.HistoryBranchDetail, 0, int(request.PageSize)) + treeUUID := gocql.UUID{} + branchUUID := gocql.UUID{} + forkTime := time.Time{} + info := "" + + for iter.Scan(&treeUUID, &branchUUID, &forkTime, &info) { + branchDetail := p.HistoryBranchDetail{ + TreeID: treeUUID.String(), + BranchID: branchUUID.String(), + ForkTime: forkTime, + Info: info, + } + branches = append(branches, branchDetail) + } + + if err := iter.Close(); err != nil { + return nil, &workflow.InternalServiceError{ + Message: fmt.Sprintf("GetAllHistoryTreeBranches. Close operation failed. Error: %v", err), + } + } + + response := &p.GetAllHistoryTreeBranchesResponse{ + Branches: branches, + NextPageToken: pagingToken, + } + + return response, nil +} + // GetHistoryTree returns all branch information of a tree func (h *cassandraHistoryV2Persistence) GetHistoryTree(request *p.GetHistoryTreeRequest) (*p.GetHistoryTreeResponse, error) { treeID := request.TreeID @@ -406,7 +449,7 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(request *p.GetHistoryTree pagingToken := []byte{} branches := make([]*workflow.HistoryBranch, 0) - forkingBranches := make([]p.ForkingInProgressBranch, 0) + forkingBranches := make([]p.HistoryBranchDetail, 0) var iter *gocql.Iter for { @@ -426,7 +469,8 @@ func (h *cassandraHistoryV2Persistence) GetHistoryTree(request *p.GetHistoryTree for iter.Scan(&branchUUID, &ancsResult, &forkingInProgress, &forkTime, &info) { if forkingInProgress { - br := p.ForkingInProgressBranch{ + br := p.HistoryBranchDetail{ + TreeID: treeID, BranchID: branchUUID.String(), ForkTime: forkTime, Info: info, diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index f0c9c109db2..f9fb4a08b38 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -1345,7 +1345,8 @@ type ( } // ForkingInProgressBranch is part of GetHistoryTreeResponse - ForkingInProgressBranch struct { + HistoryBranchDetail struct { + TreeID string BranchID string ForkTime time.Time Info string @@ -1355,7 +1356,23 @@ type ( GetHistoryTreeResponse struct { // all branches of a tree Branches []*workflow.HistoryBranch - ForkingInProgressBranches []ForkingInProgressBranch + ForkingInProgressBranches []HistoryBranchDetail + } + + // GetAllHistoryTreeBranchesRequest is a request of GetAllHistoryTreeBranches + GetAllHistoryTreeBranchesRequest struct { + // pagination token + NextPageToken []byte + // maximum number of branches returned per page + PageSize int + } + + // GetAllHistoryTreeBranchesResponse is a response to GetAllHistoryTreeBranches + GetAllHistoryTreeBranchesResponse struct { + // pagination token + NextPageToken []byte + // all branches of all trees + Branches []HistoryBranchDetail } // AppendHistoryEventsResponse is response for AppendHistoryEventsRequest @@ -1478,6 +1495,8 @@ type ( DeleteHistoryBranch(request *DeleteHistoryBranchRequest) error // GetHistoryTree returns all branch information of a tree GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error) + // GetAllHistoryTreeBranches returns all branches of all trees + GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) } // MetadataManager is used to manage metadata CRUD for domain entities diff --git a/common/persistence/historyV2Store.go b/common/persistence/historyV2Store.go index 38236c0d173..a202ab1c429 100644 --- a/common/persistence/historyV2Store.go +++ b/common/persistence/historyV2Store.go @@ -257,6 +257,10 @@ func (m *historyV2ManagerImpl) ReadHistoryBranch(request *ReadHistoryBranchReque return resp, nil } +func (m *historyV2ManagerImpl) GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) { + return m.persistence.GetAllHistoryTreeBranches(request) +} + func (m *historyV2ManagerImpl) readHistoryBranch(byBatch bool, request *ReadHistoryBranchRequest) ([]*workflow.HistoryEvent, []*workflow.History, []byte, int, int64, error) { var branch workflow.HistoryBranch err := m.thriftEncoder.Decode(request.BranchToken, &branch) diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index 1110ecaa065..531833a50e5 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -113,6 +113,62 @@ func (s *HistoryV2PersistenceSuite) TestGenUUIDs() { s.Equal(concurrency, cnt) } +//TestReadBranchByPagination test +func (s *HistoryV2PersistenceSuite) TestScanAllTrees() { + //TODO https://github.com/uber/cadence/issues/2458 + if s.HistoryV2Mgr.GetName() != "cassandra" { + return + } + + resp, err := s.HistoryV2Mgr.GetAllHistoryTreeBranches(&p.GetAllHistoryTreeBranchesRequest{ + PageSize: 1, + }) + s.Nil(err) + s.Equal(0, len(resp.Branches), "some trees were leaked in other tests") + + trees := map[string]bool{} + totalTrees := 1002 + pgSize := 100 + + for i := 0; i < totalTrees; i++ { + treeID := uuid.New() + bi, err := s.newHistoryBranch(treeID) + s.Nil(err) + + events := s.genRandomEvents([]int64{1, 2, 3}, 1) + err = s.appendNewBranchAndFirstNode(bi, events, 1, "branchInfo") + s.Nil(err) + trees[treeID] = true + } + + var pgToken []byte + for { + resp, err := s.HistoryV2Mgr.GetAllHistoryTreeBranches(&p.GetAllHistoryTreeBranchesRequest{ + PageSize: pgSize, + NextPageToken: pgToken, + }) + s.Nil(err) + for _, br := range resp.Branches { + if trees[br.TreeID] == true { + delete(trees, br.TreeID) + + s.True(br.ForkTime.UnixNano() > 0) + s.True(len(br.BranchID) > 0) + s.Equal("branchInfo", br.Info) + } else { + s.Fail("treeID not found", br.TreeID) + } + } + + if resp.NextPageToken == nil { + break + } + pgToken = resp.NextPageToken + } + + s.Equal(0, len(trees)) +} + //TestReadBranchByPagination test func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() { treeID := uuid.New() @@ -192,6 +248,7 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() { // fork from here bi2, err := s.fork(bi, 13) s.Nil(err) + s.completeFork(bi2, true) events = s.genRandomEvents([]int64{13}, 1) err = s.appendNewNode(bi2, events, 1) @@ -284,6 +341,13 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() { req.NextPageToken = nil resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req) s.IsType(&gen.EntityNotExistsError{}, err) + + err = s.deleteHistoryBranch(bi2) + s.Nil(err) + err = s.deleteHistoryBranch(bi) + s.Nil(err) + branches := s.descTree(treeID) + s.Equal(0, len(branches)) } //TestConcurrentlyCreateAndAppendBranches test diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 4f84347ead7..8a50c3a6f08 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -119,6 +119,8 @@ type ( CompleteForkBranch(request *InternalCompleteForkBranchRequest) error // GetHistoryTree returns all branch information of a tree GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error) + // GetAllHistoryTreeBranches returns all branches of all trees + GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) } // VisibilityStore is the store interface for visibility diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index f12644e9f52..b6eea82d0f6 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -1126,6 +1126,17 @@ func (p *historyV2PersistenceClient) CompleteForkBranch(request *CompleteForkBra return err } +func (p *historyV2PersistenceClient) GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) { + p.metricClient.IncCounter(metrics.PersistenceGetAllHistoryTreeBranchesScope, metrics.PersistenceRequests) + sw := p.metricClient.StartTimer(metrics.PersistenceGetAllHistoryTreeBranchesScope, metrics.PersistenceLatency) + response, err := p.persistence.GetAllHistoryTreeBranches(request) + sw.Stop() + if err != nil { + p.updateErrorMetric(metrics.PersistenceGetAllHistoryTreeBranchesScope, err) + } + return response, err +} + // GetHistoryTree returns all branch information of a tree func (p *historyV2PersistenceClient) GetHistoryTree(request *GetHistoryTreeRequest) (*GetHistoryTreeResponse, error) { p.metricClient.IncCounter(metrics.PersistenceGetHistoryTreeScope, metrics.PersistenceRequests) diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index f954692e906..94af51cfb76 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -732,3 +732,11 @@ func (p *historyV2RateLimitedPersistenceClient) GetHistoryTree(request *GetHisto response, err := p.persistence.GetHistoryTree(request) return response, err } + +func (p *historyV2RateLimitedPersistenceClient) GetAllHistoryTreeBranches(request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error) { + if ok := p.rateLimiter.Allow(); !ok { + return nil, ErrPersistenceLimitExceeded + } + response, err := p.persistence.GetAllHistoryTreeBranches(request) + return response, err +} diff --git a/common/persistence/sql/sqlHistoryV2Manager.go b/common/persistence/sql/sqlHistoryV2Manager.go index f70e1004df9..e05de7f931b 100644 --- a/common/persistence/sql/sqlHistoryV2Manager.go +++ b/common/persistence/sql/sqlHistoryV2Manager.go @@ -477,11 +477,17 @@ func (m *sqlHistoryV2Manager) CompleteForkBranch(request *p.InternalCompleteFork }) } +func (h *sqlHistoryV2Manager) GetAllHistoryTreeBranches(request *p.GetAllHistoryTreeBranchesRequest) (*p.GetAllHistoryTreeBranchesResponse, error) { + // TODO https://github.com/uber/cadence/issues/2458 + // Implement it when we need + panic("not implemented yet") +} + // GetHistoryTree returns all branch information of a tree func (m *sqlHistoryV2Manager) GetHistoryTree(request *p.GetHistoryTreeRequest) (*p.GetHistoryTreeResponse, error) { treeID := sqldb.MustParseUUID(request.TreeID) branches := make([]*shared.HistoryBranch, 0) - forkingBranches := make([]p.ForkingInProgressBranch, 0) + forkingBranches := make([]p.HistoryBranchDetail, 0) treeFilter := &sqldb.HistoryTreeFilter{ TreeID: treeID, @@ -497,7 +503,8 @@ func (m *sqlHistoryV2Manager) GetHistoryTree(request *p.GetHistoryTreeRequest) ( return nil, err } if row.InProgress { - br := p.ForkingInProgressBranch{ + br := p.HistoryBranchDetail{ + TreeID: request.TreeID, BranchID: row.BranchID.String(), ForkTime: time.Unix(0, treeInfo.GetCreatedTimeNanos()), Info: treeInfo.GetInfo(),