Skip to content

Commit

Permalink
Small update to shard store store manager separation (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Oct 6, 2020
1 parent 64e81b6 commit 2aba341
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 91 deletions.
54 changes: 18 additions & 36 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,44 +676,26 @@ type (
NextPageToken []byte
}

// InternalTransferFailoverLevel contains corresponding start / end level
InternalTransferFailoverLevel struct {
StartTime time.Time
MinLevel int64
CurrentLevel int64
MaxLevel int64
DomainIDs map[string]struct{}
}

// InternalTimerFailoverLevel contains domain IDs and corresponding start / end level
InternalTimerFailoverLevel struct {
StartTime time.Time
MinLevel time.Time
CurrentLevel time.Time
MaxLevel time.Time
DomainIDs map[string]struct{}
}

// InternalShardInfo describes a shard
InternalShardInfo struct {
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *DataBlob `json:"transfer_processing_queue_states"`
TimerProcessingQueueStates *DataBlob `json:"timer_processing_queue_states"`
TransferFailoverLevels map[string]InternalTransferFailoverLevel // uuid -> TransferFailoverLevel
TimerFailoverLevels map[string]InternalTimerFailoverLevel // uuid -> TimerFailoverLevel
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"`
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *DataBlob `json:"transfer_processing_queue_states"`
TimerProcessingQueueStates *DataBlob `json:"timer_processing_queue_states"`
TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"`
}

// InternalCreateShardRequest is request to CreateShard
Expand Down
67 changes: 12 additions & 55 deletions common/persistence/shardManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func (m *shardManager) UpdateShard(ctx context.Context, request *UpdateShardRequ
}

func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardInfo {
internalShardInfo := &InternalShardInfo{
if shardInfo == nil {
return nil
}
return &InternalShardInfo{
ShardID: shardInfo.ShardID,
Owner: shardInfo.Owner,
RangeID: shardInfo.RangeID,
Expand All @@ -98,38 +101,16 @@ func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardI
ClusterReplicationLevel: shardInfo.ClusterReplicationLevel,
DomainNotificationVersion: shardInfo.DomainNotificationVersion,
PendingFailoverMarkers: shardInfo.PendingFailoverMarkers,
TransferFailoverLevels: shardInfo.TransferFailoverLevels,
TimerFailoverLevels: shardInfo.TimerFailoverLevels,
}
if shardInfo.TransferFailoverLevels != nil {
internalShardInfo.TransferFailoverLevels = make(map[string]InternalTransferFailoverLevel)
for k, v := range shardInfo.TransferFailoverLevels {
internalShardInfo.TransferFailoverLevels[k] = InternalTransferFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}

if shardInfo.TimerFailoverLevels != nil {
internalShardInfo.TimerFailoverLevels = make(map[string]InternalTimerFailoverLevel)
for k, v := range shardInfo.TimerFailoverLevels {
internalShardInfo.TimerFailoverLevels[k] = InternalTimerFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}

return internalShardInfo
}

func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInfo) *ShardInfo {
shardInfo := &ShardInfo{
if internalShardInfo == nil {
return nil
}
return &ShardInfo{
ShardID: internalShardInfo.ShardID,
Owner: internalShardInfo.Owner,
RangeID: internalShardInfo.RangeID,
Expand All @@ -146,31 +127,7 @@ func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInf
ClusterReplicationLevel: internalShardInfo.ClusterReplicationLevel,
DomainNotificationVersion: internalShardInfo.DomainNotificationVersion,
PendingFailoverMarkers: internalShardInfo.PendingFailoverMarkers,
TransferFailoverLevels: internalShardInfo.TransferFailoverLevels,
TimerFailoverLevels: internalShardInfo.TimerFailoverLevels,
}
if internalShardInfo.TransferFailoverLevels != nil {
shardInfo.TransferFailoverLevels = make(map[string]TransferFailoverLevel)
for k, v := range internalShardInfo.TransferFailoverLevels {
shardInfo.TransferFailoverLevels[k] = TransferFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}

if internalShardInfo.TimerFailoverLevels != nil {
shardInfo.TimerFailoverLevels = make(map[string]TimerFailoverLevel)
for k, v := range internalShardInfo.TimerFailoverLevels {
shardInfo.TimerFailoverLevels[k] = TimerFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}
return shardInfo
}

0 comments on commit 2aba341

Please sign in to comment.