Skip to content

Commit

Permalink
Add persistence encoding and decoder interfaces (cadence-workflow#3534)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Sep 24, 2020
1 parent 392da7f commit b2464b7
Show file tree
Hide file tree
Showing 27 changed files with 1,217 additions and 292 deletions.
1 change: 1 addition & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
EncodingTypeGob EncodingType = "gob"
EncodingTypeUnknown EncodingType = "unknow"
EncodingTypeEmpty EncodingType = ""
EncodingTypeProto EncodingType = "proto3"
)

type (
Expand Down
33 changes: 31 additions & 2 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ package client
import (
"sync"

"github.com/uber/cadence/common/log/tag"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence/serialization"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
p "github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -297,7 +302,15 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
case defaultCfg.Cassandra != nil:
defaultDataStore.factory = cassandra.NewFactory(*defaultCfg.Cassandra, clusterName, f.logger)
case defaultCfg.SQL != nil:
defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, clusterName, f.logger)
var decodingTypes []common.EncodingType
for _, dt := range defaultCfg.SQL.DecodingTypes {
decodingTypes = append(decodingTypes, common.EncodingType(dt))
}
defaultDataStore.factory = sql.NewFactory(
*defaultCfg.SQL,
clusterName,
f.logger,
getSQLParser(f.logger, common.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...))
case defaultCfg.CustomDataStoreConfig != nil:
defaultDataStore.factory = f.abstractDataStoreFactory.NewFactory(*defaultCfg.CustomDataStoreConfig, clusterName, f.logger)
default:
Expand All @@ -316,14 +329,30 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
case visibilityCfg.Cassandra != nil:
visibilityDataStore.factory = cassandra.NewFactory(*visibilityCfg.Cassandra, clusterName, f.logger)
case visibilityCfg.SQL != nil:
visibilityDataStore.factory = sql.NewFactory(*visibilityCfg.SQL, clusterName, f.logger)
var decodingTypes []common.EncodingType
for _, dt := range visibilityCfg.SQL.DecodingTypes {
decodingTypes = append(decodingTypes, common.EncodingType(dt))
}
visibilityDataStore.factory = sql.NewFactory(
*visibilityCfg.SQL,
clusterName,
f.logger,
getSQLParser(f.logger, common.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...))
default:
f.logger.Fatal("invalid config: one of cassandra or sql params must be specified")
}

f.datastores[storeTypeVisibility] = visibilityDataStore
}

func getSQLParser(logger log.Logger, encodingType common.EncodingType, decodingTypes ...common.EncodingType) serialization.Parser {
parser, err := serialization.NewParser(encodingType, decodingTypes...)
if err != nil {
logger.Fatal("failed to construct sql parser", tag.Error(err))
}
return parser
}

