Skip to content

Commit

Permalink
Separating shard manager and shard store (cadence-workflow#3573)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Oct 5, 2020
1 parent f952fd3 commit 99b59b9
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 21 deletions.
10 changes: 5 additions & 5 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ func (d *cassandraPersistence) GetShardID() int {

func (d *cassandraPersistence) CreateShard(
_ context.Context,
request *p.CreateShardRequest,
request *p.InternalCreateShardRequest,
) error {
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())
shardInfo := request.ShardInfo
Expand Down Expand Up @@ -1049,8 +1049,8 @@ func (d *cassandraPersistence) CreateShard(

func (d *cassandraPersistence) GetShard(
_ context.Context,
request *p.GetShardRequest,
) (*p.GetShardResponse, error) {
request *p.InternalGetShardRequest,
) (*p.InternalGetShardResponse, error) {
shardID := request.ShardID
query := d.session.Query(templateGetShardQuery,
shardID,
Expand Down Expand Up @@ -1080,12 +1080,12 @@ func (d *cassandraPersistence) GetShard(

info := createShardInfo(d.currentClusterName, result["shard"].(map[string]interface{}))

return &p.GetShardResponse{ShardInfo: info}, nil
return &p.InternalGetShardResponse{ShardInfo: info}, nil
}

func (d *cassandraPersistence) UpdateShard(
_ context.Context,
request *p.UpdateShardRequest,
request *p.InternalUpdateShardRequest,
) error {
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())
shardInfo := request.ShardInfo
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,15 +1582,15 @@ func updateBufferedEvents(
func createShardInfo(
currentCluster string,
result map[string]interface{},
) *p.ShardInfo {
) *p.InternalShardInfo {

var pendingFailoverMarkersRawData []byte
var pendingFailoverMarkersEncoding string
var transferProcessingQueueStatesRawData []byte
var transferProcessingQueueStatesEncoding string
var timerProcessingQueueStatesRawData []byte
var timerProcessingQueueStatesEncoding string
info := &p.ShardInfo{}
info := &p.InternalShardInfo{}
for k, v := range result {
switch k {
case "shard_id":
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
// NewShardManager returns a new shard manager
func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
ds := f.datastores[storeTypeShard]
result, err := ds.factory.NewShardStore()
store, err := ds.factory.NewShardStore()
if err != nil {
return nil, err
}
result := p.NewShardManager(store)
if ds.ratelimit != nil {
result = p.NewShardPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
Expand Down
73 changes: 70 additions & 3 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,17 @@ type (
// Persistence interface is a lower layer of dataInterface.
// The intention is to let different persistence implementation(SQL,Cassandra/etc) share some common logic
// Right now the only common part is serialization/deserialization, and only ExecutionManager/HistoryManager need it.
// ShardManager/TaskManager/MetadataManager are the same.
// TaskManager are the same.
//////////////////////////////////////////////////////////////////////

// ShardStore is a lower level of ShardManager
ShardStore = ShardManager
// ShardStore is the lower level of ShardManager
ShardStore interface {
Closeable
GetName() string
CreateShard(ctx context.Context, request *InternalCreateShardRequest) error
GetShard(ctx context.Context, request *InternalGetShardRequest) (*InternalGetShardResponse, error)
UpdateShard(ctx context.Context, request *InternalUpdateShardRequest) error
}
// TaskStore is a lower level of TaskManager
TaskStore = TaskManager
// MetadataStore is a lower level of MetadataManager
Expand Down Expand Up @@ -669,6 +675,67 @@ type (
Domains []*InternalGetDomainResponse
NextPageToken []byte
}

// InternalTransferFailoverLevel contains corresponding start / end level
InternalTransferFailoverLevel struct {
StartTime time.Time
MinLevel int64
CurrentLevel int64
MaxLevel int64
DomainIDs map[string]struct{}
}

// InternalTimerFailoverLevel contains domain IDs and corresponding start / end level
InternalTimerFailoverLevel struct {
StartTime time.Time
MinLevel time.Time
CurrentLevel time.Time
MaxLevel time.Time
DomainIDs map[string]struct{}
}

// InternalShardInfo describes a shard
InternalShardInfo struct {
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *DataBlob `json:"transfer_processing_queue_states"`
TimerProcessingQueueStates *DataBlob `json:"timer_processing_queue_states"`
TransferFailoverLevels map[string]InternalTransferFailoverLevel // uuid -> TransferFailoverLevel
TimerFailoverLevels map[string]InternalTimerFailoverLevel // uuid -> TimerFailoverLevel
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"`
}

// InternalCreateShardRequest is request to CreateShard
InternalCreateShardRequest struct {
ShardInfo *InternalShardInfo
}

// InternalGetShardRequest is used to get shard information
InternalGetShardRequest struct {
ShardID int
}

// InternalUpdateShardRequest is used to update shard information
InternalUpdateShardRequest struct {
ShardInfo *InternalShardInfo
PreviousRangeID int64
}

// InternalGetShardResponse is the response to GetShard
InternalGetShardResponse struct {
ShardInfo *InternalShardInfo
}
)

// NewDataBlob returns a new DataBlob
Expand Down
176 changes: 176 additions & 0 deletions common/persistence/shardManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package persistence

import (
"context"
)

type (
shardManager struct {
persistence ShardStore
}
)

var _ ShardManager = (*shardManager)(nil)

// NewShardManager returns a new ShardManager
func NewShardManager(
persistence ShardStore,
) ShardManager {
return &shardManager{
persistence: persistence,
}
}

func (m *shardManager) GetName() string {
return m.persistence.GetName()
}

func (m *shardManager) Close() {
m.persistence.Close()
}

func (m *shardManager) CreateShard(ctx context.Context, request *CreateShardRequest) error {
internalRequest := &InternalCreateShardRequest{
ShardInfo: m.toInternalShardInfo(request.ShardInfo),
}
return m.persistence.CreateShard(ctx, internalRequest)
}

func (m *shardManager) GetShard(ctx context.Context, request *GetShardRequest) (*GetShardResponse, error) {
internalRequest := &InternalGetShardRequest{
ShardID: request.ShardID,
}
internalResult, err := m.persistence.GetShard(ctx, internalRequest)
if err != nil {
return nil, err
}
result := &GetShardResponse{
ShardInfo: m.fromInternalShardInfo(internalResult.ShardInfo),
}
return result, nil
}

func (m *shardManager) UpdateShard(ctx context.Context, request *UpdateShardRequest) error {
internalRequest := &InternalUpdateShardRequest{
ShardInfo: m.toInternalShardInfo(request.ShardInfo),
PreviousRangeID: request.PreviousRangeID,
}
return m.persistence.UpdateShard(ctx, internalRequest)
}

func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardInfo {
internalShardInfo := &InternalShardInfo{
ShardID: shardInfo.ShardID,
Owner: shardInfo.Owner,
RangeID: shardInfo.RangeID,
StolenSinceRenew: shardInfo.StolenSinceRenew,
UpdatedAt: shardInfo.UpdatedAt,
ReplicationAckLevel: shardInfo.ReplicationAckLevel,
ReplicationDLQAckLevel: shardInfo.ReplicationDLQAckLevel,
TransferAckLevel: shardInfo.TransferAckLevel,
TimerAckLevel: shardInfo.TimerAckLevel,
ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: shardInfo.ClusterTimerAckLevel,
TransferProcessingQueueStates: shardInfo.TransferProcessingQueueStates,
TimerProcessingQueueStates: shardInfo.TimerProcessingQueueStates,
ClusterReplicationLevel: shardInfo.ClusterReplicationLevel,
DomainNotificationVersion: shardInfo.DomainNotificationVersion,
PendingFailoverMarkers: shardInfo.PendingFailoverMarkers,
}
if shardInfo.TransferFailoverLevels != nil {
internalShardInfo.TransferFailoverLevels = make(map[string]InternalTransferFailoverLevel)
for k, v := range shardInfo.TransferFailoverLevels {
internalShardInfo.TransferFailoverLevels[k] = InternalTransferFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}

if shardInfo.TimerFailoverLevels != nil {
internalShardInfo.TimerFailoverLevels = make(map[string]InternalTimerFailoverLevel)
for k, v := range shardInfo.TimerFailoverLevels {
internalShardInfo.TimerFailoverLevels[k] = InternalTimerFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}

return internalShardInfo
}

func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInfo) *ShardInfo {
shardInfo := &ShardInfo{
ShardID: internalShardInfo.ShardID,
Owner: internalShardInfo.Owner,
RangeID: internalShardInfo.RangeID,
StolenSinceRenew: internalShardInfo.StolenSinceRenew,
UpdatedAt: internalShardInfo.UpdatedAt,
ReplicationAckLevel: internalShardInfo.ReplicationAckLevel,
ReplicationDLQAckLevel: internalShardInfo.ReplicationDLQAckLevel,
TransferAckLevel: internalShardInfo.TransferAckLevel,
TimerAckLevel: internalShardInfo.TimerAckLevel,
ClusterTransferAckLevel: internalShardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: internalShardInfo.ClusterTimerAckLevel,
TransferProcessingQueueStates: internalShardInfo.TransferProcessingQueueStates,
TimerProcessingQueueStates: internalShardInfo.TimerProcessingQueueStates,
ClusterReplicationLevel: internalShardInfo.ClusterReplicationLevel,
DomainNotificationVersion: internalShardInfo.DomainNotificationVersion,
PendingFailoverMarkers: internalShardInfo.PendingFailoverMarkers,
}
if internalShardInfo.TransferFailoverLevels != nil {
shardInfo.TransferFailoverLevels = make(map[string]TransferFailoverLevel)
for k, v := range internalShardInfo.TransferFailoverLevels {
shardInfo.TransferFailoverLevels[k] = TransferFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}

if internalShardInfo.TimerFailoverLevels != nil {
shardInfo.TimerFailoverLevels = make(map[string]TimerFailoverLevel)
for k, v := range internalShardInfo.TimerFailoverLevels {
shardInfo.TimerFailoverLevels[k] = TimerFailoverLevel{
StartTime: v.StartTime,
MinLevel: v.MinLevel,
CurrentLevel: v.CurrentLevel,
MaxLevel: v.MaxLevel,
DomainIDs: v.DomainIDs,
}
}
}
return shardInfo
}
18 changes: 9 additions & 9 deletions common/persistence/sql/sqlShardManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ type sqlShardManager struct {
currentClusterName string
}

// newShardPersistence creates an instance of ShardManager
// newShardPersistence creates an instance of ShardStore
func newShardPersistence(
db sqlplugin.DB,
currentClusterName string,
log log.Logger,
parser serialization.Parser,
) (persistence.ShardManager, error) {
) (persistence.ShardStore, error) {
return &sqlShardManager{
sqlStore: sqlStore{
db: db,
Expand All @@ -60,9 +60,9 @@ func newShardPersistence(

func (m *sqlShardManager) CreateShard(
ctx context.Context,
request *persistence.CreateShardRequest,
request *persistence.InternalCreateShardRequest,
) error {
if _, err := m.GetShard(ctx, &persistence.GetShardRequest{
if _, err := m.GetShard(ctx, &persistence.InternalGetShardRequest{
ShardID: request.ShardInfo.ShardID,
}); err == nil {
return &persistence.ShardAlreadyExistError{
Expand All @@ -88,8 +88,8 @@ func (m *sqlShardManager) CreateShard(

func (m *sqlShardManager) GetShard(
ctx context.Context,
request *persistence.GetShardRequest,
) (*persistence.GetShardResponse, error) {
request *persistence.InternalGetShardRequest,
) (*persistence.InternalGetShardResponse, error) {
row, err := m.db.SelectFromShards(ctx, &sqlplugin.ShardsFilter{ShardID: int64(request.ShardID)})
if err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -147,7 +147,7 @@ func (m *sqlShardManager) GetShard(
}
}

resp := &persistence.GetShardResponse{ShardInfo: &persistence.ShardInfo{
resp := &persistence.InternalGetShardResponse{ShardInfo: &persistence.InternalShardInfo{
ShardID: int(row.ShardID),
RangeID: row.RangeID,
Owner: shardInfo.GetOwner(),
Expand All @@ -170,7 +170,7 @@ func (m *sqlShardManager) GetShard(

func (m *sqlShardManager) UpdateShard(
ctx context.Context,
request *persistence.UpdateShardRequest,
request *persistence.InternalUpdateShardRequest,
) error {
row, err := shardInfoToShardsRow(*request.ShardInfo, m.parser)
if err != nil {
Expand Down Expand Up @@ -244,7 +244,7 @@ func readLockShard(ctx context.Context, tx sqlplugin.Tx, shardID int, oldRangeID
return nil
}

func shardInfoToShardsRow(s persistence.ShardInfo, parser serialization.Parser) (*sqlplugin.ShardsRow, error) {
func shardInfoToShardsRow(s persistence.InternalShardInfo, parser serialization.Parser) (*sqlplugin.ShardsRow, error) {
timerAckLevels := make(map[string]int64, len(s.ClusterTimerAckLevel))
for k, v := range s.ClusterTimerAckLevel {
timerAckLevels[k] = v.UnixNano()
Expand Down
Loading

0 comments on commit 99b59b9

Please sign in to comment.