Skip to content

Commit

Permalink
Switch common/archiver to internal types (cadence-workflow#3788)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Nov 24, 2020
1 parent 25ea280 commit 2de9816
Show file tree
Hide file tree
Showing 32 changed files with 1,973 additions and 466 deletions.
18 changes: 9 additions & 9 deletions common/archiver/archivalMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"fmt"
"strings"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/types"
)

type (
Expand All @@ -43,7 +43,7 @@ type (
ClusterConfiguredForArchival() bool
GetClusterStatus() ArchivalStatus
ReadEnabled() bool
GetDomainDefaultStatus() shared.ArchivalStatus
GetDomainDefaultStatus() types.ArchivalStatus
GetDomainDefaultURI() string
}

Expand All @@ -56,7 +56,7 @@ type (
staticClusterStatus ArchivalStatus
dynamicClusterStatus dynamicconfig.StringPropertyFn
enableRead dynamicconfig.BoolPropertyFn
domainDefaultStatus shared.ArchivalStatus
domainDefaultStatus types.ArchivalStatus
domainDefaultURI string
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func NewDisabledArchvialConfig() ArchivalConfig {
staticClusterStatus: ArchivalDisabled,
dynamicClusterStatus: nil,
enableRead: nil,
domainDefaultStatus: shared.ArchivalStatusDisabled,
domainDefaultStatus: types.ArchivalStatusDisabled,
domainDefaultURI: "",
}
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (a *archivalConfig) ReadEnabled() bool {
return a.enableRead()
}

func (a *archivalConfig) GetDomainDefaultStatus() shared.ArchivalStatus {
func (a *archivalConfig) GetDomainDefaultStatus() types.ArchivalStatus {
return a.domainDefaultStatus
}

Expand All @@ -201,13 +201,13 @@ func getClusterArchivalStatus(str string) (ArchivalStatus, error) {
return ArchivalDisabled, fmt.Errorf("invalid archival status of %v for cluster, valid status are: {\"\", \"disabled\", \"paused\", \"enabled\"}", str)
}

func getDomainArchivalStatus(str string) (shared.ArchivalStatus, error) {
func getDomainArchivalStatus(str string) (types.ArchivalStatus, error) {
str = strings.TrimSpace(strings.ToLower(str))
switch str {
case "", common.ArchivalDisabled:
return shared.ArchivalStatusDisabled, nil
return types.ArchivalStatusDisabled, nil
case common.ArchivalEnabled:
return shared.ArchivalStatusEnabled, nil
return types.ArchivalStatusEnabled, nil
}
return shared.ArchivalStatusDisabled, fmt.Errorf("invalid archival status of %v for domain, valid status are: {\"\", \"disabled\", \"enabled\"}", str)
return types.ArchivalStatusDisabled, fmt.Errorf("invalid archival status of %v for domain, valid status are: {\"\", \"disabled\", \"enabled\"}", str)
}
3 changes: 1 addition & 2 deletions common/archiver/filestore/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"path"
"strconv"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/backoff"
Expand Down Expand Up @@ -142,7 +141,7 @@ func (h *historyArchiver) Archive(
historyIterator = archiver.NewHistoryIterator(ctx, request, h.container.HistoryV2Manager, targetHistoryBlobSize)
}

historyBatches := []*shared.History{}
historyBatches := []*types.History{}
for historyIterator.HasNext() {
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
Expand Down
73 changes: 36 additions & 37 deletions common/archiver/filestore/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/log/loggerimpl"
Expand Down Expand Up @@ -67,8 +66,8 @@ type historyArchiverSuite struct {
container *archiver.HistoryBootstrapContainer
testArchivalURI archiver.URI
testGetDirectory string
historyBatchesV1 []*shared.History
historyBatchesV100 []*shared.History
historyBatchesV1 []*types.History
historyBatchesV100 []*types.History
}

func TestHistoryArchiverSuite(t *testing.T) {
Expand Down Expand Up @@ -205,11 +204,11 @@ func (s *historyArchiverSuite) TestArchive_Fail_HistoryMutated() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
historyIterator := archiver.NewMockHistoryIterator(mockCtrl)
historyBatches := []*shared.History{
&shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(common.FirstEventID + 1),
historyBatches := []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.Int64Ptr(common.FirstEventID + 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion + 1),
},
Expand Down Expand Up @@ -269,25 +268,25 @@ func (s *historyArchiverSuite) TestArchive_Success() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
historyIterator := archiver.NewMockHistoryIterator(mockCtrl)
historyBatches := []*shared.History{
&shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(common.FirstEventID + 1),
historyBatches := []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.Int64Ptr(common.FirstEventID + 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion),
},
&shared.HistoryEvent{
EventId: common.Int64Ptr(common.FirstEventID + 2),
{
EventID: common.Int64Ptr(common.FirstEventID + 2),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion),
},
},
},
&shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(testNextEventID - 1),
{
Events: []*types.HistoryEvent{
{
EventID: common.Int64Ptr(testNextEventID - 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion),
},
Expand Down Expand Up @@ -448,7 +447,7 @@ func (s *historyArchiverSuite) TestGet_Success_SmallPageSize() {
PageSize: 1,
CloseFailoverVersion: common.Int64Ptr(100),
}
combinedHistory := []*shared.History{}
combinedHistory := []*types.History{}

URI, err := archiver.NewURI("file://" + s.testGetDirectory)
s.NoError(err)
Expand Down Expand Up @@ -534,37 +533,37 @@ func (s *historyArchiverSuite) newTestHistoryArchiver(historyIterator archiver.H
}

func (s *historyArchiverSuite) setupHistoryDirectory() {
s.historyBatchesV1 = []*shared.History{
&shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(testNextEventID - 1),
s.historyBatchesV1 = []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.Int64Ptr(testNextEventID - 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(1),
},
},
},
}

s.historyBatchesV100 = []*shared.History{
&shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(common.FirstEventID + 1),
s.historyBatchesV100 = []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.Int64Ptr(common.FirstEventID + 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion),
},
&shared.HistoryEvent{
EventId: common.Int64Ptr(common.FirstEventID + 1),
{
EventID: common.Int64Ptr(common.FirstEventID + 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion),
},
},
},
&shared.History{
Events: []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(testNextEventID - 1),
{
Events: []*types.HistoryEvent{
{
EventID: common.Int64Ptr(testNextEventID - 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: common.Int64Ptr(testCloseFailoverVersion),
},
Expand All @@ -576,7 +575,7 @@ func (s *historyArchiverSuite) setupHistoryDirectory() {
s.writeHistoryBatchesForGetTest(s.historyBatchesV100, testCloseFailoverVersion)
}

func (s *historyArchiverSuite) writeHistoryBatchesForGetTest(historyBatches []*shared.History, version int64) {
func (s *historyArchiverSuite) writeHistoryBatchesForGetTest(historyBatches []*types.History, version int64) {
data, err := encode(historyBatches)
s.Require().NoError(err)
filename := constructHistoryFilename(testDomainID, testWorkflowID, testRunID, version)
Expand Down
30 changes: 15 additions & 15 deletions common/archiver/filestore/queryParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (

"github.com/xwb1989/sqlparser"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
)

type (
Expand All @@ -49,7 +49,7 @@ type (
workflowID *string
runID *string
workflowTypeName *string
closeStatus *shared.WorkflowExecutionCloseStatus
closeStatus *types.WorkflowExecutionCloseStatus
emptyResult bool
}
)
Expand Down Expand Up @@ -241,21 +241,21 @@ func convertToTimestamp(timeStr string) (int64, error) {
return parsedTime.UnixNano(), nil
}

func convertStatusStr(statusStr string) (shared.WorkflowExecutionCloseStatus, error) {
func convertStatusStr(statusStr string) (types.WorkflowExecutionCloseStatus, error) {
statusStr = strings.ToLower(strings.TrimSpace(statusStr))
switch statusStr {
case "completed", strconv.Itoa(int(shared.WorkflowExecutionCloseStatusCompleted)):
return shared.WorkflowExecutionCloseStatusCompleted, nil
case "failed", strconv.Itoa(int(shared.WorkflowExecutionCloseStatusFailed)):
return shared.WorkflowExecutionCloseStatusFailed, nil
case "canceled", strconv.Itoa(int(shared.WorkflowExecutionCloseStatusCanceled)):
return shared.WorkflowExecutionCloseStatusCanceled, nil
case "terminated", strconv.Itoa(int(shared.WorkflowExecutionCloseStatusTerminated)):
return shared.WorkflowExecutionCloseStatusTerminated, nil
case "continuedasnew", "continued_as_new", strconv.Itoa(int(shared.WorkflowExecutionCloseStatusContinuedAsNew)):
return shared.WorkflowExecutionCloseStatusContinuedAsNew, nil
case "timedout", "timed_out", strconv.Itoa(int(shared.WorkflowExecutionCloseStatusTimedOut)):
return shared.WorkflowExecutionCloseStatusTimedOut, nil
case "completed", strconv.Itoa(int(types.WorkflowExecutionCloseStatusCompleted)):
return types.WorkflowExecutionCloseStatusCompleted, nil
case "failed", strconv.Itoa(int(types.WorkflowExecutionCloseStatusFailed)):
return types.WorkflowExecutionCloseStatusFailed, nil
case "canceled", strconv.Itoa(int(types.WorkflowExecutionCloseStatusCanceled)):
return types.WorkflowExecutionCloseStatusCanceled, nil
case "terminated", strconv.Itoa(int(types.WorkflowExecutionCloseStatusTerminated)):
return types.WorkflowExecutionCloseStatusTerminated, nil
case "continuedasnew", "continued_as_new", strconv.Itoa(int(types.WorkflowExecutionCloseStatusContinuedAsNew)):
return types.WorkflowExecutionCloseStatusContinuedAsNew, nil
case "timedout", "timed_out", strconv.Itoa(int(types.WorkflowExecutionCloseStatusTimedOut)):
return types.WorkflowExecutionCloseStatusTimedOut, nil
default:
return 0, fmt.Errorf("unknown workflow close status: %s", statusStr)
}
Expand Down
14 changes: 7 additions & 7 deletions common/archiver/filestore/queryParser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types"
)

type queryParserSuite struct {
Expand Down Expand Up @@ -151,28 +151,28 @@ func (s *queryParserSuite) TestParseCloseStatus() {
query: "CloseStatus = \"Completed\"",
expectErr: false,
parsedQuery: &parsedQuery{
closeStatus: shared.WorkflowExecutionCloseStatusCompleted.Ptr(),
closeStatus: types.WorkflowExecutionCloseStatusCompleted.Ptr(),
},
},
{
query: "CloseStatus = 'continuedasnew'",
expectErr: false,
parsedQuery: &parsedQuery{
closeStatus: shared.WorkflowExecutionCloseStatusContinuedAsNew.Ptr(),
closeStatus: types.WorkflowExecutionCloseStatusContinuedAsNew.Ptr(),
},
},
{
query: "CloseStatus = 'TIMED_OUT'",
expectErr: false,
parsedQuery: &parsedQuery{
closeStatus: shared.WorkflowExecutionCloseStatusTimedOut.Ptr(),
closeStatus: types.WorkflowExecutionCloseStatusTimedOut.Ptr(),
},
},
{
query: "CloseStatus = 'Failed' and CloseStatus = \"Failed\"",
expectErr: false,
parsedQuery: &parsedQuery{
closeStatus: shared.WorkflowExecutionCloseStatusFailed.Ptr(),
closeStatus: types.WorkflowExecutionCloseStatusFailed.Ptr(),
},
},
{
Expand Down Expand Up @@ -202,7 +202,7 @@ func (s *queryParserSuite) TestParseCloseStatus() {
query: "CloseStatus = 1",
expectErr: false,
parsedQuery: &parsedQuery{
closeStatus: shared.WorkflowExecutionCloseStatusFailed.Ptr(),
closeStatus: types.WorkflowExecutionCloseStatusFailed.Ptr(),
},
},
{
Expand Down Expand Up @@ -314,7 +314,7 @@ func (s *queryParserSuite) TestParse() {
earliestCloseTime: 2000,
latestCloseTime: 9999,
runID: common.StringPtr("random runID"),
closeStatus: shared.WorkflowExecutionCloseStatusFailed.Ptr(),
closeStatus: types.WorkflowExecutionCloseStatusFailed.Ptr(),
},
},
{
Expand Down
10 changes: 5 additions & 5 deletions common/archiver/filestore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (

"github.com/dgryski/go-farm"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/util"
)

Expand All @@ -46,8 +46,8 @@ func encode(v interface{}) ([]byte, error) {
return json.Marshal(v)
}

func decodeHistoryBatches(data []byte) ([]*shared.History, error) {
historyBatches := []*shared.History{}
func decodeHistoryBatches(data []byte) ([]*types.History, error) {
historyBatches := []*types.History{}
err := json.Unmarshal(data, &historyBatches)
if err != nil {
return nil, err
Expand Down Expand Up @@ -133,7 +133,7 @@ func extractCloseFailoverVersion(filename string) (int64, error) {
return strconv.ParseInt(filenameParts[1], 10, 64)
}

func historyMutated(request *archiver.ArchiveHistoryRequest, historyBatches []*shared.History, isLast bool) bool {
func historyMutated(request *archiver.ArchiveHistoryRequest, historyBatches []*types.History, isLast bool) bool {
lastBatch := historyBatches[len(historyBatches)-1].Events
lastEvent := lastBatch[len(lastBatch)-1]
lastFailoverVersion := lastEvent.GetVersion()
Expand All @@ -144,7 +144,7 @@ func historyMutated(request *archiver.ArchiveHistoryRequest, historyBatches []*s
if !isLast {
return false
}
lastEventID := lastEvent.GetEventId()
lastEventID := lastEvent.GetEventID()
return lastFailoverVersion != request.CloseFailoverVersion || lastEventID+1 != request.NextEventID
}

Expand Down
Loading

0 comments on commit 2de9816

Please sign in to comment.