Skip to content

Commit

Permalink
Remove unused functions from TaskAckManager (cadence-workflow#4872)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Jun 21, 2022
1 parent dc5230f commit 608bcb5
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 236 deletions.
98 changes: 0 additions & 98 deletions service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/types"
exec "github.com/uber/cadence/service/history/execution"
Expand All @@ -51,7 +49,6 @@ import (
var (
errUnknownQueueTask = errors.New("unknown task type")
errUnknownReplicationTask = errors.New("unknown replication task")
defaultHistoryPageSize = 1000
minReadTaskSize = 20
)

Expand Down Expand Up @@ -368,33 +365,6 @@ func (t *taskAckManagerImpl) getEventsBlob(
return eventBatchBlobs[0].ToInternal(), nil
}

func (t *taskAckManagerImpl) isNewRunNDCEnabled(
ctx context.Context,
domainID string,
workflowID string,
runID string,
) (isNDCWorkflow bool, retError error) {

context, release, err := t.executionCache.GetOrCreateWorkflowExecution(
ctx,
domainID,
types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
)
if err != nil {
return false, err
}
defer func() { release(retError) }()

mutableState, err := context.LoadWorkflowExecution(ctx)
if err != nil {
return false, err
}
return mutableState.GetVersionHistories() != nil, nil
}

func (t *taskAckManagerImpl) readTasksWithBatchSize(
ctx context.Context,
readLevel int64,
Expand Down Expand Up @@ -422,74 +392,6 @@ func (t *taskAckManagerImpl) readTasksWithBatchSize(
return tasks, len(response.NextPageToken) != 0, nil
}

func (t *taskAckManagerImpl) getAllHistory(
ctx context.Context,
firstEventID int64,
nextEventID int64,
branchToken []byte,
) (*types.History, error) {

// overall result
shardID := t.shard.GetShardID()
var historyEvents []*types.HistoryEvent
historySize := 0
iterator := collection.NewPagingIterator(
t.getPaginationFunc(
ctx,
firstEventID,
nextEventID,
branchToken,
shardID,
&historySize,
),
)
for iterator.HasNext() {
event, err := iterator.Next()
if err != nil {
return nil, err
}
historyEvents = append(historyEvents, event.(*types.HistoryEvent))
}
t.metricsClient.RecordTimer(metrics.ReplicatorQueueProcessorScope, metrics.HistorySize, time.Duration(historySize))
history := &types.History{
Events: historyEvents,
}
return history, nil
}

func (t *taskAckManagerImpl) getPaginationFunc(
ctx context.Context,
firstEventID int64,
nextEventID int64,
branchToken []byte,
shardID int,
historySize *int,
) collection.PaginationFn {

return func(paginationToken []byte) ([]interface{}, []byte, error) {
events, _, pageToken, pageHistorySize, err := persistenceutils.PaginateHistory(
ctx,
t.historyManager,
false,
branchToken,
firstEventID,
nextEventID,
paginationToken,
defaultHistoryPageSize,
common.IntPtr(shardID),
)
if err != nil {
return nil, nil, err
}
*historySize += pageHistorySize
var paginateItems []interface{}
for _, event := range events {
paginateItems = append(paginateItems, event)
}
return paginateItems, pageToken, nil
}
}

func (t *taskAckManagerImpl) generateFailoverMarkerTask(
taskInfo *persistence.ReplicationTaskInfo,
) *types.ReplicationTask {
Expand Down
138 changes: 0 additions & 138 deletions service/history/replication/task_ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,64 +114,6 @@ func (s *taskAckManagerSuite) TearDownTest() {
s.mockShard.Finish(s.T())
}

func (s *taskAckManagerSuite) TestGetPaginationFunc() {
firstEventID := int64(0)
nextEventID := int64(1)
var branchToken []byte
shardID := 0
historyCount := 0
pagingFunc := s.ackManager.getPaginationFunc(context.Background(), firstEventID, nextEventID, branchToken, shardID, &historyCount)

pageToken := []byte{1}
event := &types.HistoryEvent{
ID: 1,
}
s.mockHistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*types.HistoryEvent{event},
NextPageToken: pageToken,
Size: 1,
LastFirstEventID: 1,
}, nil)
events, token, err := pagingFunc(nil)
s.NoError(err)
s.Equal(pageToken, token)
s.Len(events, 1)
s.Equal(events[0].(*types.HistoryEvent), event)
s.Equal(historyCount, 1)
}

func (s *taskAckManagerSuite) TestGetAllHistory_OK() {
firstEventID := int64(0)
nextEventID := int64(1)
var branchToken []byte
event := &types.HistoryEvent{
ID: 1,
}

s.mockHistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*types.HistoryEvent{event},
NextPageToken: nil,
Size: 1,
LastFirstEventID: 1,
}, nil)

history, err := s.ackManager.getAllHistory(context.Background(), firstEventID, nextEventID, branchToken)
s.NoError(err)
s.Len(history.GetEvents(), 1)
s.Equal(event, history.GetEvents()[0])
}

