From b2464b709fc91c453b2e21c7092e658f6112b76f Mon Sep 17 00:00:00 2001 From: Andrew Dawson Date: Wed, 23 Sep 2020 19:40:35 -0700 Subject: [PATCH] Add persistence encoding and decoder interfaces (#3534) --- common/constants.go | 1 + common/persistence/client/factory.go | 33 +- .../persistence/serialization/interfaces.go | 110 ++++++ common/persistence/serialization/parser.go | 358 ++++++++++++++++++ .../persistence/serialization/parser_test.go | 47 +++ .../serialization/proto_decoder.go | 91 +++++ .../serialization/proto_encoder.go | 94 +++++ .../serialization/thrift_decoder.go | 160 ++++++++ .../serialization/thrift_encoder.go | 110 ++++++ common/persistence/sql/blob.go | 211 ----------- common/persistence/sql/common.go | 3 + common/persistence/sql/factory.go | 21 +- common/persistence/sql/sqlExecutionManager.go | 45 ++- .../sql/sqlExecutionManagerUtil.go | 86 +++-- common/persistence/sql/sqlHistoryManager.go | 10 +- .../persistence/sql/sqlMetadataManagerV2.go | 10 +- common/persistence/sql/sqlPersistenceTest.go | 2 + common/persistence/sql/sqlShardManager.go | 20 +- common/persistence/sql/sqlTaskManager.go | 26 +- .../sql/sqlplugin/mysql/dsn_test.go | 10 + common/persistence/sql/workflowStateMaps.go | 32 +- common/service/config/config.go | 5 + config/development_mysql.yaml | 4 + config/development_postgres.yaml | 4 + docker/config_template.yaml | 8 + tools/sql/clitest/connTest.go | 4 + tools/sql/clitest/versionTest.go | 4 + 27 files changed, 1217 insertions(+), 292 deletions(-) create mode 100644 common/persistence/serialization/interfaces.go create mode 100644 common/persistence/serialization/parser.go create mode 100644 common/persistence/serialization/parser_test.go create mode 100644 common/persistence/serialization/proto_decoder.go create mode 100644 common/persistence/serialization/proto_encoder.go create mode 100644 common/persistence/serialization/thrift_decoder.go create mode 100644 common/persistence/serialization/thrift_encoder.go delete mode 100644 common/persistence/sql/blob.go diff --git a/common/constants.go b/common/constants.go index 0fae398f053..42d5620feb0 100644 --- a/common/constants.go +++ b/common/constants.go @@ -74,6 +74,7 @@ const ( EncodingTypeGob EncodingType = "gob" EncodingTypeUnknown EncodingType = "unknow" EncodingTypeEmpty EncodingType = "" + EncodingTypeProto EncodingType = "proto3" ) type ( diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index caaaa9b074b..36f26e02c55 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -23,6 +23,11 @@ package client import ( "sync" + "github.com/uber/cadence/common/log/tag" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/persistence/serialization" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" p "github.com/uber/cadence/common/persistence" @@ -297,7 +302,15 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite case defaultCfg.Cassandra != nil: defaultDataStore.factory = cassandra.NewFactory(*defaultCfg.Cassandra, clusterName, f.logger) case defaultCfg.SQL != nil: - defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, clusterName, f.logger) + var decodingTypes []common.EncodingType + for _, dt := range defaultCfg.SQL.DecodingTypes { + decodingTypes = append(decodingTypes, common.EncodingType(dt)) + } + defaultDataStore.factory = sql.NewFactory( + *defaultCfg.SQL, + clusterName, + f.logger, + getSQLParser(f.logger, common.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...)) case defaultCfg.CustomDataStoreConfig != nil: defaultDataStore.factory = f.abstractDataStoreFactory.NewFactory(*defaultCfg.CustomDataStoreConfig, clusterName, f.logger) default: @@ -316,7 +329,15 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite case visibilityCfg.Cassandra != nil: visibilityDataStore.factory = cassandra.NewFactory(*visibilityCfg.Cassandra, clusterName, f.logger) case visibilityCfg.SQL != nil: - visibilityDataStore.factory = sql.NewFactory(*visibilityCfg.SQL, clusterName, f.logger) + var decodingTypes []common.EncodingType + for _, dt := range visibilityCfg.SQL.DecodingTypes { + decodingTypes = append(decodingTypes, common.EncodingType(dt)) + } + visibilityDataStore.factory = sql.NewFactory( + *visibilityCfg.SQL, + clusterName, + f.logger, + getSQLParser(f.logger, common.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...)) default: f.logger.Fatal("invalid config: one of cassandra or sql params must be specified") } @@ -324,6 +345,14 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite f.datastores[storeTypeVisibility] = visibilityDataStore } +func getSQLParser(logger log.Logger, encodingType common.EncodingType, decodingTypes ...common.EncodingType) serialization.Parser { + parser, err := serialization.NewParser(encodingType, decodingTypes...) + if err != nil { + logger.Fatal("failed to construct sql parser", tag.Error(err)) + } + return parser +} + func buildRatelimiters(cfg *config.Persistence, maxQPS dynamicconfig.IntPropertyFn) map[string]quotas.Limiter { result := make(map[string]quotas.Limiter, len(cfg.DataStores)) for dsName := range cfg.DataStores { diff --git a/common/persistence/serialization/interfaces.go b/common/persistence/serialization/interfaces.go new file mode 100644 index 00000000000..af1d462ae1a --- /dev/null +++ b/common/persistence/serialization/interfaces.go @@ -0,0 +1,110 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "go.uber.org/thriftrw/wire" + + "github.com/uber/cadence/.gen/go/sqlblobs" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/persistence" +) + +type ( + // Parser is used to do serialization and deserialization. A parser is backed by a + // a single encoder which encodes into one format and a collection of decoders. + // Parser selects the appropriate decoder for the provided blob. + Parser interface { + ShardInfoToBlob(*sqlblobs.ShardInfo) (persistence.DataBlob, error) + DomainInfoToBlob(*sqlblobs.DomainInfo) (persistence.DataBlob, error) + HistoryTreeInfoToBlob(*sqlblobs.HistoryTreeInfo) (persistence.DataBlob, error) + WorkflowExecutionInfoToBlob(*sqlblobs.WorkflowExecutionInfo) (persistence.DataBlob, error) + ActivityInfoToBlob(*sqlblobs.ActivityInfo) (persistence.DataBlob, error) + ChildExecutionInfoToBlob(*sqlblobs.ChildExecutionInfo) (persistence.DataBlob, error) + SignalInfoToBlob(*sqlblobs.SignalInfo) (persistence.DataBlob, error) + RequestCancelInfoToBlob(*sqlblobs.RequestCancelInfo) (persistence.DataBlob, error) + TimerInfoToBlob(*sqlblobs.TimerInfo) (persistence.DataBlob, error) + TaskInfoToBlob(*sqlblobs.TaskInfo) (persistence.DataBlob, error) + TaskListInfoToBlob(*sqlblobs.TaskListInfo) (persistence.DataBlob, error) + TransferTaskInfoToBlob(*sqlblobs.TransferTaskInfo) (persistence.DataBlob, error) + TimerTaskInfoToBlob(*sqlblobs.TimerTaskInfo) (persistence.DataBlob, error) + ReplicationTaskInfoToBlob(*sqlblobs.ReplicationTaskInfo) (persistence.DataBlob, error) + + ShardInfoFromBlob([]byte, string) (*sqlblobs.ShardInfo, error) + DomainInfoFromBlob([]byte, string) (*sqlblobs.DomainInfo, error) + HistoryTreeInfoFromBlob([]byte, string) (*sqlblobs.HistoryTreeInfo, error) + WorkflowExecutionInfoFromBlob([]byte, string) (*sqlblobs.WorkflowExecutionInfo, error) + ActivityInfoFromBlob([]byte, string) (*sqlblobs.ActivityInfo, error) + ChildExecutionInfoFromBlob([]byte, string) (*sqlblobs.ChildExecutionInfo, error) + SignalInfoFromBlob([]byte, string) (*sqlblobs.SignalInfo, error) + RequestCancelInfoFromBlob([]byte, string) (*sqlblobs.RequestCancelInfo, error) + TimerInfoFromBlob([]byte, string) (*sqlblobs.TimerInfo, error) + TaskInfoFromBlob([]byte, string) (*sqlblobs.TaskInfo, error) + TaskListInfoFromBlob([]byte, string) (*sqlblobs.TaskListInfo, error) + TransferTaskInfoFromBlob([]byte, string) (*sqlblobs.TransferTaskInfo, error) + TimerTaskInfoFromBlob([]byte, string) (*sqlblobs.TimerTaskInfo, error) + ReplicationTaskInfoFromBlob([]byte, string) (*sqlblobs.ReplicationTaskInfo, error) + } + + // encoder is used to serialize structs. Each encoder implementation uses one serialization format. + encoder interface { + shardInfoToBlob(*sqlblobs.ShardInfo) ([]byte, error) + domainInfoToBlob(*sqlblobs.DomainInfo) ([]byte, error) + historyTreeInfoToBlob(*sqlblobs.HistoryTreeInfo) ([]byte, error) + workflowExecutionInfoToBlob(*sqlblobs.WorkflowExecutionInfo) ([]byte, error) + activityInfoToBlob(*sqlblobs.ActivityInfo) ([]byte, error) + childExecutionInfoToBlob(*sqlblobs.ChildExecutionInfo) ([]byte, error) + signalInfoToBlob(*sqlblobs.SignalInfo) ([]byte, error) + requestCancelInfoToBlob(*sqlblobs.RequestCancelInfo) ([]byte, error) + timerInfoToBlob(*sqlblobs.TimerInfo) ([]byte, error) + taskInfoToBlob(*sqlblobs.TaskInfo) ([]byte, error) + taskListInfoToBlob(*sqlblobs.TaskListInfo) ([]byte, error) + transferTaskInfoToBlob(*sqlblobs.TransferTaskInfo) ([]byte, error) + timerTaskInfoToBlob(*sqlblobs.TimerTaskInfo) ([]byte, error) + replicationTaskInfoToBlob(*sqlblobs.ReplicationTaskInfo) ([]byte, error) + encodingType() common.EncodingType + } + + // decoder is used to deserialize structs. Each decoder implementation uses one serialization format. + decoder interface { + shardInfoFromBlob([]byte) (*sqlblobs.ShardInfo, error) + domainInfoFromBlob([]byte) (*sqlblobs.DomainInfo, error) + historyTreeInfoFromBlob([]byte) (*sqlblobs.HistoryTreeInfo, error) + workflowExecutionInfoFromBlob([]byte) (*sqlblobs.WorkflowExecutionInfo, error) + activityInfoFromBlob([]byte) (*sqlblobs.ActivityInfo, error) + childExecutionInfoFromBlob([]byte) (*sqlblobs.ChildExecutionInfo, error) + signalInfoFromBlob([]byte) (*sqlblobs.SignalInfo, error) + requestCancelInfoFromBlob([]byte) (*sqlblobs.RequestCancelInfo, error) + timerInfoFromBlob([]byte) (*sqlblobs.TimerInfo, error) + taskInfoFromBlob([]byte) (*sqlblobs.TaskInfo, error) + taskListInfoFromBlob([]byte) (*sqlblobs.TaskListInfo, error) + transferTaskInfoFromBlob([]byte) (*sqlblobs.TransferTaskInfo, error) + timerTaskInfoFromBlob([]byte) (*sqlblobs.TimerTaskInfo, error) + replicationTaskInfoFromBlob([]byte) (*sqlblobs.ReplicationTaskInfo, error) + } + + thriftRWType interface { + ToWire() (wire.Value, error) + FromWire(w wire.Value) error + } +) diff --git a/common/persistence/serialization/parser.go b/common/persistence/serialization/parser.go new file mode 100644 index 00000000000..77fa1d11da9 --- /dev/null +++ b/common/persistence/serialization/parser.go @@ -0,0 +1,358 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "fmt" + + "github.com/uber/cadence/.gen/go/sqlblobs" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/persistence" +) + +type ( + parser struct { + encoder encoder + decoders map[common.EncodingType]decoder + } +) + +// NewParser constructs a new parser using encoder as specified by encodingType and using decoders specified by decodingTypes +func NewParser(encodingType common.EncodingType, decodingTypes ...common.EncodingType) (Parser, error) { + encoder, err := getEncoder(encodingType) + if err != nil { + return nil, err + } + decoders := make(map[common.EncodingType]decoder) + for _, dt := range decodingTypes { + decoder, err := getDecoder(dt) + if err != nil { + return nil, err + } + decoders[dt] = decoder + } + return &parser{ + encoder: encoder, + decoders: decoders, + }, nil +} + +func (p *parser) ShardInfoToBlob(info *sqlblobs.ShardInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.shardInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) DomainInfoToBlob(info *sqlblobs.DomainInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.domainInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) HistoryTreeInfoToBlob(info *sqlblobs.HistoryTreeInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.historyTreeInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) WorkflowExecutionInfoToBlob(info *sqlblobs.WorkflowExecutionInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.workflowExecutionInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) ActivityInfoToBlob(info *sqlblobs.ActivityInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.activityInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) ChildExecutionInfoToBlob(info *sqlblobs.ChildExecutionInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.childExecutionInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) SignalInfoToBlob(info *sqlblobs.SignalInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.signalInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) RequestCancelInfoToBlob(info *sqlblobs.RequestCancelInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.requestCancelInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) TimerInfoToBlob(info *sqlblobs.TimerInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.timerInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) TaskInfoToBlob(info *sqlblobs.TaskInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.taskInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) TaskListInfoToBlob(info *sqlblobs.TaskListInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.taskListInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) TransferTaskInfoToBlob(info *sqlblobs.TransferTaskInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.transferTaskInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) TimerTaskInfoToBlob(info *sqlblobs.TimerTaskInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.timerTaskInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) ReplicationTaskInfoToBlob(info *sqlblobs.ReplicationTaskInfo) (persistence.DataBlob, error) { + db := persistence.DataBlob{} + data, err := p.encoder.replicationTaskInfoToBlob(info) + if err != nil { + return db, err + } + db.Data = data + db.Encoding = p.encoder.encodingType() + return db, nil +} + +func (p *parser) ShardInfoFromBlob(data []byte, encoding string) (*sqlblobs.ShardInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.shardInfoFromBlob(data) +} + +func (p *parser) DomainInfoFromBlob(data []byte, encoding string) (*sqlblobs.DomainInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.domainInfoFromBlob(data) +} + +func (p *parser) HistoryTreeInfoFromBlob(data []byte, encoding string) (*sqlblobs.HistoryTreeInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.historyTreeInfoFromBlob(data) +} + +func (p *parser) WorkflowExecutionInfoFromBlob(data []byte, encoding string) (*sqlblobs.WorkflowExecutionInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.workflowExecutionInfoFromBlob(data) +} + +func (p *parser) ActivityInfoFromBlob(data []byte, encoding string) (*sqlblobs.ActivityInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.activityInfoFromBlob(data) +} + +func (p *parser) ChildExecutionInfoFromBlob(data []byte, encoding string) (*sqlblobs.ChildExecutionInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.childExecutionInfoFromBlob(data) +} + +func (p *parser) SignalInfoFromBlob(data []byte, encoding string) (*sqlblobs.SignalInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.signalInfoFromBlob(data) +} + +func (p *parser) RequestCancelInfoFromBlob(data []byte, encoding string) (*sqlblobs.RequestCancelInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.requestCancelInfoFromBlob(data) +} + +func (p *parser) TimerInfoFromBlob(data []byte, encoding string) (*sqlblobs.TimerInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.timerInfoFromBlob(data) +} + +func (p *parser) TaskInfoFromBlob(data []byte, encoding string) (*sqlblobs.TaskInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.taskInfoFromBlob(data) +} + +func (p *parser) TaskListInfoFromBlob(data []byte, encoding string) (*sqlblobs.TaskListInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.taskListInfoFromBlob(data) +} + +func (p *parser) TransferTaskInfoFromBlob(data []byte, encoding string) (*sqlblobs.TransferTaskInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.transferTaskInfoFromBlob(data) +} + +func (p *parser) TimerTaskInfoFromBlob(data []byte, encoding string) (*sqlblobs.TimerTaskInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.timerTaskInfoFromBlob(data) +} + +func (p *parser) ReplicationTaskInfoFromBlob(data []byte, encoding string) (*sqlblobs.ReplicationTaskInfo, error) { + decoder, err := p.getCachedDecoder(common.EncodingType(encoding)) + if err != nil { + return nil, err + } + return decoder.replicationTaskInfoFromBlob(data) +} + +func (p *parser) getCachedDecoder(encoding common.EncodingType) (decoder, error) { + decoder, ok := p.decoders[encoding] + if !ok { + return nil, unsupportedEncodingError(encoding) + } + return decoder, nil +} + +func getDecoder(encoding common.EncodingType) (decoder, error) { + switch encoding { + case common.EncodingTypeThriftRW: + return newThriftDecoder(), nil + case common.EncodingTypeProto: + return newProtoDecoder(), nil + default: + return nil, unsupportedEncodingError(encoding) + } +} + +func getEncoder(encoding common.EncodingType) (encoder, error) { + switch encoding { + case common.EncodingTypeThriftRW: + return newThriftEncoder(), nil + case common.EncodingTypeProto: + return newProtoEncoder(), nil + default: + return nil, unsupportedEncodingError(encoding) + } +} + +func unsupportedEncodingError(encoding common.EncodingType) error { + return fmt.Errorf("invalid encoding type: %v", encoding) +} diff --git a/common/persistence/serialization/parser_test.go b/common/persistence/serialization/parser_test.go new file mode 100644 index 00000000000..f449d6d8cec --- /dev/null +++ b/common/persistence/serialization/parser_test.go @@ -0,0 +1,47 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/.gen/go/sqlblobs" + "github.com/uber/cadence/common" +) + +func TestParse(t *testing.T) { + thriftParser, err := NewParser(common.EncodingTypeThriftRW, common.EncodingTypeThriftRW) + assert.NoError(t, err) + domainInfo := &sqlblobs.DomainInfo{ + Name: common.StringPtr("test_name"), + Data: map[string]string{"test_key": "test_value"}, + } + db, err := thriftParser.DomainInfoToBlob(domainInfo) + assert.NoError(t, err) + assert.NotNil(t, db.Data) + decodedDomainInfo, err := thriftParser.DomainInfoFromBlob(db.Data, string(db.Encoding)) + assert.NoError(t, err) + assert.Equal(t, decodedDomainInfo, domainInfo) +} diff --git a/common/persistence/serialization/proto_decoder.go b/common/persistence/serialization/proto_decoder.go new file mode 100644 index 00000000000..39dec04679a --- /dev/null +++ b/common/persistence/serialization/proto_decoder.go @@ -0,0 +1,91 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "github.com/uber/cadence/.gen/go/sqlblobs" +) + +type ( + protoDecoder struct{} +) + +func newProtoDecoder() decoder { + return &protoDecoder{} +} + +func (d *protoDecoder) shardInfoFromBlob(data []byte) (*sqlblobs.ShardInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) domainInfoFromBlob(data []byte) (*sqlblobs.DomainInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) historyTreeInfoFromBlob(data []byte) (*sqlblobs.HistoryTreeInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) workflowExecutionInfoFromBlob(data []byte) (*sqlblobs.WorkflowExecutionInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) activityInfoFromBlob(data []byte) (*sqlblobs.ActivityInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) childExecutionInfoFromBlob(data []byte) (*sqlblobs.ChildExecutionInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) signalInfoFromBlob(data []byte) (*sqlblobs.SignalInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) requestCancelInfoFromBlob(data []byte) (*sqlblobs.RequestCancelInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) timerInfoFromBlob(data []byte) (*sqlblobs.TimerInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) taskInfoFromBlob(data []byte) (*sqlblobs.TaskInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) taskListInfoFromBlob(data []byte) (*sqlblobs.TaskListInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) transferTaskInfoFromBlob(data []byte) (*sqlblobs.TransferTaskInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) timerTaskInfoFromBlob(data []byte) (*sqlblobs.TimerTaskInfo, error) { + panic("not implemented") +} + +func (d *protoDecoder) replicationTaskInfoFromBlob(data []byte) (*sqlblobs.ReplicationTaskInfo, error) { + panic("not implemented") +} diff --git a/common/persistence/serialization/proto_encoder.go b/common/persistence/serialization/proto_encoder.go new file mode 100644 index 00000000000..6b46d781b95 --- /dev/null +++ b/common/persistence/serialization/proto_encoder.go @@ -0,0 +1,94 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "github.com/uber/cadence/.gen/go/sqlblobs" + "github.com/uber/cadence/common" +) + +type protoEncoder struct{} + +func newProtoEncoder() encoder { + return &protoEncoder{} +} + +func (e *protoEncoder) shardInfoToBlob(*sqlblobs.ShardInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) domainInfoToBlob(*sqlblobs.DomainInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) historyTreeInfoToBlob(*sqlblobs.HistoryTreeInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) workflowExecutionInfoToBlob(*sqlblobs.WorkflowExecutionInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) activityInfoToBlob(*sqlblobs.ActivityInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) childExecutionInfoToBlob(*sqlblobs.ChildExecutionInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) signalInfoToBlob(*sqlblobs.SignalInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) requestCancelInfoToBlob(*sqlblobs.RequestCancelInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) timerInfoToBlob(*sqlblobs.TimerInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) taskInfoToBlob(*sqlblobs.TaskInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) taskListInfoToBlob(*sqlblobs.TaskListInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) transferTaskInfoToBlob(*sqlblobs.TransferTaskInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) timerTaskInfoToBlob(*sqlblobs.TimerTaskInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) replicationTaskInfoToBlob(*sqlblobs.ReplicationTaskInfo) ([]byte, error) { + panic("not implemented") +} + +func (e *protoEncoder) encodingType() common.EncodingType { + return common.EncodingTypeProto +} diff --git a/common/persistence/serialization/thrift_decoder.go b/common/persistence/serialization/thrift_decoder.go new file mode 100644 index 00000000000..0f436542b8e --- /dev/null +++ b/common/persistence/serialization/thrift_decoder.go @@ -0,0 +1,160 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "bytes" + + "go.uber.org/thriftrw/protocol" + "go.uber.org/thriftrw/wire" + + "github.com/uber/cadence/.gen/go/sqlblobs" +) + +type ( + thriftDecoder struct{} +) + +func newThriftDecoder() decoder { + return &thriftDecoder{} +} + +func (d *thriftDecoder) shardInfoFromBlob(data []byte) (*sqlblobs.ShardInfo, error) { + result := &sqlblobs.ShardInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) domainInfoFromBlob(data []byte) (*sqlblobs.DomainInfo, error) { + result := &sqlblobs.DomainInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) historyTreeInfoFromBlob(data []byte) (*sqlblobs.HistoryTreeInfo, error) { + result := &sqlblobs.HistoryTreeInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) workflowExecutionInfoFromBlob(data []byte) (*sqlblobs.WorkflowExecutionInfo, error) { + result := &sqlblobs.WorkflowExecutionInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) activityInfoFromBlob(data []byte) (*sqlblobs.ActivityInfo, error) { + result := &sqlblobs.ActivityInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) childExecutionInfoFromBlob(data []byte) (*sqlblobs.ChildExecutionInfo, error) { + result := &sqlblobs.ChildExecutionInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) signalInfoFromBlob(data []byte) (*sqlblobs.SignalInfo, error) { + result := &sqlblobs.SignalInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) requestCancelInfoFromBlob(data []byte) (*sqlblobs.RequestCancelInfo, error) { + result := &sqlblobs.RequestCancelInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) timerInfoFromBlob(data []byte) (*sqlblobs.TimerInfo, error) { + result := &sqlblobs.TimerInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) taskInfoFromBlob(data []byte) (*sqlblobs.TaskInfo, error) { + result := &sqlblobs.TaskInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) taskListInfoFromBlob(data []byte) (*sqlblobs.TaskListInfo, error) { + result := &sqlblobs.TaskListInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) transferTaskInfoFromBlob(data []byte) (*sqlblobs.TransferTaskInfo, error) { + result := &sqlblobs.TransferTaskInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) timerTaskInfoFromBlob(data []byte) (*sqlblobs.TimerTaskInfo, error) { + result := &sqlblobs.TimerTaskInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func (d *thriftDecoder) replicationTaskInfoFromBlob(data []byte) (*sqlblobs.ReplicationTaskInfo, error) { + result := &sqlblobs.ReplicationTaskInfo{} + if err := thriftRWDecode(data, result); err != nil { + return nil, err + } + return result, nil +} + +func thriftRWDecode(b []byte, result thriftRWType) error { + value, err := protocol.Binary.Decode(bytes.NewReader(b), wire.TStruct) + if err != nil { + return err + } + return result.FromWire(value) +} diff --git a/common/persistence/serialization/thrift_encoder.go b/common/persistence/serialization/thrift_encoder.go new file mode 100644 index 00000000000..463f19c50a1 --- /dev/null +++ b/common/persistence/serialization/thrift_encoder.go @@ -0,0 +1,110 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package serialization + +import ( + "bytes" + + "go.uber.org/thriftrw/protocol" + + "github.com/uber/cadence/.gen/go/sqlblobs" + "github.com/uber/cadence/common" +) + +type thriftEncoder struct{} + +func newThriftEncoder() encoder { + return &thriftEncoder{} +} + +func (e *thriftEncoder) shardInfoToBlob(info *sqlblobs.ShardInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) domainInfoToBlob(info *sqlblobs.DomainInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) historyTreeInfoToBlob(info *sqlblobs.HistoryTreeInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) workflowExecutionInfoToBlob(info *sqlblobs.WorkflowExecutionInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) activityInfoToBlob(info *sqlblobs.ActivityInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) childExecutionInfoToBlob(info *sqlblobs.ChildExecutionInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) signalInfoToBlob(info *sqlblobs.SignalInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) requestCancelInfoToBlob(info *sqlblobs.RequestCancelInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) timerInfoToBlob(info *sqlblobs.TimerInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) taskInfoToBlob(info *sqlblobs.TaskInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) taskListInfoToBlob(info *sqlblobs.TaskListInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) transferTaskInfoToBlob(info *sqlblobs.TransferTaskInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) timerTaskInfoToBlob(info *sqlblobs.TimerTaskInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) replicationTaskInfoToBlob(info *sqlblobs.ReplicationTaskInfo) ([]byte, error) { + return thriftRWEncode(info) +} + +func (e *thriftEncoder) encodingType() common.EncodingType { + return common.EncodingTypeThriftRW +} + +func thriftRWEncode(t thriftRWType) ([]byte, error) { + value, err := t.ToWire() + if err != nil { + return nil, err + } + var b bytes.Buffer + if err := protocol.Binary.Encode(value, &b); err != nil { + return nil, err + } + return b.Bytes(), nil +} diff --git a/common/persistence/sql/blob.go b/common/persistence/sql/blob.go deleted file mode 100644 index 60a9984690a..00000000000 --- a/common/persistence/sql/blob.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package sql - -import ( - "bytes" - "fmt" - - "go.uber.org/thriftrw/protocol" - "go.uber.org/thriftrw/wire" - - "github.com/uber/cadence/.gen/go/sqlblobs" - "github.com/uber/cadence/common" - p "github.com/uber/cadence/common/persistence" -) - -// thriftRWType represents an thrift auto generated type -type thriftRWType interface { - ToWire() (wire.Value, error) - FromWire(w wire.Value) error -} - -func validateProto(p string) error { - if common.EncodingType(p) != common.EncodingTypeThriftRW { - return fmt.Errorf("invalid encoding type: %v", p) - } - return nil -} - -func encodeErr(err error) error { - if err == nil { - return nil - } - return fmt.Errorf("error serializing struct to thrift blob: %v", err) -} - -func decodeErr(err error) error { - if err == nil { - return nil - } - return fmt.Errorf("error deserializing blob to thrift struct: %v", err) -} - -func thriftRWEncode(t thriftRWType) (p.DataBlob, error) { - blob := p.DataBlob{Encoding: common.EncodingTypeThriftRW} - value, err := t.ToWire() - if err != nil { - return blob, encodeErr(err) - } - var b bytes.Buffer - if err := protocol.Binary.Encode(value, &b); err != nil { - return blob, encodeErr(err) - } - blob.Data = b.Bytes() - return blob, nil -} - -func thriftRWDecode(b []byte, proto string, result thriftRWType) error { - if err := validateProto(proto); err != nil { - return err - } - value, err := protocol.Binary.Decode(bytes.NewReader(b), wire.TStruct) - if err != nil { - return decodeErr(err) - } - return decodeErr(result.FromWire(value)) -} - -func shardInfoToBlob(info *sqlblobs.ShardInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func shardInfoFromBlob(b []byte, proto string) (*sqlblobs.ShardInfo, error) { - result := &sqlblobs.ShardInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func domainInfoToBlob(info *sqlblobs.DomainInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func domainInfoFromBlob(b []byte, proto string) (*sqlblobs.DomainInfo, error) { - result := &sqlblobs.DomainInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func historyTreeInfoToBlob(info *sqlblobs.HistoryTreeInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func historyTreeInfoFromBlob(b []byte, proto string) (*sqlblobs.HistoryTreeInfo, error) { - result := &sqlblobs.HistoryTreeInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func workflowExecutionInfoToBlob(info *sqlblobs.WorkflowExecutionInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func workflowExecutionInfoFromBlob(b []byte, proto string) (*sqlblobs.WorkflowExecutionInfo, error) { - result := &sqlblobs.WorkflowExecutionInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func activityInfoToBlob(info *sqlblobs.ActivityInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func activityInfoFromBlob(b []byte, proto string) (*sqlblobs.ActivityInfo, error) { - result := &sqlblobs.ActivityInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func childExecutionInfoToBlob(info *sqlblobs.ChildExecutionInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func childExecutionInfoFromBlob(b []byte, proto string) (*sqlblobs.ChildExecutionInfo, error) { - result := &sqlblobs.ChildExecutionInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func signalInfoToBlob(info *sqlblobs.SignalInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func signalInfoFromBlob(b []byte, proto string) (*sqlblobs.SignalInfo, error) { - result := &sqlblobs.SignalInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func requestCancelInfoToBlob(info *sqlblobs.RequestCancelInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func requestCancelInfoFromBlob(b []byte, proto string) (*sqlblobs.RequestCancelInfo, error) { - result := &sqlblobs.RequestCancelInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func timerInfoToBlob(info *sqlblobs.TimerInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func timerInfoFromBlob(b []byte, proto string) (*sqlblobs.TimerInfo, error) { - result := &sqlblobs.TimerInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func taskInfoToBlob(info *sqlblobs.TaskInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func taskInfoFromBlob(b []byte, proto string) (*sqlblobs.TaskInfo, error) { - result := &sqlblobs.TaskInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func taskListInfoToBlob(info *sqlblobs.TaskListInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func taskListInfoFromBlob(b []byte, proto string) (*sqlblobs.TaskListInfo, error) { - result := &sqlblobs.TaskListInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func transferTaskInfoToBlob(info *sqlblobs.TransferTaskInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func transferTaskInfoFromBlob(b []byte, proto string) (*sqlblobs.TransferTaskInfo, error) { - result := &sqlblobs.TransferTaskInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func timerTaskInfoToBlob(info *sqlblobs.TimerTaskInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func timerTaskInfoFromBlob(b []byte, proto string) (*sqlblobs.TimerTaskInfo, error) { - result := &sqlblobs.TimerTaskInfo{} - return result, thriftRWDecode(b, proto, result) -} - -func replicationTaskInfoToBlob(info *sqlblobs.ReplicationTaskInfo) (p.DataBlob, error) { - return thriftRWEncode(info) -} - -func replicationTaskInfoFromBlob(b []byte, proto string) (*sqlblobs.ReplicationTaskInfo, error) { - result := &sqlblobs.ReplicationTaskInfo{} - return result, thriftRWDecode(b, proto, result) -} diff --git a/common/persistence/sql/common.go b/common/persistence/sql/common.go index e946e86cd47..e6e44b7f9a7 100644 --- a/common/persistence/sql/common.go +++ b/common/persistence/sql/common.go @@ -26,6 +26,8 @@ import ( "encoding/gob" "fmt" + "github.com/uber/cadence/common/persistence/serialization" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -37,6 +39,7 @@ import ( type sqlStore struct { db sqlplugin.DB logger log.Logger + parser serialization.Parser } func (m *sqlStore) GetName() string { diff --git a/common/persistence/sql/factory.go b/common/persistence/sql/factory.go index 5395f1d2c65..7193d1d9e85 100644 --- a/common/persistence/sql/factory.go +++ b/common/persistence/sql/factory.go @@ -24,6 +24,8 @@ import ( "fmt" "sync" + "github.com/uber/cadence/common/persistence/serialization" + "github.com/uber/cadence/common/log" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/sql/sqlplugin" @@ -37,6 +39,7 @@ type ( dbConn dbConn clusterName string logger log.Logger + parser serialization.Parser } // dbConn represents a logical mysql connection - its a @@ -52,12 +55,18 @@ type ( // NewFactory returns an instance of a factory object which can be used to create // datastores backed by any kind of SQL store -func NewFactory(cfg config.SQL, clusterName string, logger log.Logger) *Factory { +func NewFactory( + cfg config.SQL, + clusterName string, + logger log.Logger, + parser serialization.Parser, +) *Factory { return &Factory{ cfg: cfg, clusterName: clusterName, logger: logger, dbConn: newRefCountedDBConn(&cfg), + parser: parser, } } @@ -67,7 +76,7 @@ func (f *Factory) NewTaskStore() (p.TaskStore, error) { if err != nil { return nil, err } - return newTaskPersistence(conn, f.cfg.NumShards, f.logger) + return newTaskPersistence(conn, f.cfg.NumShards, f.logger, f.parser) } // NewShardStore returns a new shard store @@ -76,7 +85,7 @@ func (f *Factory) NewShardStore() (p.ShardStore, error) { if err != nil { return nil, err } - return newShardPersistence(conn, f.clusterName, f.logger) + return newShardPersistence(conn, f.clusterName, f.logger, f.parser) } // NewHistoryV2Store returns a new history store @@ -85,7 +94,7 @@ func (f *Factory) NewHistoryV2Store() (p.HistoryStore, error) { if err != nil { return nil, err } - return newHistoryV2Persistence(conn, f.logger) + return newHistoryV2Persistence(conn, f.logger, f.parser) } // NewMetadataStore returns a new metadata store @@ -94,7 +103,7 @@ func (f *Factory) NewMetadataStore() (p.MetadataStore, error) { if err != nil { return nil, err } - return newMetadataPersistenceV2(conn, f.clusterName, f.logger) + return newMetadataPersistenceV2(conn, f.clusterName, f.logger, f.parser) } // NewExecutionStore returns an ExecutionStore for a given shardID @@ -103,7 +112,7 @@ func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) { if err != nil { return nil, err } - return NewSQLExecutionStore(conn, f.logger, shardID) + return NewSQLExecutionStore(conn, f.logger, shardID, f.parser) } // NewVisibilityStore returns a visibility store diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index dc43665ba2b..3e7215db142 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" p "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/persistence/serialization" "github.com/uber/cadence/common/persistence/sql/sqlplugin" ) @@ -56,6 +57,7 @@ func NewSQLExecutionStore( db sqlplugin.DB, logger log.Logger, shardID int, + parser serialization.Parser, ) (p.ExecutionStore, error) { return &sqlExecutionManager{ @@ -63,6 +65,7 @@ func NewSQLExecutionStore( sqlStore: sqlStore{ db: db, logger: logger, + parser: parser, }, }, nil } @@ -203,7 +206,7 @@ func (m *sqlExecutionManager) createWorkflowExecutionTx( return nil, err } - if err := m.applyWorkflowSnapshotTxAsNew(tx, shardID, &request.NewWorkflowSnapshot); err != nil { + if err := m.applyWorkflowSnapshotTxAsNew(tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil { return nil, err } @@ -236,7 +239,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution( } } - info, err := workflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding) + info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding) if err != nil { return nil, err } @@ -345,7 +348,8 @@ func (m *sqlExecutionManager) GetWorkflowExecution( m.shardID, domainID, wfID, - runID) + runID, + m.parser) if err != nil { return nil, &workflow.InternalServiceError{ Message: fmt.Sprintf("GetWorkflowExecution: failed to get activity info. Error: %v", err), @@ -359,7 +363,8 @@ func (m *sqlExecutionManager) GetWorkflowExecution( m.shardID, domainID, wfID, - runID) + runID, + m.parser) if err != nil { return nil, &workflow.InternalServiceError{ Message: fmt.Sprintf("GetWorkflowExecution: failed to get timer info. Error: %v", err), @@ -373,7 +378,8 @@ func (m *sqlExecutionManager) GetWorkflowExecution( m.shardID, domainID, wfID, - runID) + runID, + m.parser) if err != nil { return nil, &workflow.InternalServiceError{ Message: fmt.Sprintf("GetWorkflowExecution: failed to get child execution info. Error: %v", err), @@ -387,7 +393,8 @@ func (m *sqlExecutionManager) GetWorkflowExecution( m.shardID, domainID, wfID, - runID) + runID, + m.parser) if err != nil { return nil, &workflow.InternalServiceError{ Message: fmt.Sprintf("GetWorkflowExecution: failed to get request cancel info. Error: %v", err), @@ -401,7 +408,8 @@ func (m *sqlExecutionManager) GetWorkflowExecution( m.shardID, domainID, wfID, - runID) + runID, + m.parser) if err != nil { return nil, &workflow.InternalServiceError{ Message: fmt.Sprintf("GetWorkflowExecution: failed to get signal info. Error: %v", err), @@ -538,11 +546,11 @@ func (m *sqlExecutionManager) updateWorkflowExecutionTx( } } - if err := applyWorkflowMutationTx(tx, shardID, &updateWorkflow); err != nil { + if err := applyWorkflowMutationTx(tx, shardID, &updateWorkflow, m.parser); err != nil { return err } if newWorkflow != nil { - if err := m.applyWorkflowSnapshotTxAsNew(tx, shardID, newWorkflow); err != nil { + if err := m.applyWorkflowSnapshotTxAsNew(tx, shardID, newWorkflow, m.parser); err != nil { return err } } @@ -614,7 +622,7 @@ func (m *sqlExecutionManager) resetWorkflowExecutionTx( // 3. update or lock current run if request.CurrentWorkflowMutation != nil { - if err := applyWorkflowMutationTx(tx, m.shardID, request.CurrentWorkflowMutation); err != nil { + if err := applyWorkflowMutationTx(tx, m.shardID, request.CurrentWorkflowMutation, m.parser); err != nil { return err } } else { @@ -634,7 +642,7 @@ func (m *sqlExecutionManager) resetWorkflowExecutionTx( } // 4. create the new reset workflow - return m.applyWorkflowSnapshotTxAsNew(tx, m.shardID, &request.NewWorkflowSnapshot) + return m.applyWorkflowSnapshotTxAsNew(tx, m.shardID, &request.NewWorkflowSnapshot, m.parser) } func (m *sqlExecutionManager) ConflictResolveWorkflowExecution( @@ -764,16 +772,16 @@ func (m *sqlExecutionManager) conflictResolveWorkflowExecutionTx( } } - if err := applyWorkflowSnapshotTxAsReset(tx, shardID, &resetWorkflow); err != nil { + if err := applyWorkflowSnapshotTxAsReset(tx, shardID, &resetWorkflow, m.parser); err != nil { return err } if currentWorkflow != nil { - if err := applyWorkflowMutationTx(tx, shardID, currentWorkflow); err != nil { + if err := applyWorkflowMutationTx(tx, shardID, currentWorkflow, m.parser); err != nil { return err } } if newWorkflow != nil { - if err := m.applyWorkflowSnapshotTxAsNew(tx, shardID, newWorkflow); err != nil { + if err := m.applyWorkflowSnapshotTxAsNew(tx, shardID, newWorkflow, m.parser); err != nil { return err } } @@ -880,7 +888,7 @@ func (m *sqlExecutionManager) GetTransferTasks( } resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))} for i, row := range rows { - info, err := transferTaskInfoFromBlob(row.Data, row.DataEncoding) + info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } @@ -988,7 +996,7 @@ func (m *sqlExecutionManager) populateGetReplicationTasksResponse( var tasks = make([]*p.ReplicationTaskInfo, len(rows)) for i, row := range rows { - info, err := replicationTaskInfoFromBlob(row.Data, row.DataEncoding) + info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } @@ -1174,6 +1182,7 @@ func (m *sqlExecutionManager) CreateFailoverMarkerTasks( sqlplugin.MustParseUUID(task.DomainID), emptyWorkflowID, sqlplugin.MustParseUUID(emptyReplicationRunID), + m.parser, ); err != nil { rollBackErr := tx.Rollback() if rollBackErr != nil { @@ -1232,7 +1241,7 @@ func (m *sqlExecutionManager) GetTimerIndexTasks( resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))} for i, row := range rows { - info, err := timerTaskInfoFromBlob(row.Data, row.DataEncoding) + info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } @@ -1309,7 +1318,7 @@ func (m *sqlExecutionManager) PutReplicationTaskToDLQ( request *p.PutReplicationTaskToDLQRequest, ) error { replicationTask := request.TaskInfo - blob, err := replicationTaskInfoToBlob(&sqlblobs.ReplicationTaskInfo{ + blob, err := m.parser.ReplicationTaskInfoToBlob(&sqlblobs.ReplicationTaskInfo{ DomainID: sqlplugin.MustParseUUID(replicationTask.DomainID), WorkflowID: &replicationTask.WorkflowID, RunID: sqlplugin.MustParseUUID(replicationTask.RunID), diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 779261f4488..55e174152fb 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -26,6 +26,8 @@ import ( "fmt" "time" + "github.com/uber/cadence/common/persistence/serialization" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -37,6 +39,7 @@ func applyWorkflowMutationTx( tx sqlplugin.Tx, shardID int, workflowMutation *p.InternalWorkflowMutation, + parser serialization.Parser, ) error { executionInfo := workflowMutation.ExecutionInfo @@ -79,7 +82,8 @@ func applyWorkflowMutationTx( startVersion, lastWriteVersion, currentVersion, - shardID); err != nil { + shardID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowMutationTx failed. Failed to update executions row. Erorr: %v", err), } @@ -93,6 +97,7 @@ func applyWorkflowMutationTx( workflowMutation.TransferTasks, workflowMutation.ReplicationTasks, workflowMutation.TimerTasks, + parser, ); err != nil { return err } @@ -103,7 +108,8 @@ func applyWorkflowMutationTx( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } @@ -115,7 +121,8 @@ func applyWorkflowMutationTx( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } @@ -127,7 +134,8 @@ func applyWorkflowMutationTx( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } @@ -139,7 +147,8 @@ func applyWorkflowMutationTx( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } @@ -151,7 +160,8 @@ func applyWorkflowMutationTx( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowMutationTx failed. Error: %v", err), } @@ -198,6 +208,7 @@ func applyWorkflowSnapshotTxAsReset( tx sqlplugin.Tx, shardID int, workflowSnapshot *p.InternalWorkflowSnapshot, + parser serialization.Parser, ) error { executionInfo := workflowSnapshot.ExecutionInfo @@ -240,7 +251,8 @@ func applyWorkflowSnapshotTxAsReset( startVersion, lastWriteVersion, currentVersion, - shardID); err != nil { + shardID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to update executions row. Erorr: %v", err), } @@ -254,6 +266,7 @@ func applyWorkflowSnapshotTxAsReset( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, + parser, ); err != nil { return err } @@ -274,7 +287,8 @@ func applyWorkflowSnapshotTxAsReset( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into activity info map after clearing. Error: %v", err), } @@ -296,7 +310,8 @@ func applyWorkflowSnapshotTxAsReset( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into timer info map after clearing. Error: %v", err), } @@ -318,7 +333,8 @@ func applyWorkflowSnapshotTxAsReset( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into activity info map after clearing. Error: %v", err), } @@ -340,7 +356,8 @@ func applyWorkflowSnapshotTxAsReset( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into request cancel info map after clearing. Error: %v", err), } @@ -362,7 +379,8 @@ func applyWorkflowSnapshotTxAsReset( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to insert into signal info map after clearing. Error: %v", err), } @@ -406,6 +424,7 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( tx sqlplugin.Tx, shardID int, workflowSnapshot *p.InternalWorkflowSnapshot, + parser serialization.Parser, ) error { executionInfo := workflowSnapshot.ExecutionInfo @@ -431,7 +450,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( startVersion, lastWriteVersion, currentVersion, - shardID); err != nil { + shardID, + parser); err != nil { return err } @@ -443,6 +463,7 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, + parser, ); err != nil { return err } @@ -453,7 +474,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into activity info map after clearing. Error: %v", err), } @@ -465,7 +487,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into timer info map after clearing. Error: %v", err), } @@ -477,7 +500,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into activity info map after clearing. Error: %v", err), } @@ -489,7 +513,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into request cancel info map after clearing. Error: %v", err), } @@ -501,7 +526,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyWorkflowSnapshotTxAsNew failed. Failed to insert into signal info map after clearing. Error: %v", err), } @@ -531,6 +557,7 @@ func applyTasks( transferTasks []p.Task, replicationTasks []p.Task, timerTasks []p.Task, + parser serialization.Parser, ) error { if err := createTransferTasks(tx, @@ -538,7 +565,8 @@ func applyTasks( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyTasks failed. Failed to create transfer tasks. Error: %v", err), } @@ -550,6 +578,7 @@ func applyTasks( domainID, workflowID, runID, + parser, ); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyTasks failed. Failed to create replication tasks. Error: %v", err), @@ -561,7 +590,8 @@ func applyTasks( shardID, domainID, workflowID, - runID); err != nil { + runID, + parser); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyTasks failed. Failed to create timer tasks. Error: %v", err), } @@ -734,6 +764,7 @@ func createTransferTasks( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(transferTasks) == 0 { @@ -804,7 +835,7 @@ func createTransferTasks( info.Version = common.Int64Ptr(task.GetVersion()) info.VisibilityTimestampNanos = common.Int64Ptr(task.GetVisibilityTimestamp().UnixNano()) - blob, err := transferTaskInfoToBlob(info) + blob, err := parser.TransferTaskInfoToBlob(info) if err != nil { return err } @@ -842,6 +873,7 @@ func createReplicationTasks( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(replicationTasks) == 0 { @@ -893,7 +925,7 @@ func createReplicationTasks( } } - blob, err := replicationTaskInfoToBlob(&sqlblobs.ReplicationTaskInfo{ + blob, err := parser.ReplicationTaskInfoToBlob(&sqlblobs.ReplicationTaskInfo{ DomainID: domainID, WorkflowID: &workflowID, RunID: runID, @@ -949,6 +981,7 @@ func createTimerTasks( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(timerTasks) > 0 { @@ -996,7 +1029,7 @@ func createTimerTasks( info.Version = common.Int64Ptr(task.GetVersion()) info.TaskType = common.Int16Ptr(int16(task.GetType())) - blob, err := timerTaskInfoToBlob(info) + blob, err := parser.TimerTaskInfoToBlob(info) if err != nil { return err } @@ -1217,6 +1250,7 @@ func buildExecutionRow( lastWriteVersion int64, currentVersion int64, shardID int, + parser serialization.Parser, ) (row *sqlplugin.ExecutionsRow, err error) { info := &sqlblobs.WorkflowExecutionInfo{ @@ -1306,7 +1340,7 @@ func buildExecutionRow( info.CancelRequestID = &executionInfo.CancelRequestID } - blob, err := workflowExecutionInfoToBlob(info) + blob, err := parser.WorkflowExecutionInfoToBlob(info) if err != nil { return nil, err } @@ -1332,6 +1366,7 @@ func (m *sqlExecutionManager) createExecution( lastWriteVersion int64, currentVersion int64, shardID int, + parser serialization.Parser, ) error { // validate workflow state & close status @@ -1353,6 +1388,7 @@ func (m *sqlExecutionManager) createExecution( lastWriteVersion, currentVersion, shardID, + parser, ) if err != nil { return err @@ -1397,6 +1433,7 @@ func updateExecution( lastWriteVersion int64, currentVersion int64, shardID int, + parser serialization.Parser, ) error { // validate workflow state & close status @@ -1417,6 +1454,7 @@ func updateExecution( lastWriteVersion, currentVersion, shardID, + parser, ) if err != nil { return err diff --git a/common/persistence/sql/sqlHistoryManager.go b/common/persistence/sql/sqlHistoryManager.go index 8a14609e0ec..34c2fdf5c18 100644 --- a/common/persistence/sql/sqlHistoryManager.go +++ b/common/persistence/sql/sqlHistoryManager.go @@ -25,6 +25,8 @@ import ( "database/sql" "fmt" + "github.com/uber/cadence/common/persistence/serialization" + "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -41,12 +43,14 @@ type sqlHistoryV2Manager struct { func newHistoryV2Persistence( db sqlplugin.DB, logger log.Logger, + parser serialization.Parser, ) (p.HistoryStore, error) { return &sqlHistoryV2Manager{ sqlStore: sqlStore{ db: db, logger: logger, + parser: parser, }, }, nil } @@ -88,7 +92,7 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes( CreatedTimeNanos: common.TimeNowNanosPtr(), } - blob, err := historyTreeInfoToBlob(treeInfo) + blob, err := m.parser.HistoryTreeInfoToBlob(treeInfo) if err != nil { return err } @@ -329,7 +333,7 @@ func (m *sqlHistoryV2Manager) ForkHistoryBranch( CreatedTimeNanos: common.TimeNowNanosPtr(), } - blob, err := historyTreeInfoToBlob(treeInfo) + blob, err := m.parser.HistoryTreeInfoToBlob(treeInfo) if err != nil { return nil, err } @@ -460,7 +464,7 @@ func (m *sqlHistoryV2Manager) GetHistoryTree( return &p.GetHistoryTreeResponse{}, nil } for _, row := range rows { - treeInfo, err := historyTreeInfoFromBlob(row.Data, row.DataEncoding) + treeInfo, err := m.parser.HistoryTreeInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } diff --git a/common/persistence/sql/sqlMetadataManagerV2.go b/common/persistence/sql/sqlMetadataManagerV2.go index d72317f3c28..2c09f7d490f 100644 --- a/common/persistence/sql/sqlMetadataManagerV2.go +++ b/common/persistence/sql/sqlMetadataManagerV2.go @@ -25,6 +25,8 @@ import ( "database/sql" "fmt" + "github.com/uber/cadence/common/persistence/serialization" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -43,11 +45,13 @@ func newMetadataPersistenceV2( db sqlplugin.DB, currentClusterName string, logger log.Logger, + parser serialization.Parser, ) (persistence.MetadataStore, error) { return &sqlMetadataManagerV2{ sqlStore: sqlStore{ db: db, logger: logger, + parser: parser, }, activeClusterName: currentClusterName, }, nil @@ -129,7 +133,7 @@ func (m *sqlMetadataManagerV2) CreateDomain( BadBinariesEncoding: badBinariesEncoding, } - blob, err := domainInfoToBlob(domainInfo) + blob, err := m.parser.DomainInfoToBlob(domainInfo) if err != nil { return nil, err } @@ -211,7 +215,7 @@ func (m *sqlMetadataManagerV2) GetDomain( } func (m *sqlMetadataManagerV2) domainRowToGetDomainResponse(row *sqlplugin.DomainRow) (*persistence.InternalGetDomainResponse, error) { - domainInfo, err := domainInfoFromBlob(row.Data, row.DataEncoding) + domainInfo, err := m.parser.DomainInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } @@ -312,7 +316,7 @@ func (m *sqlMetadataManagerV2) UpdateDomain( BadBinariesEncoding: badBinariesEncoding, } - blob, err := domainInfoToBlob(domainInfo) + blob, err := m.parser.DomainInfoToBlob(domainInfo) if err != nil { return err } diff --git a/common/persistence/sql/sqlPersistenceTest.go b/common/persistence/sql/sqlPersistenceTest.go index bf4edf9b117..05efb947280 100644 --- a/common/persistence/sql/sqlPersistenceTest.go +++ b/common/persistence/sql/sqlPersistenceTest.go @@ -60,6 +60,8 @@ func NewTestCluster(pluginName, dbName, username, password, host string, port in PluginName: pluginName, DatabaseName: dbName, NumShards: 4, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, } return &result } diff --git a/common/persistence/sql/sqlShardManager.go b/common/persistence/sql/sqlShardManager.go index 97ddb491f6b..8e18e35e8bc 100644 --- a/common/persistence/sql/sqlShardManager.go +++ b/common/persistence/sql/sqlShardManager.go @@ -26,6 +26,8 @@ import ( "fmt" "time" + "github.com/uber/cadence/common/persistence/serialization" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -40,11 +42,17 @@ type sqlShardManager struct { } // newShardPersistence creates an instance of ShardManager -func newShardPersistence(db sqlplugin.DB, currentClusterName string, log log.Logger) (persistence.ShardManager, error) { +func newShardPersistence( + db sqlplugin.DB, + currentClusterName string, + log log.Logger, + parser serialization.Parser, +) (persistence.ShardManager, error) { return &sqlShardManager{ sqlStore: sqlStore{ db: db, logger: log, + parser: parser, }, currentClusterName: currentClusterName, }, nil @@ -62,7 +70,7 @@ func (m *sqlShardManager) CreateShard( } } - row, err := shardInfoToShardsRow(*request.ShardInfo) + row, err := shardInfoToShardsRow(*request.ShardInfo, m.parser) if err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("CreateShard operation failed. Error: %v", err), @@ -94,7 +102,7 @@ func (m *sqlShardManager) GetShard( } } - shardInfo, err := shardInfoFromBlob(row.Data, row.DataEncoding) + shardInfo, err := m.parser.ShardInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } @@ -164,7 +172,7 @@ func (m *sqlShardManager) UpdateShard( _ context.Context, request *persistence.UpdateShardRequest, ) error { - row, err := shardInfoToShardsRow(*request.ShardInfo) + row, err := shardInfoToShardsRow(*request.ShardInfo, m.parser) if err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("UpdateShard operation failed. Error: %v", err), @@ -236,7 +244,7 @@ func readLockShard(tx sqlplugin.Tx, shardID int, oldRangeID int64) error { return nil } -func shardInfoToShardsRow(s persistence.ShardInfo) (*sqlplugin.ShardsRow, error) { +func shardInfoToShardsRow(s persistence.ShardInfo, parser serialization.Parser) (*sqlplugin.ShardsRow, error) { timerAckLevels := make(map[string]int64, len(s.ClusterTimerAckLevel)) for k, v := range s.ClusterTimerAckLevel { timerAckLevels[k] = v.UnixNano() @@ -283,7 +291,7 @@ func shardInfoToShardsRow(s persistence.ShardInfo) (*sqlplugin.ShardsRow, error) PendingFailoverMarkersEncoding: common.StringPtr(markerEncoding), } - blob, err := shardInfoToBlob(shardInfo) + blob, err := parser.ShardInfoToBlob(shardInfo) if err != nil { return nil, err } diff --git a/common/persistence/sql/sqlTaskManager.go b/common/persistence/sql/sqlTaskManager.go index 05cf59f0148..cb29f8d63b9 100644 --- a/common/persistence/sql/sqlTaskManager.go +++ b/common/persistence/sql/sqlTaskManager.go @@ -27,6 +27,8 @@ import ( "math" "time" + "github.com/uber/cadence/common/persistence/serialization" + "github.com/dgryski/go-farm" workflow "github.com/uber/cadence/.gen/go/shared" @@ -47,11 +49,17 @@ var ( ) // newTaskPersistence creates a new instance of TaskManager -func newTaskPersistence(db sqlplugin.DB, nShards int, log log.Logger) (persistence.TaskManager, error) { +func newTaskPersistence( + db sqlplugin.DB, + nShards int, + log log.Logger, + parser serialization.Parser, +) (persistence.TaskManager, error) { return &sqlTaskManager{ sqlStore: sqlStore{ db: db, logger: log, + parser: parser, }, nShards: nShards, }, nil @@ -78,7 +86,7 @@ func (m *sqlTaskManager) LeaseTaskList( ExpiryTimeNanos: common.Int64Ptr(0), LastUpdatedNanos: common.Int64Ptr(time.Now().UnixNano()), } - blob, err := taskListInfoToBlob(tlInfo) + blob, err := m.parser.TaskListInfoToBlob(tlInfo) if err != nil { return nil, err } @@ -111,7 +119,7 @@ func (m *sqlTaskManager) LeaseTaskList( } } - tlInfo, err := taskListInfoFromBlob(row.Data, row.DataEncoding) + tlInfo, err := m.parser.TaskListInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } @@ -129,7 +137,7 @@ func (m *sqlTaskManager) LeaseTaskList( } now := time.Now() tlInfo.LastUpdatedNanos = common.Int64Ptr(now.UnixNano()) - blob, err1 := taskListInfoToBlob(tlInfo) + blob, err1 := m.parser.TaskListInfoToBlob(tlInfo) if err1 != nil { return err1 } @@ -180,7 +188,7 @@ func (m *sqlTaskManager) UpdateTaskList( } if request.TaskListInfo.Kind == persistence.TaskListKindSticky { tlInfo.ExpiryTimeNanos = common.Int64Ptr(stickyTaskListTTL().UnixNano()) - blob, err := taskListInfoToBlob(tlInfo) + blob, err := m.parser.TaskListInfoToBlob(tlInfo) if err != nil { return nil, err } @@ -199,7 +207,7 @@ func (m *sqlTaskManager) UpdateTaskList( } } var resp *persistence.UpdateTaskListResponse - blob, err := taskListInfoToBlob(tlInfo) + blob, err := m.parser.TaskListInfoToBlob(tlInfo) if err != nil { return nil, err } @@ -295,7 +303,7 @@ func (m *sqlTaskManager) ListTaskList( } for i := range rows { - info, err := taskListInfoFromBlob(rows[i].Data, rows[i].DataEncoding) + info, err := m.parser.TaskListInfoFromBlob(rows[i].Data, rows[i].DataEncoding) if err != nil { return nil, err } @@ -347,7 +355,7 @@ func (m *sqlTaskManager) CreateTasks( if v.Data.ScheduleToStartTimeout > 0 { expiryTime = time.Now().Add(time.Second * time.Duration(v.Data.ScheduleToStartTimeout)) } - blob, err := taskInfoToBlob(&sqlblobs.TaskInfo{ + blob, err := m.parser.TaskInfoToBlob(&sqlblobs.TaskInfo{ WorkflowID: &v.Data.WorkflowID, RunID: sqlplugin.MustParseUUID(v.Data.RunID), ScheduleID: &v.Data.ScheduleID, @@ -406,7 +414,7 @@ func (m *sqlTaskManager) GetTasks( var tasks = make([]*persistence.TaskInfo, len(rows)) for i, v := range rows { - info, err := taskInfoFromBlob(v.Data, v.DataEncoding) + info, err := m.parser.TaskInfoFromBlob(v.Data, v.DataEncoding) if err != nil { return nil, err } diff --git a/common/persistence/sql/sqlplugin/mysql/dsn_test.go b/common/persistence/sql/sqlplugin/mysql/dsn_test.go index c7f88e0d88c..9df88cad525 100644 --- a/common/persistence/sql/sqlplugin/mysql/dsn_test.go +++ b/common/persistence/sql/sqlplugin/mysql/dsn_test.go @@ -52,6 +52,8 @@ func (s *StoreTestSuite) TestBuildDSN() { ConnectProtocol: "tcp", ConnectAddr: "192.168.0.1:3306", DatabaseName: "db1", + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }, outIsolationKey: "transaction_isolation", outIsolationVal: "'READ-COMMITTED'", @@ -65,6 +67,8 @@ func (s *StoreTestSuite) TestBuildDSN() { ConnectAddr: "192.168.0.1:3306", DatabaseName: "db1", ConnectAttributes: map[string]string{"k1": "v1", "k2": "v2"}, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }, outIsolationKey: "transaction_isolation", outIsolationVal: "'READ-COMMITTED'", @@ -78,6 +82,8 @@ func (s *StoreTestSuite) TestBuildDSN() { ConnectAddr: "192.168.0.1:3306", DatabaseName: "db1", ConnectAttributes: map[string]string{"k1": "v1", "k2": "v2", "tx_isolation": "'REPEATABLE-READ'"}, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }, outIsolationKey: "tx_isolation", outIsolationVal: "'repeatable-read'", @@ -91,6 +97,8 @@ func (s *StoreTestSuite) TestBuildDSN() { ConnectAddr: "192.168.0.1:3306", DatabaseName: "db1", ConnectAttributes: map[string]string{"k1": "v1", "k2": "v2", "tx_isolation": "REPEATABLE-READ"}, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }, outIsolationKey: "tx_isolation", outIsolationVal: "'repeatable-read'", @@ -104,6 +112,8 @@ func (s *StoreTestSuite) TestBuildDSN() { ConnectAddr: "192.168.0.1:3306", DatabaseName: "db1", ConnectAttributes: map[string]string{"k1": "v1", "k2": "v2", "transaction_isolation": "REPEATABLE-READ"}, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }, outIsolationKey: "transaction_isolation", outIsolationVal: "'repeatable-read'", diff --git a/common/persistence/sql/workflowStateMaps.go b/common/persistence/sql/workflowStateMaps.go index 731cb5f3dfb..4985cbeb0b9 100644 --- a/common/persistence/sql/workflowStateMaps.go +++ b/common/persistence/sql/workflowStateMaps.go @@ -25,6 +25,8 @@ import ( "fmt" "time" + "github.com/uber/cadence/common/persistence/serialization" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -40,6 +42,7 @@ func updateActivityInfos( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(activityInfos) > 0 { @@ -81,7 +84,7 @@ func updateActivityInfos( RetryLastWorkerIdentity: &v.LastWorkerIdentity, RetryLastFailureDetails: v.LastFailureDetails, } - blob, err := activityInfoToBlob(info) + blob, err := parser.ActivityInfoToBlob(info) if err != nil { return err } @@ -142,6 +145,7 @@ func getActivityInfoMap( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) (map[int64]*persistence.InternalActivityInfo, error) { rows, err := db.SelectFromActivityInfoMaps(&sqlplugin.ActivityInfoMapsFilter{ @@ -158,7 +162,7 @@ func getActivityInfoMap( ret := make(map[int64]*persistence.InternalActivityInfo) for _, v := range rows { - decoded, err := activityInfoFromBlob(v.Data, v.DataEncoding) + decoded, err := parser.ActivityInfoFromBlob(v.Data, v.DataEncoding) if err != nil { return nil, err } @@ -234,12 +238,13 @@ func updateTimerInfos( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(timerInfos) > 0 { rows := make([]sqlplugin.TimerInfoMapsRow, len(timerInfos)) for i, v := range timerInfos { - blob, err := timerInfoToBlob(&sqlblobs.TimerInfo{ + blob, err := parser.TimerInfoToBlob(&sqlblobs.TimerInfo{ Version: &v.Version, StartedID: &v.StartedID, ExpiryTimeNanos: common.Int64Ptr(v.ExpiryTime.UnixNano()), @@ -303,6 +308,7 @@ func getTimerInfoMap( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) (map[string]*persistence.TimerInfo, error) { rows, err := db.SelectFromTimerInfoMaps(&sqlplugin.TimerInfoMapsFilter{ @@ -318,7 +324,7 @@ func getTimerInfoMap( } ret := make(map[string]*persistence.TimerInfo) for _, v := range rows { - info, err := timerInfoFromBlob(v.Data, v.DataEncoding) + info, err := parser.TimerInfoFromBlob(v.Data, v.DataEncoding) if err != nil { return nil, err } @@ -366,6 +372,7 @@ func updateChildExecutionInfos( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(childExecutionInfos) > 0 { @@ -389,7 +396,7 @@ func updateChildExecutionInfos( WorkflowTypeName: &v.WorkflowTypeName, ParentClosePolicy: common.Int32Ptr(int32(v.ParentClosePolicy)), } - blob, err := childExecutionInfoToBlob(info) + blob, err := parser.ChildExecutionInfoToBlob(info) if err != nil { return err } @@ -432,6 +439,7 @@ func getChildExecutionInfoMap( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) (map[int64]*persistence.InternalChildExecutionInfo, error) { rows, err := db.SelectFromChildExecutionInfoMaps(&sqlplugin.ChildExecutionInfoMapsFilter{ @@ -448,7 +456,7 @@ func getChildExecutionInfoMap( ret := make(map[int64]*persistence.InternalChildExecutionInfo) for _, v := range rows { - rowInfo, err := childExecutionInfoFromBlob(v.Data, v.DataEncoding) + rowInfo, err := parser.ChildExecutionInfoFromBlob(v.Data, v.DataEncoding) if err != nil { return nil, err } @@ -505,12 +513,13 @@ func updateRequestCancelInfos( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(requestCancelInfos) > 0 { rows := make([]sqlplugin.RequestCancelInfoMapsRow, len(requestCancelInfos)) for i, v := range requestCancelInfos { - blob, err := requestCancelInfoToBlob(&sqlblobs.RequestCancelInfo{ + blob, err := parser.RequestCancelInfoToBlob(&sqlblobs.RequestCancelInfo{ Version: &v.Version, InitiatedEventBatchID: &v.InitiatedEventBatchID, CancelRequestID: &v.CancelRequestID, @@ -570,6 +579,7 @@ func getRequestCancelInfoMap( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) (map[int64]*persistence.RequestCancelInfo, error) { rows, err := db.SelectFromRequestCancelInfoMaps(&sqlplugin.RequestCancelInfoMapsFilter{ @@ -586,7 +596,7 @@ func getRequestCancelInfoMap( ret := make(map[int64]*persistence.RequestCancelInfo) for _, v := range rows { - rowInfo, err := requestCancelInfoFromBlob(v.Data, v.DataEncoding) + rowInfo, err := parser.RequestCancelInfoFromBlob(v.Data, v.DataEncoding) if err != nil { return nil, err } @@ -630,12 +640,13 @@ func updateSignalInfos( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) error { if len(signalInfos) > 0 { rows := make([]sqlplugin.SignalInfoMapsRow, len(signalInfos)) for i, v := range signalInfos { - blob, err := signalInfoToBlob(&sqlblobs.SignalInfo{ + blob, err := parser.SignalInfoToBlob(&sqlblobs.SignalInfo{ Version: &v.Version, InitiatedEventBatchID: &v.InitiatedEventBatchID, RequestID: &v.SignalRequestID, @@ -698,6 +709,7 @@ func getSignalInfoMap( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + parser serialization.Parser, ) (map[int64]*persistence.SignalInfo, error) { rows, err := db.SelectFromSignalInfoMaps(&sqlplugin.SignalInfoMapsFilter{ @@ -714,7 +726,7 @@ func getSignalInfoMap( ret := make(map[int64]*persistence.SignalInfo) for _, v := range rows { - rowInfo, err := signalInfoFromBlob(v.Data, v.DataEncoding) + rowInfo, err := parser.SignalInfoFromBlob(v.Data, v.DataEncoding) if err != nil { return nil, err } diff --git a/common/service/config/config.go b/common/service/config/config.go index ce869ad5102..187619eb90c 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -226,6 +226,11 @@ type ( NumShards int `yaml:"nShards"` // TLS is the configuration for TLS connections TLS *auth.TLS `yaml:"tls"` + // EncodingType is the configuration for the type of encoding used for sql blobs + EncodingType string `yaml:"encodingType"` + // DecodingTypes is the configuration for all the sql blob decoding types which need to be supported + // DecodingTypes should not be removed unless there are no blobs in database with the encoding type + DecodingTypes []string `yaml:"decodingTypes"` } // CustomDatastoreConfig is the configuration for connecting to a custom datastore that is not supported by cadence core diff --git a/config/development_mysql.yaml b/config/development_mysql.yaml index e510e578e29..2a02ed7a917 100644 --- a/config/development_mysql.yaml +++ b/config/development_mysql.yaml @@ -14,6 +14,8 @@ persistence: maxConns: 20 maxIdleConns: 20 maxConnLifetime: "1h" + encodingType: "thriftrw" + decodingType: ["thriftrw"] mysql-visibility: sql: pluginName: "mysql" @@ -25,6 +27,8 @@ persistence: maxConns: 2 maxIdleConns: 2 maxConnLifetime: "1h" + encodingType: "thriftrw" + decodingType: ["thriftrw"] ringpop: name: cadence diff --git a/config/development_postgres.yaml b/config/development_postgres.yaml index 61c2b10f6e8..48ae4a812c9 100644 --- a/config/development_postgres.yaml +++ b/config/development_postgres.yaml @@ -14,6 +14,8 @@ persistence: maxConns: 20 maxIdleConns: 20 maxConnLifetime: "1h" + encodingType: "thriftrw" + decodingType: ["thriftrw"] postgres-visibility: sql: pluginName: "postgres" @@ -25,6 +27,8 @@ persistence: maxConns: 2 maxIdleConns: 2 maxConnLifetime: "1h" + encodingType: "thriftrw" + decodingType: ["thriftrw"] ringpop: name: cadence diff --git a/docker/config_template.yaml b/docker/config_template.yaml index 5e45b8f4fd1..bf0363e0072 100644 --- a/docker/config_template.yaml +++ b/docker/config_template.yaml @@ -25,6 +25,8 @@ persistence: default: sql: pluginName: "mysql" + encodingType: "thriftrw" + decodingType: ["thriftrw"] databaseName: {{ default .Env.DBNAME "cadence" }} connectAddr: "{{ default .Env.MYSQL_SEEDS "" }}:{{ default .Env.DB_PORT "3306" }}" connectProtocol: "tcp" @@ -37,6 +39,8 @@ persistence: visibility: sql: pluginName: "mysql" + encodingType: "thriftrw" + decodingType: ["thriftrw"] databaseName: {{ default .Env.VISIBILITY_DBNAME "cadence_visibility" }} connectAddr: "{{ default .Env.MYSQL_SEEDS "" }}:{{ default .Env.DB_PORT "3306" }}" connectProtocol: "tcp" @@ -50,6 +54,8 @@ persistence: default: sql: pluginName: "postgres" + encodingType: "thriftrw" + decodingType: ["thriftrw"] databaseName: {{ default .Env.DBNAME "cadence" }} connectAddr: "{{ default .Env.POSTGRES_SEEDS "" }}:{{ default .Env.DB_PORT "5432" }}" connectProtocol: "tcp" @@ -61,6 +67,8 @@ persistence: visibility: sql: pluginName: "postgres" + encodingType: "thriftrw" + decodingType: ["thriftrw"] databaseName: {{ default .Env.VISIBILITY_DBNAME "cadence_visibility" }} connectAddr: "{{ default .Env.POSTGRES_SEEDS "" }}:{{ default .Env.DB_PORT "5432" }}" connectProtocol: "tcp" diff --git a/tools/sql/clitest/connTest.go b/tools/sql/clitest/connTest.go index 5b0ed585201..3daacf100c0 100644 --- a/tools/sql/clitest/connTest.go +++ b/tools/sql/clitest/connTest.go @@ -92,6 +92,8 @@ func (s *SQLConnTestSuite) TestSQLConn() { Password: testPassword, PluginName: s.pluginName, DatabaseName: s.DBName, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }) s.Nil(err) s.RunCreateTest(conn) @@ -110,6 +112,8 @@ func newTestConn(database, pluginName string) (*sql.Connection, error) { Password: testPassword, PluginName: pluginName, DatabaseName: database, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, }) } diff --git a/tools/sql/clitest/versionTest.go b/tools/sql/clitest/versionTest.go index 970a5a772f0..76dfccd6654 100644 --- a/tools/sql/clitest/versionTest.go +++ b/tools/sql/clitest/versionTest.go @@ -110,6 +110,8 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() { Password: testPassword, PluginName: s.pluginName, DatabaseName: database, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, } visibilityCfg := defaultCfg visibilityCfg.DatabaseName = visDatabase @@ -199,6 +201,8 @@ func (s *VersionTestSuite) runCheckCompatibleVersion( Password: testPassword, PluginName: s.pluginName, DatabaseName: database, + EncodingType: "thriftrw", + DecodingTypes: []string{"thriftrw"}, } err = sql.CheckCompatibleVersion(cfg, expected) if len(errStr) > 0 {