diff --git a/pkg/database/connector.go b/pkg/database/connector.go new file mode 100644 index 000000000..4df11eff4 --- /dev/null +++ b/pkg/database/connector.go @@ -0,0 +1,80 @@ +/* +Copyright (c) 2023 OceanBase +ob-operator is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. +*/ + +package database + +import ( + // register mysql driver + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" +) + +// Connector represents a connection pool. +type Connector struct { + ds DataSource + client *Client +} + +// DataSource represents a data source. +type DataSource interface { + ID() string + DriverName() string + DataSourceName() string + String() string +} + +// Client represents a wrapper around sqlx.DB. +type Client struct { + *sqlx.DB +} + +func (c *Client) configure() { + c.SetMaxOpenConns(DefaultConnMaxOpenCount) + c.SetMaxIdleConns(DefaultConnMaxIdleCount) + c.SetConnMaxLifetime(DefaultConnMaxLifetime) + c.SetConnMaxIdleTime(DefaultConnMaxIdleTime) +} + +func NewConnector(dataSource DataSource) *Connector { + return &Connector{ + ds: dataSource, + } +} + +func (c *Connector) Init() error { + db, err := sqlx.Connect(c.ds.DriverName(), c.ds.DataSourceName()) + if err != nil { + return err + } + c.client = &Client{db} + c.client.configure() + return nil +} + +func (c *Connector) IsAlive() bool { + if c.client == nil { + return false + } + err := c.client.Ping() + return err == nil +} + +func (c *Connector) GetClient() *Client { + return c.client +} + +func (c *Connector) Close() error { + if c.client.DB != nil { + return c.client.DB.Close() + } + return nil +} diff --git a/pkg/database/const.go b/pkg/database/const.go new file mode 100644 index 000000000..a60a412b1 --- /dev/null +++ b/pkg/database/const.go @@ -0,0 +1,8 @@ +package database + +const ( + DefaultConnMaxOpenCount = 20 + DefaultConnMaxIdleCount = 1 + DefaultConnMaxLifetime = 0 + DefaultConnMaxIdleTime = 0 +) diff --git a/pkg/database/pool.go b/pkg/database/pool.go new file mode 100644 index 000000000..9c909ab2a --- /dev/null +++ b/pkg/database/pool.go @@ -0,0 +1,41 @@ +/* +Copyright (c) 2023 OceanBase +ob-operator is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. +*/ + +package database + +import ( + "sync" + + "github.com/pkg/errors" +) + +var p = &pool{} + +type pool struct { + // TODO: maintain cache size and data expiration, maybe change to a cache library. + Cache sync.Map +} + +func GetConnector(dataSource DataSource) (*Connector, error) { + c, ok := p.Cache.Load(dataSource.ID()) + if ok && c.(*Connector).IsAlive() { + return c.(*Connector), nil + } + connector := NewConnector(dataSource) + err := connector.Init() + if err != nil { + err = errors.Wrap(err, "init connector failed with datasource: "+dataSource.String()) + return nil, err + } + p.Cache.Store(dataSource.ID(), connector) + return connector, nil +} diff --git a/pkg/oceanbase/connector/connection_pool_config.go b/pkg/oceanbase/connector/connection_pool_config.go deleted file mode 100644 index 88c82f204..000000000 --- a/pkg/oceanbase/connector/connection_pool_config.go +++ /dev/null @@ -1,24 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector - -import ( - "time" -) - -type ConnectionPoolConfig struct { - MaxOpenConns int - MaxIdleConns int - ConnMaxIdleTime time.Duration - ConnMaxLifetime time.Duration -} diff --git a/pkg/oceanbase/connector/connector.go b/pkg/oceanbase/connector/connector.go deleted file mode 100644 index 5295872e2..000000000 --- a/pkg/oceanbase/connector/connector.go +++ /dev/null @@ -1,31 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector - -import ( - "github.com/jmoiron/sqlx" -) - -type Connector interface { - // actually init database connection - Init() error - - // check whether the current connector is usable - IsAlive() bool - - // get sqlx.DB object as client - GetClient() *sqlx.DB - - // close db connections - Close() error -} diff --git a/pkg/oceanbase/connector/connector_cache.go b/pkg/oceanbase/connector/connector_cache.go deleted file mode 100644 index c52e2f192..000000000 --- a/pkg/oceanbase/connector/connector_cache.go +++ /dev/null @@ -1,13 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector diff --git a/pkg/oceanbase/connector/const.go b/pkg/oceanbase/connector/const.go deleted file mode 100644 index 8871e700d..000000000 --- a/pkg/oceanbase/connector/const.go +++ /dev/null @@ -1,15 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector - -const DRIVER_MYSQL = "mysql" diff --git a/pkg/oceanbase/connector/datasource.go b/pkg/oceanbase/connector/datasource.go new file mode 100644 index 000000000..dedbfe920 --- /dev/null +++ b/pkg/oceanbase/connector/datasource.go @@ -0,0 +1,69 @@ +/* +Copyright (c) 2023 OceanBase +ob-operator is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. +*/ + +package connector + +import ( + "crypto/md5" + "encoding/hex" + "fmt" +) + +// OceanBaseDataSource implements the database.DataSource interface for OceanBase. +type OceanBaseDataSource struct { + Address string + Port int64 + User string + Tenant string + Password string + Database string +} + +func NewOceanBaseDataSource(address string, port int64, user, tenant, password, database string) *OceanBaseDataSource { + return &OceanBaseDataSource{ + Address: address, + Port: port, + User: user, + Tenant: tenant, + Password: password, + Database: database, + } +} + +func (*OceanBaseDataSource) DriverName() string { + return "mysql" +} + +func (s *OceanBaseDataSource) DataSourceName() string { + passwordPart := "" + if s.Password != "" { + passwordPart = fmt.Sprintf(":%s", s.Password) + } + if s.Database != "" { + return fmt.Sprintf("%s@%s%s@tcp(%s:%d)/%s?multiStatements=true&interpolateParams=true", s.User, s.Tenant, passwordPart, s.Address, s.Port, s.Database) + } + return fmt.Sprintf("%s@%s@tcp(%s:%d)/", s.User, s.Tenant, s.Address, s.Port) +} + +func (s *OceanBaseDataSource) ID() string { + h := md5.New() + key := fmt.Sprintf("%s@%s@%s:%d/%s", s.User, s.Tenant, s.Address, s.Port, s.Database) + _, err := h.Write([]byte(key)) + if err != nil { + return key + } + return hex.EncodeToString(h.Sum(nil)) +} + +func (s *OceanBaseDataSource) String() string { + return fmt.Sprintf("address: %s, port: %d, user: %s, tenant: %s, database: %s", s.Address, s.Port, s.User, s.Tenant, s.Database) +} diff --git a/pkg/oceanbase/connector/ob_connect_properties.go b/pkg/oceanbase/connector/ob_connect_properties.go deleted file mode 100644 index 82f925840..000000000 --- a/pkg/oceanbase/connector/ob_connect_properties.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector - -import ( - "crypto/md5" - "encoding/hex" - "fmt" -) - -type OceanbaseConnectProperties struct { - Address string - Port int64 - User string - Tenant string - Password string - Database string -} - -func NewOceanbaseConnectProperties(address string, port int64, user, tenant, password, database string) *OceanbaseConnectProperties { - return &OceanbaseConnectProperties{ - Address: address, - Port: port, - User: user, - Tenant: tenant, - Password: password, - Database: database, - } -} - -func (p *OceanbaseConnectProperties) GetDSN() string { - passwordPart := "" - if p.Password != "" { - passwordPart = fmt.Sprintf(":%s", p.Password) - } - if p.Database != "" { - return fmt.Sprintf("%s@%s%s@tcp(%s:%d)/%s?multiStatements=true&interpolateParams=true", p.User, p.Tenant, passwordPart, p.Address, p.Port, p.Database) - } - return fmt.Sprintf("%s@%s@tcp(%s:%d)/", p.User, p.Tenant, p.Address, p.Port) -} - -func (p *OceanbaseConnectProperties) HashValue() string { - hasher := md5.New() - key := fmt.Sprintf("%s@%s@%s:%d/%s", p.User, p.Tenant, p.Address, p.Port, p.Database) - hasher.Write([]byte(key)) - return hex.EncodeToString(hasher.Sum(nil)) -} diff --git a/pkg/oceanbase/connector/ob_connector.go b/pkg/oceanbase/connector/ob_connector.go deleted file mode 100644 index ccdc276e7..000000000 --- a/pkg/oceanbase/connector/ob_connector.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector - -import ( - _ "github.com/go-sql-driver/mysql" - "github.com/jmoiron/sqlx" - "github.com/pkg/errors" -) - -// oceanbase connector, support mysql mode only -type OceanbaseConnector struct { - ConnectProperties *OceanbaseConnectProperties - Client *sqlx.DB - PoolConfig *ConnectionPoolConfig -} - -func NewOceanbaseConnector(p *OceanbaseConnectProperties) *OceanbaseConnector { - return &OceanbaseConnector{ - ConnectProperties: p, - Client: nil, - } -} - -func (oc *OceanbaseConnector) Init() error { - var err error - var db *sqlx.DB - dsn := oc.ConnectProperties.GetDSN() - db, err = sqlx.Connect(DRIVER_MYSQL, dsn) - if err != nil { - return errors.Wrapf(err, "Init db connection %s", dsn) - } - oc.Client = db - return nil -} - -func (oc *OceanbaseConnector) IsAlive() bool { - err := oc.Client.Ping() - if err != nil { - return false - } - return true -} - -func (oc *OceanbaseConnector) GetClient() *sqlx.DB { - return oc.Client -} - -func (oc *OceanbaseConnector) Close() error { - if oc.Client.DB != nil { - return oc.Client.DB.Close() - } - return nil -} diff --git a/pkg/oceanbase/connector/ob_connector_manager.go b/pkg/oceanbase/connector/ob_connector_manager.go deleted file mode 100644 index 440775510..000000000 --- a/pkg/oceanbase/connector/ob_connector_manager.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright (c) 2023 OceanBase -ob-operator is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. -*/ - -package connector - -import ( - "context" - "sync" - - "github.com/go-logr/logr" - "github.com/pkg/errors" -) - -var oceanbaseConnectorManager *OceanbaseConnectorManager -var oceanbaseConnectorManagerCreateOnce sync.Once - -type OceanbaseConnectorManager struct { - // TODO: maintain cache size and data experiation, maybe change to a cache library - Cache sync.Map - Logger *logr.Logger -} - -func GetOceanbaseConnectorManager() *OceanbaseConnectorManager { - oceanbaseConnectorManagerCreateOnce.Do(func() { - logger := logr.FromContextOrDiscard(context.TODO()) - oceanbaseConnectorManager = &OceanbaseConnectorManager{ - Logger: &logger, - } - }) - return oceanbaseConnectorManager -} - -func (m *OceanbaseConnectorManager) GetOceanbaseConnector(p *OceanbaseConnectProperties) (*OceanbaseConnector, error) { - key := p.HashValue() - connectorStored, loaded := m.Cache.Load(key) - if loaded && connectorStored.(*OceanbaseConnector).IsAlive() { - return connectorStored.(*OceanbaseConnector), nil - } - m.Logger.Info("no connector or connector is not alive in cache with connect property", "address", p.Address, "port", p.Port, "user", p.User) - connector := NewOceanbaseConnector(p) - err := connector.Init() - if err != nil { - m.Logger.Error(err, "init connector failed with connect property", "address", p.Address, "port", p.Port, "user", p.User) - return nil, errors.Wrap(err, "create oceanbase connector") - } - m.Cache.Store(key, connector) - return connector, nil -} diff --git a/pkg/oceanbase/operation/manager.go b/pkg/oceanbase/operation/manager.go index 74218ac5f..9214d0e4e 100644 --- a/pkg/oceanbase/operation/manager.go +++ b/pkg/oceanbase/operation/manager.go @@ -17,17 +17,18 @@ import ( "time" "github.com/go-logr/logr" + "github.com/oceanbase/ob-operator/pkg/database" "github.com/oceanbase/ob-operator/pkg/oceanbase/connector" "github.com/oceanbase/ob-operator/pkg/oceanbase/const/config" "github.com/pkg/errors" ) type OceanbaseOperationManager struct { - Connector *connector.OceanbaseConnector + Connector *database.Connector Logger *logr.Logger } -func NewOceanbaseOperationManager(connector *connector.OceanbaseConnector) *OceanbaseOperationManager { +func NewOceanbaseOperationManager(connector *database.Connector) *OceanbaseOperationManager { logger := logr.FromContextOrDiscard(context.TODO()) return &OceanbaseOperationManager{ Connector: connector, @@ -35,10 +36,10 @@ func NewOceanbaseOperationManager(connector *connector.OceanbaseConnector) *Ocea } } -func GetOceanbaseOperationManager(p *connector.OceanbaseConnectProperties) (*OceanbaseOperationManager, error) { - connector, err := connector.GetOceanbaseConnectorManager().GetOceanbaseConnector(p) +func GetOceanbaseOperationManager(p *connector.OceanBaseDataSource) (*OceanbaseOperationManager, error) { + connector, err := database.GetConnector(p) if err != nil { - return nil, errors.Wrap(err, "Get OceanBase connector") + return nil, err } return NewOceanbaseOperationManager(connector), nil } @@ -46,7 +47,7 @@ func GetOceanbaseOperationManager(p *connector.OceanbaseConnectProperties) (*Oce func (m *OceanbaseOperationManager) ExecWithTimeout(timeout time.Duration, sql string, params ...interface{}) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - _, err := m.Connector.Client.ExecContext(ctx, sql, params...) + _, err := m.Connector.GetClient().ExecContext(ctx, sql, params...) if err != nil { m.Logger.Error(errors.Wrapf(err, "sql %s, param %v", sql, params), "Execute sql") return errors.Wrap(err, "Execute sql") @@ -59,7 +60,7 @@ func (m *OceanbaseOperationManager) ExecWithDefaultTimeout(sql string, params .. } func (m *OceanbaseOperationManager) QueryRow(ret interface{}, sql string, params ...interface{}) error { - err := m.Connector.Client.Get(ret, sql, params...) + err := m.Connector.GetClient().Get(ret, sql, params...) if err != nil { err = errors.Wrapf(err, "sql %s, param %v", sql, params) m.Logger.Error(err, "Query row") @@ -68,7 +69,7 @@ func (m *OceanbaseOperationManager) QueryRow(ret interface{}, sql string, params } func (m *OceanbaseOperationManager) QueryList(ret interface{}, sql string, params ...interface{}) error { - err := m.Connector.Client.Select(ret, sql, params...) + err := m.Connector.GetClient().Select(ret, sql, params...) if err != nil { err = errors.Wrapf(err, "sql %s, param %v", sql, params) m.Logger.Error(err, "Query list") diff --git a/pkg/resource/util.go b/pkg/resource/util.go index c5b809298..63e2e97f6 100644 --- a/pkg/resource/util.go +++ b/pkg/resource/util.go @@ -39,24 +39,24 @@ func GetOceanbaseOperationManagerFromOBCluster(c client.Client, obcluster *v1alp return nil, errors.Wrapf(err, "No observer belongs to cluster %s", obcluster.Name) } - var p *connector.OceanbaseConnectProperties + var s *connector.OceanBaseDataSource for _, observer := range observerList.Items { address := observer.Status.PodIp switch obcluster.Status.Status { case clusterstatus.New: - p = connector.NewOceanbaseConnectProperties(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, "", "") + s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, "", "") case clusterstatus.Bootstrapped: - p = connector.NewOceanbaseConnectProperties(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, "", oceanbaseconst.DefaultDatabase) + s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, "", oceanbaseconst.DefaultDatabase) default: // TODO use user operator and read password from secret password, err := ReadPassword(c, obcluster.Namespace, obcluster.Spec.UserSecrets.Operator) if err != nil { return nil, errors.Wrapf(err, "Get oceanbase operation manager of cluster %s", obcluster.Name) } - p = connector.NewOceanbaseConnectProperties(address, oceanbaseconst.SqlPort, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, password, oceanbaseconst.DefaultDatabase) + s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, password, oceanbaseconst.DefaultDatabase) } // if err is nil, db connection is already checked available - oceanbaseOperationManager, err := operation.GetOceanbaseOperationManager(p) + oceanbaseOperationManager, err := operation.GetOceanbaseOperationManager(s) if err == nil { return oceanbaseOperationManager, nil }