Skip to content

Commit

Permalink
sql: allow configuration of connection pool settings (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Apr 26, 2019
1 parent ff14c2e commit 6b91753
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 120 deletions.
57 changes: 28 additions & 29 deletions common/persistence/persistence-factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,8 @@ func New(
metricsClient: metricsClient,
logger: logger,
}
defaultCfg := cfg.DataStores[cfg.DefaultStore]
visibilityCfg := cfg.DataStores[cfg.VisibilityStore]
limiters := buildRatelimiters(cfg)
factory.datastores = map[storeType]Datastore{
storeTypeTask: newStore(defaultCfg, limiters[cfg.DefaultStore], clusterName, 0, logger),
storeTypeShard: newStore(defaultCfg, limiters[cfg.DefaultStore], clusterName, 0, logger),
storeTypeMetadata: newStore(defaultCfg, limiters[cfg.DefaultStore], clusterName, 0, logger),
storeTypeExecution: newStore(defaultCfg, limiters[cfg.DefaultStore], clusterName, 0, logger),
storeTypeHistory: newStore(defaultCfg, limiters[cfg.DefaultStore], clusterName, cfg.HistoryMaxConns, logger),
storeTypeVisibility: newStore(visibilityCfg, limiters[cfg.VisibilityStore], clusterName, 0, logger),
}
factory.init(clusterName, limiters)
return factory
}

Expand Down Expand Up @@ -305,29 +296,37 @@ func (f *factoryImpl) getCassandraConfig() *config.Cassandra {
return cfg.DataStores[cfg.VisibilityStore].Cassandra
}

func newStore(cfg config.DataStore, tb tokenbucket.TokenBucket, clusterName string, maxConnsOverride int, logger log.Logger) Datastore {
var ds Datastore
ds.ratelimit = tb
if cfg.SQL != nil {
ds.factory = newSQLStore(*cfg.SQL, clusterName, maxConnsOverride, logger)
return ds
}
ds.factory = newCassandraStore(*cfg.Cassandra, clusterName, maxConnsOverride, logger)
return ds
}
func (f *factoryImpl) init(clusterName string, limiters map[string]tokenbucket.TokenBucket) {
f.datastores = make(map[storeType]Datastore, len(storeTypes))
defaultCfg := f.config.DataStores[f.config.DefaultStore]
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.Cassandra != nil:
defaultDataStore.factory = cassandra.NewFactory(*defaultCfg.Cassandra, clusterName, f.logger)
case defaultCfg.SQL != nil:
defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, clusterName, f.logger)
default:
f.logger.Fatal("invalid config: one of cassandra or sql params must be specified")
}

