Skip to content

Commit

Permalink
[CLI] Multi execution scanner and fixer (cadence-workflow#3535)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Sep 22, 2020
1 parent fc3473a commit 541a12a
Show file tree
Hide file tree
Showing 37 changed files with 821 additions and 1,380 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ cadence-canary: $(ALL_SRC)
go-generate:
GO111MODULE=off go get -u github.com/myitcv/gobin
GOOS= GOARCH= gobin -mod=readonly github.com/golang/mock/mockgen
GOOS= GOARCH= gobin -mod=readonly github.com/dmarkham/enumer
@echo "running go generate ./..."
@go generate ./...

Expand Down
7 changes: 4 additions & 3 deletions cmd/tools/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package main

import (
"fmt"
"os"

"github.com/uber/cadence/tools/cli"
Expand All @@ -30,8 +31,8 @@ import (
// See cadence/tools/cli/README.md for usage
func main() {
app := cli.NewCliApp()
err := app.Run(os.Args)
if err != nil {
panic(err)
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
}
11 changes: 11 additions & 0 deletions common/reconciliation/entity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,19 @@ func (CurrentExecution) Clone() Entity {
return &CurrentExecution{}
}

// GetShardID returns shard id
func (ce *ConcreteExecution) GetShardID() int {
return ce.Execution.ShardID
}

// GetShardID returns shard id
func (curre *CurrentExecution) GetShardID() int {
return curre.Execution.ShardID
}

// Entity allows to deserialize and validate different type of executions
type Entity interface {
Validate() error
Clone() Entity
GetShardID() int
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,53 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package iterator
package fetcher

import (
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/pagination"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/entity"
)

// ConcreteExecution is used to retrieve Concrete executions.
func ConcreteExecution(retryer persistence.Retryer, pageSize int) pagination.Iterator {
// ConcreteExecutionIterator is used to retrieve Concrete executions.
func ConcreteExecutionIterator(retryer persistence.Retryer, pageSize int) pagination.Iterator {
return pagination.NewIterator(nil, getConcreteExecutions(retryer, pageSize, codec.NewThriftRWEncoder()))
}

// ConcreteExecution returns a single ConcreteExecution from persistence
func ConcreteExecution(retryer persistence.Retryer, request ExecutionRequest) (entity.Entity, error) {

req := persistence.GetWorkflowExecutionRequest{
DomainID: request.DomainID,
Execution: shared.WorkflowExecution{
WorkflowId: common.StringPtr(request.WorkflowID),
RunId: common.StringPtr(request.RunID),
},
}
e, err := retryer.GetWorkflowExecution(&req)
if err != nil {
return nil, err
}

branchToken, branch, err := getBranchToken(e.State.ExecutionInfo.BranchToken, e.State.VersionHistories, codec.NewThriftRWEncoder())

return &entity.ConcreteExecution{
BranchToken: branchToken,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
Execution: entity.Execution{
ShardID: retryer.GetShardID(),
DomainID: e.State.ExecutionInfo.DomainID,
WorkflowID: e.State.ExecutionInfo.WorkflowID,
RunID: e.State.ExecutionInfo.RunID,
State: e.State.ExecutionInfo.State,
},
}, nil
}

func getConcreteExecutions(
pr persistence.Retryer,
pageSize int,
Expand All @@ -53,14 +85,14 @@ func getConcreteExecutions(
}
executions := make([]pagination.Entity, len(resp.Executions), len(resp.Executions))
for i, e := range resp.Executions {
branchToken, treeID, branchID, err := GetBranchToken(e, encoder)
branchToken, branch, err := getBranchToken(e.ExecutionInfo.BranchToken, e.VersionHistories, encoder)
if err != nil {
return pagination.Page{}, err
}
concreteExec := &entity.ConcreteExecution{
BranchToken: branchToken,
TreeID: treeID,
BranchID: branchID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
Execution: entity.Execution{
ShardID: pr.GetShardID(),
DomainID: e.ExecutionInfo.DomainID,
Expand All @@ -87,22 +119,25 @@ func getConcreteExecutions(
}
}

// GetBranchToken returns the branchToken, treeID and branchID or error on failure.
func GetBranchToken(
entity *persistence.ListConcreteExecutionsEntity,
// getBranchToken returns the branchToken and historyBranch, error on failure.
func getBranchToken(
branchToken []byte,
histories *persistence.VersionHistories,
decoder *codec.ThriftRWEncoder,
) ([]byte, string, string, error) {
branchToken := entity.ExecutionInfo.BranchToken
if entity.VersionHistories != nil {
versionHistory, err := entity.VersionHistories.GetCurrentVersionHistory()
) ([]byte, shared.HistoryBranch, error) {
var branch shared.HistoryBranch
bt := branchToken
if histories != nil {
versionHistory, err := histories.GetCurrentVersionHistory()
if err != nil {
return nil, "", "", err
return nil, branch, err
}
branchToken = versionHistory.GetBranchToken()
bt = versionHistory.GetBranchToken()
}
var branch shared.HistoryBranch
if err := decoder.Decode(branchToken, &branch); err != nil {
return nil, "", "", err

if err := decoder.Decode(bt, &branch); err != nil {
return nil, branch, err
}
return branchToken, branch.GetTreeID(), branch.GetBranchID(), nil

return bt, branch, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package iterator
package fetcher

import (
"testing"
Expand All @@ -35,8 +35,8 @@ import (
)

const (
treeID = "test-tree-id"
branchID = "test-branch-id"
testTreeID = "test-tree-id"
testBranchID = "test-branch-id"
)

var (
Expand Down Expand Up @@ -74,8 +74,8 @@ func (p *PersistenceSuite) TestGetBranchToken() {
},
expectError: false,
branchToken: validBranchToken,
treeID: treeID,
branchID: branchID,
treeID: testTreeID,
branchID: testBranchID,
},
{
entity: &persistence.ListConcreteExecutionsEntity{
Expand All @@ -96,8 +96,8 @@ func (p *PersistenceSuite) TestGetBranchToken() {
},
expectError: false,
branchToken: validBranchToken,
treeID: treeID,
branchID: branchID,
treeID: testTreeID,
branchID: testBranchID,
},
{
entity: &persistence.ListConcreteExecutionsEntity{
Expand Down Expand Up @@ -133,25 +133,25 @@ func (p *PersistenceSuite) TestGetBranchToken() {
}

for _, tc := range testCases {
branchToken, treeID, branchID, err := GetBranchToken(tc.entity, encoder)
branchToken, branch, err := getBranchToken(tc.entity.ExecutionInfo.BranchToken, tc.entity.VersionHistories, encoder)
if tc.expectError {
p.Error(err)
p.Nil(branchToken)
p.Empty(treeID)
p.Empty(branchID)
p.Empty(branch.GetTreeID())
p.Empty(branch.GetBranchID())
} else {
p.NoError(err)
p.Equal(tc.branchToken, branchToken)
p.Equal(tc.treeID, treeID)
p.Equal(tc.branchID, branchID)
p.Equal(tc.treeID, branch.GetTreeID())
p.Equal(tc.branchID, branch.GetBranchID())
}
}
}

func (p *PersistenceSuite) getValidBranchToken(encoder *codec.ThriftRWEncoder) []byte {
hb := &shared.HistoryBranch{
TreeID: common.StringPtr(treeID),
BranchID: common.StringPtr(branchID),
TreeID: common.StringPtr(testTreeID),
BranchID: common.StringPtr(testBranchID),
}
bytes, err := encoder.Encode(hb)
p.NoError(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,42 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package iterator
package fetcher

import (
"github.com/uber/cadence/common/pagination"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/entity"
)

// CurrentExecution is used to retrieve Concrete executions.
func CurrentExecution(retryer persistence.Retryer, pageSize int) pagination.Iterator {
// CurrentExecutionIterator is used to retrieve Concrete executions.
func CurrentExecutionIterator(retryer persistence.Retryer, pageSize int) pagination.Iterator {
return pagination.NewIterator(nil, getCurrentExecution(retryer, pageSize))
}

// CurrentExecution returns a single execution
func CurrentExecution(retryer persistence.Retryer, request ExecutionRequest) (entity.Entity, error) {
req := persistence.GetCurrentExecutionRequest{
DomainID: request.DomainID,
WorkflowID: request.WorkflowID,
}
e, err := retryer.GetCurrentExecution(&req)
if err != nil {
return nil, err
}

return &entity.CurrentExecution{
CurrentRunID: e.RunID,
Execution: entity.Execution{
ShardID: retryer.GetShardID(),
DomainID: request.DomainID,
WorkflowID: request.WorkflowID,
RunID: e.RunID,
State: e.State,
},
}, nil
}

func getCurrentExecution(
pr persistence.Retryer,
pageSize int,
Expand Down
30 changes: 30 additions & 0 deletions common/reconciliation/fetcher/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package fetcher

// ExecutionRequest is used to fetch execution from persistence
type ExecutionRequest struct {
DomainID string
WorkflowID string
RunID string
}
Loading

0 comments on commit 541a12a

Please sign in to comment.