Skip to content

Commit

Permalink
Add History V2: API and Cassandra implementation (cadence-workflow#1156)
Browse files Browse the repository at this point in the history
* Schema and interface

* fix fmt

* fix interfaces

* fix build

* fix build

* fix updateTask_test

* finish common/persistence/historyStore.go

* fix and add checking in common/persistence/historyStore.go

* cql templates

* stash common/persistence/cassandra/cassandraHistoryPersistence.go

* stash

* fix name for appendHistoryNodes

* fix name

* fix name

* NewBranch-stash

* append

* processCommonErrors

* ReadHistoryBranch

* ForkHistoryBranch

* GetHistoryTree

* stash

* update ForkHistoryBranch

* finish deleteBranchAndRootNode

* minor fix

* comments and small improvments

* fix build

* fix comment

* rename methods to make more sense

* fix build

* add v2 test suite

* first test for v2

* finish first test

* fix bug for delete root

* fix ReadBranch

* simplify Append API

* fix bugs

* testing deleting with creating concurrently

* add checking forking node for appendNode

* return branchInfo for append API

* fix in query

* fix readHistory for forking

* fix all concurrent tests

* fix comment

* more test for override in append

* fix race in test code

* workaround when range deletion is not supported in old Cassandra

* remove ID from dataBlob

* improve validate branch status

* improve slice allocation

* improve make slice

* add perf test for history v1/v2

* add idl for historyBranch

* events v3

* finalize schema

* sqlManager

* remove code for common/persistence/historyStore.go

* fix interface

* eventsv3

* historyV2Store

* reset cassandraHistoryPersistence

* finish common/persistence/cassandra/cassandraHistoryV2Persistence.go

* fix GetHistoryTree

* fix fork for batch

* fix getHistoryTree response type

* implement historyV2 factory

* fix bugs

* fix forking point

* draft common/persistence/persistence-tests/historyV2PersistenceTest.go

* reimplement ReadHistoryBranch

* fix all tests

* add test

* add more test

* add more test

* done perf test

* disable perf test

* fix read perf test

* add protection for read same nodeID

* disable perf test

* use same encoding for read perf test

* Remove IsNewTree

* disable perf test

* not calling NewHistoryV2Manager for sql

* fix lint
  • Loading branch information
longquanzheng authored Oct 24, 2018
1 parent 1086b6a commit ebc991d
Show file tree
Hide file tree
Showing 28 changed files with 3,574 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

499 changes: 499 additions & 0 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,8 @@ const (
// task list tags
TagTaskListType = "task-list-type"
TagTaskListName = "task-list-name"

// persistence tags
TagTreeID = "tree-id"
TagBranchID = "branch-id"
)
18 changes: 18 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,18 @@ const (
MatchingClientDescribeTaskListScope
// DomainCacheScope tracks domain cache callbacks
DomainCacheScope
// PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceNewHistoryBranchScope
// PersistenceAppendHistoryNodesScope tracks AppendHistoryNodes calls made by service to persistence layer
PersistenceAppendHistoryNodesScope
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
PersistenceReadHistoryBranchScope
// PersistenceForkHistoryBranchScope tracks ForkHistoryBranch calls made by service to persistence layer
PersistenceForkHistoryBranchScope
// PersistenceDeleteHistoryBranchScope tracks DeleteHistoryBranch calls made by service to persistence layer
PersistenceDeleteHistoryBranchScope
// PersistenceGetHistoryTreeScope tracks GetHistoryTree calls made by service to persistence layer
PersistenceGetHistoryTreeScope

NumCommonScopes
)
Expand Down Expand Up @@ -566,6 +578,12 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceListClosedWorkflowExecutionsByWorkflowIDScope: {operation: "ListClosedWorkflowExecutionsByWorkflowID"},
PersistenceListClosedWorkflowExecutionsByStatusScope: {operation: "ListClosedWorkflowExecutionsByStatus"},
PersistenceGetClosedWorkflowExecutionScope: {operation: "GetClosedWorkflowExecution"},
PersistenceNewHistoryBranchScope: {operation: "NewHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceReadHistoryBranchScope: {operation: "ReadHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceForkHistoryBranchScope: {operation: "ForkHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteHistoryBranchScope: {operation: "DeleteHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetHistoryTreeScope: {operation: "GetHistoryTree", tags: map[string]string{ShardTagName: NoneShardsTagValue}},

HistoryClientStartWorkflowExecutionScope: {operation: "HistoryClientStartWorkflowExecution", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientRecordActivityTaskHeartbeatScope: {operation: "HistoryClientRecordActivityTaskHeartbeat", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
Expand Down
164 changes: 164 additions & 0 deletions common/mocks/HistoryV2Manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mocks

import "github.com/uber/cadence/common/persistence"
import "github.com/stretchr/testify/mock"

type HistoryV2Manager struct {
mock.Mock
}

// GetName provides a mock function with given fields:
func (_m *HistoryV2Manager) GetName() string {
ret := _m.Called()

var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}

return r0
}

// NewHistoryBranch provides a mock function with given fields: request
func (_m *HistoryV2Manager) NewHistoryBranch(request *persistence.NewHistoryBranchRequest) (*persistence.NewHistoryBranchResponse, error) {
ret := _m.Called(request)

var r0 *persistence.NewHistoryBranchResponse
if rf, ok := ret.Get(0).(func(*persistence.NewHistoryBranchRequest) *persistence.NewHistoryBranchResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.NewHistoryBranchResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.NewHistoryBranchRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

// AppendHistoryNodes provides a mock function with given fields: request
func (_m *HistoryV2Manager) AppendHistoryNodes(request *persistence.AppendHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error) {
ret := _m.Called(request)
var r0 *persistence.AppendHistoryNodesResponse
if rf, ok := ret.Get(0).(func(*persistence.AppendHistoryNodesRequest) *persistence.AppendHistoryNodesResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.AppendHistoryNodesResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.AppendHistoryNodesRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

// ReadHistoryBranch provides a mock function with given fields: request
func (_m *HistoryV2Manager) ReadHistoryBranch(request *persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchResponse, error) {
ret := _m.Called(request)
var r0 *persistence.ReadHistoryBranchResponse
if rf, ok := ret.Get(0).(func(*persistence.ReadHistoryBranchRequest) *persistence.ReadHistoryBranchResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.ReadHistoryBranchResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*persistence.ReadHistoryBranchRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

// ForkHistoryBranch provides a mock function with given fields: request
func (_m *HistoryV2Manager) ForkHistoryBranch(request *persistence.ForkHistoryBranchRequest) (*persistence.ForkHistoryBranchResponse, error) {
ret := _m.Called(request)
var r0 *persistence.ForkHistoryBranchResponse
if rf, ok := ret.Get(0).(func(*persistence.ForkHistoryBranchRequest) *persistence.ForkHistoryBranchResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.ForkHistoryBranchResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*persistence.ForkHistoryBranchRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

// DeleteHistoryBranch provides a mock function with given fields: request
func (_m *HistoryV2Manager) DeleteHistoryBranch(request *persistence.DeleteHistoryBranchRequest) error {
ret := _m.Called(request)
var r0 error
if rf, ok := ret.Get(0).(func(*persistence.DeleteHistoryBranchRequest) error); ok {
r0 = rf(request)
} else {
r0 = ret.Error(0)
}
return r0
}

// GetHistoryTree provides a mock function with given fields: request
func (_m *HistoryV2Manager) GetHistoryTree(request *persistence.GetHistoryTreeRequest) (*persistence.GetHistoryTreeResponse, error) {
ret := _m.Called(request)
var r0 *persistence.GetHistoryTreeResponse
if rf, ok := ret.Get(0).(func(*persistence.GetHistoryTreeRequest) *persistence.GetHistoryTreeResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetHistoryTreeResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetHistoryTreeRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

// Close provides a mock function with given fields:
func (_m *HistoryV2Manager) Close() {
_m.Called()
}

var _ persistence.HistoryV2Manager = (*HistoryV2Manager)(nil)
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Int
eventBatchVersion := common.EmptyVersion

eventBatch := &p.DataBlob{}
history := make([]*p.DataBlob, 0)
history := make([]*p.DataBlob, 0, request.PageSize)

for iter.Scan(nil, &eventBatchVersionPointer, &eventBatch.Data, &eventBatch.Encoding) {
found = true
Expand Down
Loading

0 comments on commit ebc991d

Please sign in to comment.