func newSQLStore(cfg config.SQL, clusterName string, maxConnsOverride int, logger log.Logger) DataStoreFactory {
if maxConnsOverride > 0 {
cfg.MaxConns = maxConnsOverride
for _, st := range storeTypes {
if st != storeTypeVisibility {
f.datastores[st] = defaultDataStore
}
}
return sql.NewFactory(cfg, clusterName, logger)
}

func newCassandraStore(cfg config.Cassandra, clusterName string, maxConnsOverride int, logger log.Logger) DataStoreFactory {
if maxConnsOverride > 0 {
cfg.MaxConns = maxConnsOverride
visibilityCfg := f.config.DataStores[f.config.VisibilityStore]
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
switch {
case defaultCfg.Cassandra != nil:
visibilityDataStore.factory = cassandra.NewFactory(*visibilityCfg.Cassandra, clusterName, f.logger)
case visibilityCfg.SQL != nil:
visibilityDataStore.factory = sql.NewFactory(*visibilityCfg.SQL, clusterName, f.logger)
default:
f.logger.Fatal("invalid config: one of cassandra or sql params must be specified")
}
return cassandra.NewFactory(cfg, clusterName, logger)

f.datastores[storeTypeVisibility] = visibilityDataStore
}

func buildRatelimiters(cfg *config.Persistence) map[string]tokenbucket.TokenBucket {
Expand Down
96 changes: 42 additions & 54 deletions common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,14 @@ import (
"github.com/uber/cadence/common/service/config"
)

type (
// Factory vends store objects backed by MySQL
Factory struct {
sync.RWMutex
cfg config.SQL
clusterName string
logger log.Logger
execStoreFactory *executionStoreFactory
}
executionStoreFactory struct {
db sqldb.Interface
logger log.Logger
}
)
// Factory vends store objects backed by MySQL
type Factory struct {
sync.RWMutex
db sqldb.Interface
cfg config.SQL
clusterName string
logger log.Logger
}

// NewFactory returns an instance of a factory object which can be used to create
// datastores backed by any kind of SQL store
Expand All @@ -53,27 +47,47 @@ func NewFactory(cfg config.SQL, clusterName string, logger log.Logger) *Factory

// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
return newTaskPersistence(f.cfg, f.logger)
conn, err := f.conn()
if err != nil {
return nil, err
}
return newTaskPersistence(conn, f.cfg.NumShards, f.logger)
}

// NewShardStore returns a new shard store
func (f *Factory) NewShardStore() (p.ShardStore, error) {
return newShardPersistence(f.cfg, f.clusterName, f.logger)
conn, err := f.conn()
if err != nil {
return nil, err
}
return newShardPersistence(conn, f.clusterName, f.logger)
}

// NewHistoryStore returns a new history store
func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
return newHistoryPersistence(f.cfg, f.logger)
conn, err := f.conn()
if err != nil {
return nil, err
}
return newHistoryPersistence(conn, f.logger)
}

// NewHistoryV2Store returns a new history store
func (f *Factory) NewHistoryV2Store() (p.HistoryV2Store, error) {
return newHistoryV2Persistence(f.cfg, f.logger)
conn, err := f.conn()
if err != nil {
return nil, err
}
return newHistoryV2Persistence(conn, f.logger)
}

// NewMetadataStore returns a new metadata store
func (f *Factory) NewMetadataStore() (p.MetadataStore, error) {
return newMetadataPersistenceV2(f.cfg, f.clusterName, f.logger)
conn, err := f.conn()
if err != nil {
return nil, err
}
return newMetadataPersistenceV2(conn, f.clusterName, f.logger)
}

// NewMetadataStoreV1 returns the default metadatastore
Expand All @@ -88,11 +102,11 @@ func (f *Factory) NewMetadataStoreV2() (p.MetadataStore, error) {

// NewExecutionStore returns an ExecutionStore for a given shardID
func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {
factory, err := f.newExecutionStoreFactory()
conn, err := f.conn()
if err != nil {
return nil, err
}
return factory.new(shardID)
return NewSQLExecutionStore(conn, f.logger, shardID)
}

// NewVisibilityStore returns a visibility store
Expand All @@ -104,47 +118,21 @@ func (f *Factory) NewVisibilityStore() (p.VisibilityStore, error) {
func (f *Factory) Close() {
f.Lock()
defer f.Unlock()
if f.execStoreFactory != nil {
f.execStoreFactory.close()
if f.db != nil {
f.db.Close()
}
}

// newExecutionStoreFactory returns a new instance of a factory that vends
// execution stores. This factory exist to make sure all of the execution
// managers reuse the same underlying db connection / object and that closing
// one closes all of them
func (f *Factory) newExecutionStoreFactory() (*executionStoreFactory, error) {
func (f *Factory) conn() (sqldb.Interface, error) {
f.RLock()
if f.execStoreFactory != nil {
if f.db != nil {
f.RUnlock()
return f.execStoreFactory, nil
return f.db, nil
}
f.RUnlock()
f.Lock()
defer f.Unlock()
var err error
f.execStoreFactory, err = newExecutionStoreFactory(f.cfg, f.logger)
return f.execStoreFactory, err
}

func newExecutionStoreFactory(cfg config.SQL, logger log.Logger) (*executionStoreFactory, error) {
db, err := storage.NewSQLDB(&cfg)
if err != nil {
return nil, err
}
return &executionStoreFactory{
db: db,
logger: logger,
}, nil
}

func (f *executionStoreFactory) new(shardID int) (p.ExecutionStore, error) {
return NewSQLExecutionStore(f.db, f.logger, shardID)
}

// close closes the factory
func (f *executionStoreFactory) close() {
if f.db != nil {
f.db.Close()
}
f.db, err = storage.NewSQLDB(&f.cfg)
return f.db, err
}
8 changes: 1 addition & 7 deletions common/persistence/sql/sqlHistoryManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/service/config"
)

type sqlHistoryManager struct {
Expand All @@ -40,11 +38,7 @@ type sqlHistoryManager struct {
}

// newHistoryPersistence creates an instance of HistoryManager
func newHistoryPersistence(cfg config.SQL, logger log.Logger) (p.HistoryStore, error) {
var db, err = storage.NewSQLDB(&cfg)
if err != nil {
return nil, err
}
func newHistoryPersistence(db sqldb.Interface, logger log.Logger) (p.HistoryStore, error) {
return &sqlHistoryManager{
sqlStore: sqlStore{
db: db,
Expand Down
10 changes: 2 additions & 8 deletions common/persistence/sql/sqlHistoryV2Manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,16 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/service/config"
)

type sqlHistoryV2Manager struct {
sqlStore
shardID int
}

// newHistoryPersistence creates an instance of HistoryManager
func newHistoryV2Persistence(cfg config.SQL, logger log.Logger) (p.HistoryV2Store, error) {
var db, err = storage.NewSQLDB(&cfg)
if err != nil {
return nil, err
}
// newHistoryV2Persistence creates an instance of HistoryManager
func newHistoryV2Persistence(db sqldb.Interface, logger log.Logger) (p.HistoryV2Store, error) {
return &sqlHistoryV2Manager{
sqlStore: sqlStore{
db: db,
Expand Down
8 changes: 1 addition & 7 deletions common/persistence/sql/sqlMetadataManagerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/service/config"
)

type sqlMetadataManagerV2 struct {
Expand All @@ -41,12 +39,8 @@ type sqlMetadataManagerV2 struct {
}

// newMetadataPersistenceV2 creates an instance of sqlMetadataManagerV2
func newMetadataPersistenceV2(cfg config.SQL, currentClusterName string,
func newMetadataPersistenceV2(db sqldb.Interface, currentClusterName string,
logger log.Logger) (persistence.MetadataManager, error) {
var db, err = storage.NewSQLDB(&cfg)
if err != nil {
return nil, err
}
return &sqlMetadataManagerV2{
sqlStore: sqlStore{
db: db,
Expand Down
8 changes: 1 addition & 7 deletions common/persistence/sql/sqlShardManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/service/config"
)

type sqlShardManager struct {
Expand All @@ -41,11 +39,7 @@ type sqlShardManager struct {
}

// newShardPersistence creates an instance of ShardManager
func newShardPersistence(cfg config.SQL, currentClusterName string, log log.Logger) (persistence.ShardManager, error) {
var db, err = storage.NewSQLDB(&cfg)
if err != nil {
return nil, err
}
func newShardPersistence(db sqldb.Interface, currentClusterName string, log log.Logger) (persistence.ShardManager, error) {
return &sqlShardManager{
sqlStore: sqlStore{
db: db,
Expand Down
10 changes: 2 additions & 8 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/storage"
"github.com/uber/cadence/common/persistence/sql/storage/sqldb"
"github.com/uber/cadence/common/service/config"
)

type sqlTaskManager struct {
Expand All @@ -47,17 +45,13 @@ var (
)

// newTaskPersistence creates a new instance of TaskManager
func newTaskPersistence(cfg config.SQL, log log.Logger) (persistence.TaskManager, error) {
var db, err = storage.NewSQLDB(&cfg)
if err != nil {
return nil, err
}
func newTaskPersistence(db sqldb.Interface, nShards int, log log.Logger) (persistence.TaskManager, error) {
return &sqlTaskManager{
sqlStore: sqlStore{
db: db,
logger: log,
},
nShards: cfg.NumShards,
nShards: nShards,
}, nil
}

Expand Down
9 changes: 9 additions & 0 deletions common/persistence/sql/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ func NewSQLDB(cfg *config.SQL) (sqldb.Interface, error) {
if err != nil {
return nil, err
}
if cfg.MaxConns > 0 {
db.SetMaxOpenConns(cfg.MaxConns)
}
if cfg.MaxIdleConns > 0 {
db.SetMaxIdleConns(cfg.MaxIdleConns)
}
if cfg.MaxConnLifetime > 0 {
db.SetConnMaxLifetime(cfg.MaxConnLifetime)
}
// Maps struct names in CamelCase to snake without need for db struct tags.
db.MapperFunc(strcase.ToSnake)
return mysql.NewDB(db, nil), nil
Expand Down
4 changes: 4 additions & 0 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ type (
MaxQPS int `yaml:"maxQPS"`
// MaxConns the max number of connections to this datastore
MaxConns int `yaml:"maxConns"`
// MaxIdleConns is the max number of idle connections to this datastore
MaxIdleConns int `yaml:"maxIdleConns"`
// MaxConnLifetime is the maximum time a connection can be alive
MaxConnLifetime time.Duration `yaml:"maxConnLifetime"`
// NumShards is the number of storage shards to use for tables
// in a sharded sql database. The default value for this param is 1
NumShards int `yaml:"nShards"`
Expand Down

0 comments on commit 6b91753

Please sign in to comment.