
Commit

Switch shard related types to internal (cadence-workflow#3768)
vytautas-karpavicius authored Nov 19, 2020
1 parent 25281df commit 32b167b
Showing 17 changed files with 169 additions and 174 deletions.
39 changes: 18 additions & 21 deletions common/persistence/dataInterfaces.go
@@ -27,9 +27,6 @@ import (
"strings"
"time"

"github.com/uber/cadence/.gen/go/history"
"github.com/uber/cadence/.gen/go/replicator"

"github.com/pborman/uuid"

workflow "github.com/uber/cadence/.gen/go/shared"
@@ -248,24 +245,24 @@ type (

// ShardInfo describes a shard
ShardInfo struct {
- ShardID int `json:"shard_id"`
- Owner string `json:"owner"`
- RangeID int64 `json:"range_id"`
- StolenSinceRenew int `json:"stolen_since_renew"`
- UpdatedAt time.Time `json:"updated_at"`
- ReplicationAckLevel int64 `json:"replication_ack_level"`
- ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
- TransferAckLevel int64 `json:"transfer_ack_level"`
- TimerAckLevel time.Time `json:"timer_ack_level"`
- ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
- ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
- TransferProcessingQueueStates *history.ProcessingQueueStates `json:"transfer_processing_queue_states"`
- TimerProcessingQueueStates *history.ProcessingQueueStates `json:"timer_processing_queue_states"`
- TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
- TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel
- ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
- DomainNotificationVersion int64 `json:"domain_notification_version"`
- PendingFailoverMarkers []*replicator.FailoverMarkerAttributes `json:"pending_failover_markers"`
+ ShardID int `json:"shard_id"`
+ Owner string `json:"owner"`
+ RangeID int64 `json:"range_id"`
+ StolenSinceRenew int `json:"stolen_since_renew"`
+ UpdatedAt time.Time `json:"updated_at"`
+ ReplicationAckLevel int64 `json:"replication_ack_level"`
+ ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
+ TransferAckLevel int64 `json:"transfer_ack_level"`
+ TimerAckLevel time.Time `json:"timer_ack_level"`
+ ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
+ ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
+ TransferProcessingQueueStates *types.ProcessingQueueStates `json:"transfer_processing_queue_states"`
+ TimerProcessingQueueStates *types.ProcessingQueueStates `json:"timer_processing_queue_states"`
+ TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
+ TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel
+ ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
+ DomainNotificationVersion int64 `json:"domain_notification_version"`
+ PendingFailoverMarkers []*types.FailoverMarkerAttributes `json:"pending_failover_markers"`
}

// TransferFailoverLevel contains corresponding start / end level
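For orientation, a minimal, hypothetical sketch (not part of this commit) of constructing a ShardInfo with the internal types the struct now references; the cluster name and values are made up.

package main

import (
	"time"

	"github.com/uber/cadence/common"
	p "github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
)

func main() {
	// Queue states come from the internal types package rather than .gen/go/history.
	pqs := &types.ProcessingQueueStates{
		StatesByCluster: map[string][]*types.ProcessingQueueState{
			"cluster-active": {
				&types.ProcessingQueueState{
					Level:        common.Int32Ptr(0),
					AckLevel:     common.Int64Ptr(0),
					MaxLevel:     common.Int64Ptr(0),
					DomainFilter: &types.DomainFilter{ReverseMatch: common.BoolPtr(true)},
				},
			},
		},
	}

	// ShardInfo now references internal types directly; no thrift-generated imports are needed here.
	shard := &p.ShardInfo{
		ShardID:                       0,
		Owner:                         "history-host-1",
		RangeID:                       1,
		UpdatedAt:                     time.Now(),
		TransferProcessingQueueStates: pqs,
		TimerProcessingQueueStates:    pqs,
		PendingFailoverMarkers:        []*types.FailoverMarkerAttributes{},
	}
	_ = shard
}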
26 changes: 13 additions & 13 deletions common/persistence/persistence-tests/executionManagerTest.go
@@ -37,12 +37,12 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/.gen/go/history"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/cluster"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

type (
@@ -5179,54 +5179,54 @@ func timeComparator(t1, t2 time.Time, timeTolerance time.Duration) bool {
return false
}

- func createTransferPQS(cluster1 string, level1 int32, ackLevel1 int64, cluster2 string, level2 int32, ackLevel2 int64) history.ProcessingQueueStates {
- domainFilter := &history.DomainFilter{
+ func createTransferPQS(cluster1 string, level1 int32, ackLevel1 int64, cluster2 string, level2 int32, ackLevel2 int64) types.ProcessingQueueStates {
+ domainFilter := &types.DomainFilter{
DomainIDs: nil,
ReverseMatch: common.BoolPtr(true),
}
- processingQueueStateMap := map[string][]*history.ProcessingQueueState{
+ processingQueueStateMap := map[string][]*types.ProcessingQueueState{
cluster1: {
- &history.ProcessingQueueState{
+ &types.ProcessingQueueState{
Level: common.Int32Ptr(level1),
AckLevel: common.Int64Ptr(ackLevel1),
MaxLevel: common.Int64Ptr(ackLevel1),
DomainFilter: domainFilter,
},
},
cluster2: {
- &history.ProcessingQueueState{
+ &types.ProcessingQueueState{
Level: common.Int32Ptr(level2),
AckLevel: common.Int64Ptr(ackLevel2),
MaxLevel: common.Int64Ptr(ackLevel2),
DomainFilter: domainFilter,
},
},
}
- return history.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
+ return types.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
}

- func createTimerPQS(cluster1 string, level1 int32, ackLevel1 time.Time, cluster2 string, level2 int32, ackLevel2 time.Time) history.ProcessingQueueStates {
- domainFilter := &history.DomainFilter{
+ func createTimerPQS(cluster1 string, level1 int32, ackLevel1 time.Time, cluster2 string, level2 int32, ackLevel2 time.Time) types.ProcessingQueueStates {
+ domainFilter := &types.DomainFilter{
DomainIDs: []string{},
ReverseMatch: common.BoolPtr(true),
}
- processingQueueStateMap := map[string][]*history.ProcessingQueueState{
+ processingQueueStateMap := map[string][]*types.ProcessingQueueState{
cluster1: {
- &history.ProcessingQueueState{
+ &types.ProcessingQueueState{
Level: common.Int32Ptr(level1),
AckLevel: common.Int64Ptr(ackLevel1.UnixNano()),
MaxLevel: common.Int64Ptr(ackLevel1.UnixNano()),
DomainFilter: domainFilter,
},
},
cluster2: {
- &history.ProcessingQueueState{
+ &types.ProcessingQueueState{
Level: common.Int32Ptr(level2),
AckLevel: common.Int64Ptr(ackLevel2.UnixNano()),
MaxLevel: common.Int64Ptr(ackLevel2.UnixNano()),
DomainFilter: domainFilter,
},
},
}
- return history.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
+ return types.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
}
15 changes: 7 additions & 8 deletions common/persistence/persistence-tests/persistenceTestBase.go
@@ -32,7 +32,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/.gen/go/history"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
@@ -216,32 +215,32 @@ func (s *TestBase) Setup() {
s.ReadLevel = 0
s.ReplicationReadLevel = 0

- domainFilter := &history.DomainFilter{
+ domainFilter := &types.DomainFilter{
DomainIDs: []string{},
ReverseMatch: common.BoolPtr(true),
}
- transferPQSMap := map[string][]*history.ProcessingQueueState{
+ transferPQSMap := map[string][]*types.ProcessingQueueState{
s.ClusterMetadata.GetCurrentClusterName(): {
- &history.ProcessingQueueState{
+ &types.ProcessingQueueState{
Level: common.Int32Ptr(0),
AckLevel: common.Int64Ptr(0),
MaxLevel: common.Int64Ptr(0),
DomainFilter: domainFilter,
},
},
}
- transferPQS := history.ProcessingQueueStates{transferPQSMap}
- timerPQSMap := map[string][]*history.ProcessingQueueState{
+ transferPQS := types.ProcessingQueueStates{transferPQSMap}
+ timerPQSMap := map[string][]*types.ProcessingQueueState{
s.ClusterMetadata.GetCurrentClusterName(): {
- &history.ProcessingQueueState{
+ &types.ProcessingQueueState{
Level: common.Int32Ptr(0),
AckLevel: common.Int64Ptr(time.Now().UnixNano()),
MaxLevel: common.Int64Ptr(time.Now().UnixNano()),
DomainFilter: domainFilter,
},
},
}
- timerPQS := history.ProcessingQueueStates{StatesByCluster: timerPQSMap}
+ timerPQS := types.ProcessingQueueStates{StatesByCluster: timerPQSMap}

s.ShardInfo = &p.ShardInfo{
ShardID: shardID,
13 changes: 7 additions & 6 deletions common/persistence/shardManager.go
@@ -26,6 +26,7 @@ import (
"context"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/types/mapper/thrift"
)

type (
@@ -100,15 +101,15 @@ func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) (*InternalShard
if shardInfo == nil {
return nil, nil
}
- serializedTransferProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(shardInfo.TransferProcessingQueueStates, common.EncodingTypeThriftRW)
+ serializedTransferProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(thrift.FromProcessingQueueStates(shardInfo.TransferProcessingQueueStates), common.EncodingTypeThriftRW)
if err != nil {
return nil, err
}
- serializedTimerProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(shardInfo.TimerProcessingQueueStates, common.EncodingTypeThriftRW)
+ serializedTimerProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(thrift.FromProcessingQueueStates(shardInfo.TimerProcessingQueueStates), common.EncodingTypeThriftRW)
if err != nil {
return nil, err
}
- pendingFailoverMarker, err := m.serializer.SerializePendingFailoverMarkers(shardInfo.PendingFailoverMarkers, common.EncodingTypeThriftRW)
+ pendingFailoverMarker, err := m.serializer.SerializePendingFailoverMarkers(thrift.FromFailoverMarkerAttributesArray(shardInfo.PendingFailoverMarkers), common.EncodingTypeThriftRW)
if err != nil {
return nil, err
}
@@ -164,11 +165,11 @@ func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInf
TimerAckLevel: internalShardInfo.TimerAckLevel,
ClusterTransferAckLevel: internalShardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: internalShardInfo.ClusterTimerAckLevel,
- TransferProcessingQueueStates: transferProcessingQueueStates,
- TimerProcessingQueueStates: timerProcessingQueueStates,
+ TransferProcessingQueueStates: thrift.ToProcessingQueueStates(transferProcessingQueueStates),
+ TimerProcessingQueueStates: thrift.ToProcessingQueueStates(timerProcessingQueueStates),
ClusterReplicationLevel: internalShardInfo.ClusterReplicationLevel,
DomainNotificationVersion: internalShardInfo.DomainNotificationVersion,
- PendingFailoverMarkers: pendingFailoverMarker,
+ PendingFailoverMarkers: thrift.ToFailoverMarkerAttributesArray(pendingFailoverMarker),
TransferFailoverLevels: internalShardInfo.TransferFailoverLevels,
TimerFailoverLevels: internalShardInfo.TimerFailoverLevels,
}, nil
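The change above confines the thrift-generated types to the serialization boundary: shardManager converts with thrift.FromProcessingQueueStates before serializing and maps back with thrift.ToProcessingQueueStates after deserializing. A standalone sketch of that round trip, with illustrative values and the serializer itself omitted.

package main

import (
	"fmt"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/common/types/mapper/thrift"
)

func main() {
	internal := &types.ProcessingQueueStates{
		StatesByCluster: map[string][]*types.ProcessingQueueState{
			"cluster-active": {
				&types.ProcessingQueueState{
					Level:    common.Int32Ptr(0),
					AckLevel: common.Int64Ptr(100),
					MaxLevel: common.Int64Ptr(200),
				},
			},
		},
	}

	// Map to the generated thrift representation right before serialization...
	generated := thrift.FromProcessingQueueStates(internal)

	// ...and back to the internal representation right after deserialization.
	roundTripped := thrift.ToProcessingQueueStates(generated)

	fmt.Println(len(roundTripped.StatesByCluster))
}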
21 changes: 10 additions & 11 deletions service/history/failover/coordinator.go
@@ -29,7 +29,6 @@ import (
"sync/atomic"
"time"

"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
@@ -59,8 +58,8 @@ type (
Coordinator interface {
common.Daemon

- NotifyFailoverMarkers(shardID int32, markers []*replicator.FailoverMarkerAttributes)
- ReceiveFailoverMarkers(shardIDs []int32, marker *replicator.FailoverMarkerAttributes)
+ NotifyFailoverMarkers(shardID int32, markers []*types.FailoverMarkerAttributes)
+ ReceiveFailoverMarkers(shardIDs []int32, marker *types.FailoverMarkerAttributes)
}

coordinatorImpl struct {
@@ -81,12 +80,12 @@ type (

notificationRequest struct {
shardID int32
- markers []*replicator.FailoverMarkerAttributes
+ markers []*types.FailoverMarkerAttributes
}

receiveRequest struct {
shardIDs []int32
- marker *replicator.FailoverMarkerAttributes
+ marker *types.FailoverMarkerAttributes
}

failoverRecord struct {
@@ -158,7 +157,7 @@ func (c *coordinatorImpl) Stop() {

func (c *coordinatorImpl) NotifyFailoverMarkers(
shardID int32,
- markers []*replicator.FailoverMarkerAttributes,
+ markers []*types.FailoverMarkerAttributes,
) {

c.notificationChan <- &notificationRequest{
@@ -169,7 +168,7 @@ func (c *coordinatorImpl) NotifyFailoverMarkers(

func (c *coordinatorImpl) ReceiveFailoverMarkers(
shardIDs []int32,
- marker *replicator.FailoverMarkerAttributes,
+ marker *types.FailoverMarkerAttributes,
) {

c.receiveChan <- &receiveRequest{
@@ -202,7 +201,7 @@ func (c *coordinatorImpl) notifyFailoverMarkerLoop() {
c.config.NotifyFailoverMarkerTimerJitterCoefficient(),
))
defer timer.Stop()
- requestByMarker := make(map[*replicator.FailoverMarkerAttributes]*receiveRequest)
+ requestByMarker := make(map[*types.FailoverMarkerAttributes]*receiveRequest)

for {
select {
@@ -293,15 +292,15 @@ func (c *coordinatorImpl) cleanupInvalidMarkers() {
}

func (c *coordinatorImpl) notifyRemoteCoordinator(
- requestByMarker map[*replicator.FailoverMarkerAttributes]*receiveRequest,
+ requestByMarker map[*types.FailoverMarkerAttributes]*receiveRequest,
) {

if len(requestByMarker) > 0 {
var tokens []*types.FailoverMarkerToken
for _, request := range requestByMarker {
tokens = append(tokens, &types.FailoverMarkerToken{
ShardIDs: request.shardIDs,
- FailoverMarker: thrift.ToFailoverMarkerAttributes(request.marker),
+ FailoverMarker: request.marker,
})
}

Expand All @@ -325,7 +324,7 @@ func (c *coordinatorImpl) notifyRemoteCoordinator(

func aggregateNotificationRequests(
request *notificationRequest,
- requestByMarker map[*replicator.FailoverMarkerAttributes]*receiveRequest,
+ requestByMarker map[*types.FailoverMarkerAttributes]*receiveRequest,
) {

for _, marker := range request.markers {
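A small, hypothetical caller sketch against the updated Coordinator interface, which now takes the internal []*types.FailoverMarkerAttributes. The helper name is made up, and the failover package name is assumed from the directory service/history/failover.

package main

import (
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/history/failover"
)

// notifyShardMarkers is a made-up helper showing only the new parameter types.
func notifyShardMarkers(c failover.Coordinator, shardID int32, markers []*types.FailoverMarkerAttributes) {
	c.NotifyFailoverMarkers(shardID, markers)
}

func main() {
	// No real coordinator is constructed here; the history service wires one up.
	// This file exists only to show the signature compiling against internal types.
	var markers []*types.FailoverMarkerAttributes
	_ = markers
	_ = notifyShardMarkers
}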
6 changes: 3 additions & 3 deletions service/history/failover/coordinator_mock.go

Some generated files are not rendered by default.