func (s *taskAckManagerSuite) TestGetAllHistory_Error() {
firstEventID := int64(0)
nextEventID := int64(1)
var branchToken []byte
s.mockHistoryMgr.On("ReadHistoryBranch", mock.Anything, mock.Anything).Return(nil, errors.New("test"))

history, err := s.ackManager.getAllHistory(context.Background(), firstEventID, nextEventID, branchToken)
s.Error(err)
s.Nil(history)
}

func (s *taskAckManagerSuite) TestReadTasksWithBatchSize_OK() {
task := &persistence.ReplicationTaskInfo{
DomainID: uuid.New(),
Expand All @@ -197,86 +139,6 @@ func (s *taskAckManagerSuite) TestReadTasksWithBatchSize_Error() {
s.Len(taskInfo, 0)
}

func (s *taskAckManagerSuite) TestIsNewRunNDCEnabled_True() {
domainID := uuid.New()
workflowID := uuid.New()
runID := uuid.New()
workflowContext, release, _ := s.ackManager.executionCache.GetOrCreateWorkflowExecutionForBackground(
domainID,
types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
)
workflowContext.SetWorkflowExecution(s.mockMutableState)
release(nil)

s.mockMutableState.EXPECT().StartTransaction(gomock.Any(), gomock.Any()).Return(false, nil).Times(1)
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetVersionHistories().Return(&persistence.VersionHistories{})
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: "domainName"},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
1,
), nil).AnyTimes()

isNDC, err := s.ackManager.isNewRunNDCEnabled(
context.Background(),
domainID,
workflowID,
runID,
)
s.NoError(err)
s.True(isNDC)
}

func (s *taskAckManagerSuite) TestIsNewRunNDCEnabled_False() {
domainID := uuid.New()
workflowID := uuid.New()
runID := uuid.New()
workflowContext, release, _ := s.ackManager.executionCache.GetOrCreateWorkflowExecutionForBackground(
domainID,
types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
)
workflowContext.SetWorkflowExecution(s.mockMutableState)
release(nil)

s.mockMutableState.EXPECT().StartTransaction(gomock.Any(), gomock.Any()).Return(false, nil).Times(1)
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes()
s.mockMutableState.EXPECT().GetVersionHistories().Return(nil)
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: "domainName"},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
1,
), nil).AnyTimes()

isNDC, err := s.ackManager.isNewRunNDCEnabled(
context.Background(),
domainID,
workflowID,
runID,
)
s.NoError(err)
s.False(isNDC)
}

func (s *taskAckManagerSuite) TestGetVersionHistoryItems_Error() {
_, _, err := getVersionHistoryItems(nil, 0, 0)
s.Error(err)
Expand Down

0 comments on commit 608bcb5

Please sign in to comment.