Skip to content

Commit

Permalink
Switch history related types to internal (cadence-workflow#3835)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Dec 9, 2020
1 parent df3c714 commit 54559f4
Show file tree
Hide file tree
Showing 69 changed files with 2,628 additions and 2,624 deletions.
10 changes: 1 addition & 9 deletions common/archiver/historyIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)

const (
Expand Down Expand Up @@ -237,14 +236,7 @@ func (i *historyIterator) readHistory(ctx context.Context, firstEventID int64) (
PageSize: i.historyPageSize,
ShardID: common.IntPtr(i.request.ShardID),
}
thriftHistoryBatches, _, _, err := persistence.ReadFullPageV2EventsByBatch(ctx, i.historyV2Manager, req)
if thriftHistoryBatches == nil {
return nil, err
}
historyBatches := make([]*types.History, len(thriftHistoryBatches))
for i := range historyBatches {
historyBatches[i] = thrift.ToHistory(thriftHistoryBatches[i])
}
historyBatches, _, _, err := persistence.ReadFullPageV2EventsByBatch(ctx, i.historyV2Manager, req)
return historyBatches, err

}
Expand Down
15 changes: 7 additions & 8 deletions common/archiver/historyIterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -102,7 +101,7 @@ func (s *HistoryIteratorSuite) TestReadHistory_Failed_EventsV2() {
func (s *HistoryIteratorSuite) TestReadHistory_Success_EventsV2() {
mockHistoryV2Manager := &mocks.HistoryV2Manager{}
resp := persistence.ReadHistoryBranchByBatchResponse{
History: []*shared.History{},
History: []*types.History{},
NextPageToken: []byte{},
}
mockHistoryV2Manager.On("ReadHistoryBranchByBatch", mock.Anything, mock.Anything).Return(&resp, nil)
Expand Down Expand Up @@ -659,14 +658,14 @@ func (s *HistoryIteratorSuite) assertStateMatches(expected historyIteratorState,
s.Equal(expected.FinishedIteration, itr.FinishedIteration)
}

func (s *HistoryIteratorSuite) constructHistoryBatches(batchInfo []int, page page, firstEventID int64) []*shared.History {
batches := []*shared.History{}
func (s *HistoryIteratorSuite) constructHistoryBatches(batchInfo []int, page page, firstEventID int64) []*types.History {
batches := []*types.History{}
eventsID := firstEventID
for batchIdx, numEvents := range batchInfo[page.firstbatchIdx : page.firstbatchIdx+page.numBatches] {
events := []*shared.HistoryEvent{}
events := []*types.HistoryEvent{}
for i := 0; i < numEvents; i++ {
event := &shared.HistoryEvent{
EventId: common.Int64Ptr(eventsID),
event := &types.HistoryEvent{
EventID: common.Int64Ptr(eventsID),
Version: common.Int64Ptr(page.firstEventFailoverVersion),
}
eventsID++
Expand All @@ -675,7 +674,7 @@ func (s *HistoryIteratorSuite) constructHistoryBatches(batchInfo []int, page pag
}
events = append(events, event)
}
batches = append(batches, &shared.History{
batches = append(batches, &types.History{
Events: events,
})
}
Expand Down
14 changes: 7 additions & 7 deletions common/ndc/history_resender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,18 @@ func (s *historyResenderSuite) TestSendSingleWorkflowHistory() {
startEventVersion := int64(100)
token := []byte{1}
pageSize := defaultPageSize
eventBatch := []*shared.HistoryEvent{
eventBatch := []*types.HistoryEvent{
{
EventId: common.Int64Ptr(2),
EventID: common.Int64Ptr(2),
Version: common.Int64Ptr(123),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
EventType: shared.EventTypeDecisionTaskScheduled.Ptr(),
EventType: types.EventTypeDecisionTaskScheduled.Ptr(),
},
{
EventId: common.Int64Ptr(3),
EventID: common.Int64Ptr(3),
Version: common.Int64Ptr(123),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
EventType: shared.EventTypeDecisionTaskStarted.Ptr(),
EventType: types.EventTypeDecisionTaskStarted.Ptr(),
},
}
blob := s.serializeEvents(eventBatch)
Expand Down Expand Up @@ -401,8 +401,8 @@ func (s *historyResenderSuite) TestCurrentExecutionCheck() {
s.True(skipTask)
}

func (s *historyResenderSuite) serializeEvents(events []*shared.HistoryEvent) *shared.DataBlob {
blob, err := s.serializer.SerializeBatchEvents(thrift.ToHistoryEventArray(events), common.EncodingTypeThriftRW)
func (s *historyResenderSuite) serializeEvents(events []*types.HistoryEvent) *shared.DataBlob {
blob, err := s.serializer.SerializeBatchEvents(events, common.EncodingTypeThriftRW)
s.Nil(err)
return &shared.DataBlob{
EncodingType: shared.EncodingTypeThriftRW.Ptr(),
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ type (
ParentRunID string
InitiatedID int64
CompletionEventBatchID int64
CompletionEvent *workflow.HistoryEvent
CompletionEvent *types.HistoryEvent
TaskList string
WorkflowTypeName string
WorkflowTimeout int32
Expand Down Expand Up @@ -657,7 +657,7 @@ type (
SignalRequestedIDs map[string]struct{}
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
BufferedEvents []*workflow.HistoryEvent
BufferedEvents []*types.HistoryEvent
VersionHistories *VersionHistories
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
Checksum checksum.Checksum
Expand All @@ -668,10 +668,10 @@ type (
Version int64
ScheduleID int64
ScheduledEventBatchID int64
ScheduledEvent *workflow.HistoryEvent
ScheduledEvent *types.HistoryEvent
ScheduledTime time.Time
StartedID int64
StartedEvent *workflow.HistoryEvent
StartedEvent *types.HistoryEvent
StartedTime time.Time
DomainID string
ActivityID string
Expand Down Expand Up @@ -717,15 +717,15 @@ type (
Version int64
InitiatedID int64
InitiatedEventBatchID int64
InitiatedEvent *workflow.HistoryEvent
InitiatedEvent *types.HistoryEvent
StartedID int64
StartedWorkflowID string
StartedRunID string
StartedEvent *workflow.HistoryEvent
StartedEvent *types.HistoryEvent
CreateRequestID string
DomainName string
WorkflowTypeName string
ParentClosePolicy workflow.ParentClosePolicy
ParentClosePolicy types.ParentClosePolicy
}

// RequestCancelInfo has details for pending external workflow cancellations
Expand Down Expand Up @@ -911,7 +911,7 @@ type (
WorkflowID string
RunID string
BranchToken []byte
Events []*workflow.HistoryEvent
Events []*types.HistoryEvent
}

// WorkflowMutation is used as generic workflow execution state mutation
Expand All @@ -932,7 +932,7 @@ type (
DeleteSignalInfos []int64
UpsertSignalRequestedIDs []string
DeleteSignalRequestedIDs []string
NewBufferedEvents []*workflow.HistoryEvent
NewBufferedEvents []*types.HistoryEvent
ClearBufferedEvents bool

TransferTasks []Task
Expand Down Expand Up @@ -1357,7 +1357,7 @@ type (
// The branch to be appended
BranchToken []byte
// The batch of events to be appended. The first eventID will become the nodeID of this batch
Events []*workflow.HistoryEvent
Events []*types.HistoryEvent
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
// optional binary encoding type
Expand Down Expand Up @@ -1392,7 +1392,7 @@ type (
// ReadHistoryBranchResponse is the response to ReadHistoryBranchRequest
ReadHistoryBranchResponse struct {
// History events
HistoryEvents []*workflow.HistoryEvent
HistoryEvents []*types.HistoryEvent
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page, not need to continue
Expand All @@ -1406,7 +1406,7 @@ type (
// ReadHistoryBranchByBatchResponse is the response to ReadHistoryBranchRequest
ReadHistoryBranchByBatchResponse struct {
// History events by batch
History []*workflow.History
History []*types.History
// Token to read next page if there are more events beyond page size.
// Use this to set NextPageToken on ReadHistoryBranchRequest to read the next page.
// Empty means we have reached the last page, not need to continue
Expand Down
31 changes: 15 additions & 16 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"context"
"time"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -132,7 +131,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
}

newInfo := &WorkflowExecutionInfo{
CompletionEvent: thrift.FromHistoryEvent(completionEvent),
CompletionEvent: completionEvent,

DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
Expand Down Expand Up @@ -196,7 +195,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(

func (m *executionManagerImpl) DeserializeBufferedEvents(
blobs []*DataBlob,
) ([]*workflow.HistoryEvent, error) {
) ([]*types.HistoryEvent, error) {

events := make([]*types.HistoryEvent, 0)
for _, b := range blobs {
Expand All @@ -206,7 +205,7 @@ func (m *executionManagerImpl) DeserializeBufferedEvents(
}
events = append(events, history...)
}
return thrift.FromHistoryEventArray(events), nil
return events, nil
}

func (m *executionManagerImpl) DeserializeChildExecutionInfos(
Expand All @@ -224,8 +223,8 @@ func (m *executionManagerImpl) DeserializeChildExecutionInfos(
return nil, err
}
c := &ChildExecutionInfo{
InitiatedEvent: thrift.FromHistoryEvent(initiatedEvent),
StartedEvent: thrift.FromHistoryEvent(startedEvent),
InitiatedEvent: initiatedEvent,
StartedEvent: startedEvent,

Version: v.Version,
InitiatedID: v.InitiatedID,
Expand All @@ -236,7 +235,7 @@ func (m *executionManagerImpl) DeserializeChildExecutionInfos(
CreateRequestID: v.CreateRequestID,
DomainName: v.DomainName,
WorkflowTypeName: v.WorkflowTypeName,
ParentClosePolicy: *thrift.FromParentClosePolicy(&v.ParentClosePolicy),
ParentClosePolicy: v.ParentClosePolicy,
}

// Needed for backward compatibility reason.
Expand Down Expand Up @@ -270,8 +269,8 @@ func (m *executionManagerImpl) DeserializeActivityInfos(
return nil, err
}
a := &ActivityInfo{
ScheduledEvent: thrift.FromHistoryEvent(scheduledEvent),
StartedEvent: thrift.FromHistoryEvent(startedEvent),
ScheduledEvent: scheduledEvent,
StartedEvent: startedEvent,

Version: v.Version,
ScheduleID: v.ScheduleID,
Expand Down Expand Up @@ -348,11 +347,11 @@ func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(

newInfos := make([]*InternalChildExecutionInfo, 0)
for _, v := range infos {
initiatedEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.InitiatedEvent), encoding)
initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
if err != nil {
return nil, err
}
startedEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.StartedEvent), encoding)
startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
if err != nil {
return nil, err
}
Expand All @@ -369,7 +368,7 @@ func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
StartedRunID: v.StartedRunID,
DomainName: v.DomainName,
WorkflowTypeName: v.WorkflowTypeName,
ParentClosePolicy: *thrift.ToParentClosePolicy(&v.ParentClosePolicy),
ParentClosePolicy: v.ParentClosePolicy,
}
newInfos = append(newInfos, i)
}
Expand All @@ -383,11 +382,11 @@ func (m *executionManagerImpl) SerializeUpsertActivityInfos(

newInfos := make([]*InternalActivityInfo, 0)
for _, v := range infos {
scheduledEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.ScheduledEvent), encoding)
scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
if err != nil {
return nil, err
}
startedEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(v.StartedEvent), encoding)
startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -441,7 +440,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
if info == nil {
return &InternalWorkflowExecutionInfo{}, nil
}
completionEvent, err := m.serializer.SerializeEvent(thrift.ToHistoryEvent(info.CompletionEvent), encoding)
completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -636,7 +635,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
}
var serializedNewBufferedEvents *DataBlob
if input.NewBufferedEvents != nil {
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(thrift.ToHistoryEventArray(input.NewBufferedEvents), encoding)
serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 54559f4

Please sign in to comment.