Skip to content

Commit

Permalink
sql: make the factory use a refcounted conn object (cadence-workflow#…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored May 8, 2019
1 parent 4027955 commit ade14e5
Showing 1 changed file with 76 additions and 30 deletions.
106 changes: 76 additions & 30 deletions common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,40 @@ import (
"github.com/uber/cadence/common/service/config"
)

// Factory vends store objects backed by MySQL
type Factory struct {
sync.RWMutex
db sqldb.Interface
cfg config.SQL
clusterName string
logger log.Logger
}
type (
// Factory vends store objects backed by MySQL
Factory struct {
cfg config.SQL
dbConn dbConn
clusterName string
logger log.Logger
}

// dbConn represents a logical mysql connection - its a
// wrapper around the standard sql connection pool with
// additional reference counting
dbConn struct {
sync.Mutex
sqldb.Interface
refCnt int
cfg *config.SQL
}
)

// NewFactory returns an instance of a factory object which can be used to create
// datastores backed by any kind of SQL store
func NewFactory(cfg config.SQL, clusterName string, logger log.Logger) *Factory {
return &Factory{cfg: cfg, clusterName: clusterName, logger: logger}
return &Factory{
cfg: cfg,
clusterName: clusterName,
logger: logger,
dbConn: newRefCountedDBConn(&cfg),
}
}

// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
conn, err := f.conn()
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
Expand All @@ -56,7 +72,7 @@ func (f *Factory) NewTaskStore() (p.TaskStore, error) {

// NewShardStore returns a new shard store
func (f *Factory) NewShardStore() (p.ShardStore, error) {
conn, err := f.conn()
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
Expand All @@ -65,7 +81,7 @@ func (f *Factory) NewShardStore() (p.ShardStore, error) {

// NewHistoryStore returns a new history store
func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
conn, err := f.conn()
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
Expand All @@ -74,7 +90,7 @@ func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {

// NewHistoryV2Store returns a new history store
func (f *Factory) NewHistoryV2Store() (p.HistoryV2Store, error) {
conn, err := f.conn()
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
Expand All @@ -83,7 +99,7 @@ func (f *Factory) NewHistoryV2Store() (p.HistoryV2Store, error) {

// NewMetadataStore returns a new metadata store
func (f *Factory) NewMetadataStore() (p.MetadataStore, error) {
conn, err := f.conn()
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
Expand All @@ -102,7 +118,7 @@ func (f *Factory) NewMetadataStoreV2() (p.MetadataStore, error) {

// NewExecutionStore returns an ExecutionStore for a given shardID
func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {
conn, err := f.conn()
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
Expand All @@ -116,23 +132,53 @@ func (f *Factory) NewVisibilityStore() (p.VisibilityStore, error) {

// Close closes the factory
func (f *Factory) Close() {
f.Lock()
defer f.Unlock()
if f.db != nil {
f.db.Close()
f.dbConn.forceClose()
}

// newRefCountedDBConn returns a logical mysql connection that
// uses reference counting to decide when to close the
// underlying connection object. The reference count gets incremented
// everytime get() is called and decremented everytime Close() is called
func newRefCountedDBConn(cfg *config.SQL) dbConn {
return dbConn{cfg: cfg}
}

// get returns a mysql db connection and increments a reference count
// this method will create a new connection, if an existing connection
// does not exist
func (c *dbConn) get() (sqldb.Interface, error) {
c.Lock()
defer c.Unlock()
if c.refCnt == 0 {
conn, err := storage.NewSQLDB(c.cfg)
if err != nil {
return nil, err
}
c.Interface = conn
}
c.refCnt++
return c, nil
}

// forceClose ignores reference counts and shutsdown the underlying connection pool
func (c *dbConn) forceClose() {
c.Lock()
defer c.Unlock()
if c.Interface != nil {
c.Interface.Close()
}
c.refCnt = 0
}

func (f *Factory) conn() (sqldb.Interface, error) {
f.RLock()
if f.db != nil {
f.RUnlock()
return f.db, nil
// Close closes the underlying connection if the reference count becomes zero
func (c *dbConn) Close() error {
c.Lock()
defer c.Unlock()
c.refCnt--
if c.refCnt == 0 {
err := c.Interface.Close()
c.Interface = nil
return err
}
f.RUnlock()
f.Lock()
defer f.Unlock()
var err error
f.db, err = storage.NewSQLDB(&f.cfg)
return f.db, err
return nil
}

0 comments on commit ade14e5

Please sign in to comment.