func buildRatelimiters(cfg *config.Persistence, maxQPS dynamicconfig.IntPropertyFn) map[string]quotas.Limiter {
result := make(map[string]quotas.Limiter, len(cfg.DataStores))
for dsName := range cfg.DataStores {
Expand Down
110 changes: 110 additions & 0 deletions common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package serialization

import (
"go.uber.org/thriftrw/wire"

"github.com/uber/cadence/.gen/go/sqlblobs"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
)

type (
// Parser is used to do serialization and deserialization. A parser is backed by a
// a single encoder which encodes into one format and a collection of decoders.
// Parser selects the appropriate decoder for the provided blob.
Parser interface {
ShardInfoToBlob(*sqlblobs.ShardInfo) (persistence.DataBlob, error)
DomainInfoToBlob(*sqlblobs.DomainInfo) (persistence.DataBlob, error)
HistoryTreeInfoToBlob(*sqlblobs.HistoryTreeInfo) (persistence.DataBlob, error)
WorkflowExecutionInfoToBlob(*sqlblobs.WorkflowExecutionInfo) (persistence.DataBlob, error)
ActivityInfoToBlob(*sqlblobs.ActivityInfo) (persistence.DataBlob, error)
ChildExecutionInfoToBlob(*sqlblobs.ChildExecutionInfo) (persistence.DataBlob, error)
SignalInfoToBlob(*sqlblobs.SignalInfo) (persistence.DataBlob, error)
RequestCancelInfoToBlob(*sqlblobs.RequestCancelInfo) (persistence.DataBlob, error)
TimerInfoToBlob(*sqlblobs.TimerInfo) (persistence.DataBlob, error)
TaskInfoToBlob(*sqlblobs.TaskInfo) (persistence.DataBlob, error)
TaskListInfoToBlob(*sqlblobs.TaskListInfo) (persistence.DataBlob, error)
TransferTaskInfoToBlob(*sqlblobs.TransferTaskInfo) (persistence.DataBlob, error)
TimerTaskInfoToBlob(*sqlblobs.TimerTaskInfo) (persistence.DataBlob, error)
ReplicationTaskInfoToBlob(*sqlblobs.ReplicationTaskInfo) (persistence.DataBlob, error)

ShardInfoFromBlob([]byte, string) (*sqlblobs.ShardInfo, error)
DomainInfoFromBlob([]byte, string) (*sqlblobs.DomainInfo, error)
HistoryTreeInfoFromBlob([]byte, string) (*sqlblobs.HistoryTreeInfo, error)
WorkflowExecutionInfoFromBlob([]byte, string) (*sqlblobs.WorkflowExecutionInfo, error)
ActivityInfoFromBlob([]byte, string) (*sqlblobs.ActivityInfo, error)
ChildExecutionInfoFromBlob([]byte, string) (*sqlblobs.ChildExecutionInfo, error)
SignalInfoFromBlob([]byte, string) (*sqlblobs.SignalInfo, error)
RequestCancelInfoFromBlob([]byte, string) (*sqlblobs.RequestCancelInfo, error)
TimerInfoFromBlob([]byte, string) (*sqlblobs.TimerInfo, error)
TaskInfoFromBlob([]byte, string) (*sqlblobs.TaskInfo, error)
TaskListInfoFromBlob([]byte, string) (*sqlblobs.TaskListInfo, error)
TransferTaskInfoFromBlob([]byte, string) (*sqlblobs.TransferTaskInfo, error)
TimerTaskInfoFromBlob([]byte, string) (*sqlblobs.TimerTaskInfo, error)
ReplicationTaskInfoFromBlob([]byte, string) (*sqlblobs.ReplicationTaskInfo, error)
}

// encoder is used to serialize structs. Each encoder implementation uses one serialization format.
encoder interface {
shardInfoToBlob(*sqlblobs.ShardInfo) ([]byte, error)
domainInfoToBlob(*sqlblobs.DomainInfo) ([]byte, error)
historyTreeInfoToBlob(*sqlblobs.HistoryTreeInfo) ([]byte, error)
workflowExecutionInfoToBlob(*sqlblobs.WorkflowExecutionInfo) ([]byte, error)
activityInfoToBlob(*sqlblobs.ActivityInfo) ([]byte, error)
childExecutionInfoToBlob(*sqlblobs.ChildExecutionInfo) ([]byte, error)
signalInfoToBlob(*sqlblobs.SignalInfo) ([]byte, error)
requestCancelInfoToBlob(*sqlblobs.RequestCancelInfo) ([]byte, error)
timerInfoToBlob(*sqlblobs.TimerInfo) ([]byte, error)
taskInfoToBlob(*sqlblobs.TaskInfo) ([]byte, error)
taskListInfoToBlob(*sqlblobs.TaskListInfo) ([]byte, error)
transferTaskInfoToBlob(*sqlblobs.TransferTaskInfo) ([]byte, error)
timerTaskInfoToBlob(*sqlblobs.TimerTaskInfo) ([]byte, error)
replicationTaskInfoToBlob(*sqlblobs.ReplicationTaskInfo) ([]byte, error)
encodingType() common.EncodingType
}

// decoder is used to deserialize structs. Each decoder implementation uses one serialization format.
decoder interface {
shardInfoFromBlob([]byte) (*sqlblobs.ShardInfo, error)
domainInfoFromBlob([]byte) (*sqlblobs.DomainInfo, error)
historyTreeInfoFromBlob([]byte) (*sqlblobs.HistoryTreeInfo, error)
workflowExecutionInfoFromBlob([]byte) (*sqlblobs.WorkflowExecutionInfo, error)
activityInfoFromBlob([]byte) (*sqlblobs.ActivityInfo, error)
childExecutionInfoFromBlob([]byte) (*sqlblobs.ChildExecutionInfo, error)
signalInfoFromBlob([]byte) (*sqlblobs.SignalInfo, error)
requestCancelInfoFromBlob([]byte) (*sqlblobs.RequestCancelInfo, error)
timerInfoFromBlob([]byte) (*sqlblobs.TimerInfo, error)
taskInfoFromBlob([]byte) (*sqlblobs.TaskInfo, error)
taskListInfoFromBlob([]byte) (*sqlblobs.TaskListInfo, error)
transferTaskInfoFromBlob([]byte) (*sqlblobs.TransferTaskInfo, error)
timerTaskInfoFromBlob([]byte) (*sqlblobs.TimerTaskInfo, error)
replicationTaskInfoFromBlob([]byte) (*sqlblobs.ReplicationTaskInfo, error)
}

thriftRWType interface {
ToWire() (wire.Value, error)
FromWire(w wire.Value) error
}
)
Loading

0 comments on commit b2464b7

Please sign in to comment.