Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-sun-star committed May 25, 2023
1 parent a35d5bf commit 8e1b7c0
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 108 deletions.
3 changes: 1 addition & 2 deletions pkg/oceanbase/connector/ob_connect_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func NewOceanbaseConnectProperties(address string, port int64, user, tenant, pas
func (p *OceanbaseConnectProperties) GetDSN() string {
if p.Database != "" {
return fmt.Sprintf("%s@%s:%s@tcp(%s:%d)/%s?multiStatements=true&interpolateParams=true", p.User, p.Tenant, p.Password, p.Address, p.Port, p.Database)
} else {
return fmt.Sprintf("%s@%s@tcp(%s:%d)/", p.User, p.Tenant, p.Address, p.Port)
}
return fmt.Sprintf("%s@%s@tcp(%s:%d)/", p.User, p.Tenant, p.Address, p.Port)
}

func (p *OceanbaseConnectProperties) HashValue() string {
Expand Down
4 changes: 1 addition & 3 deletions pkg/oceanbase/connector/ob_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ func (oc *OceanbaseConnector) Init() error {
if err != nil {
klog.Errorf("Open database connection %s failed: %v", dsn, err)
return errors.Wrap(err, "Init db connection")
} else {
oc.Client = db
// TODO: set connection pool properties to Client.DB
}
oc.Client = db
return nil
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/oceanbase/connector/ob_connector_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ See the Mulan PSL v2 for more details.
package connector

import (
"sync"

"github.com/pkg/errors"
"k8s.io/klog/v2"
"sync"
)

var oceanbaseConnectorManager *OceanbaseConnectorManager
Expand All @@ -38,15 +39,14 @@ func (ocm *OceanbaseConnectorManager) GetOceanbaseConnector(p *OceanbaseConnectP
connectorStored, loaded := ocm.Cache.Load(key)
if loaded && connectorStored.(*OceanbaseConnector).IsAlive() {
return connectorStored.(*OceanbaseConnector), nil
} else {
klog.Warningf("no connector or connector is not alive in cache with connect property: %s:%d %s", p.Address, p.Port, p.User)
connector := NewOceanbaseConnector(p)
err := connector.Init()
if err != nil {
klog.Errorf("init connector failed with connect property: %s:%d %s, %v", p.Address, p.Port, p.User, err)
return nil, errors.Wrap(err, "create oceanbase connector")
}
ocm.Cache.Store(key, connector)
return connector, nil
}
klog.Warningf("no connector or connector is not alive in cache with connect property: %s:%d %s", p.Address, p.Port, p.User)
connector := NewOceanbaseConnector(p)
err := connector.Init()
if err != nil {
klog.Errorf("init connector failed with connect property: %s:%d %s, %v", p.Address, p.Port, p.User, err)
return nil, errors.Wrap(err, "create oceanbase connector")
}
ocm.Cache.Store(key, connector)
return connector, nil
}
11 changes: 5 additions & 6 deletions pkg/oceanbase/operation/parameter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,24 @@ package operation

import (
"fmt"

"github.com/oceanbase/ob-operator/pkg/oceanbase/const/sql"
"github.com/oceanbase/ob-operator/pkg/oceanbase/param"
)

func (m *OceanbaseOperationManager) GetParameter(name string, scope *param.Scope) error {
if scope == nil {
return m.ExecWithDefaultTimeout(sql.QueryParameter, name)
} else {
queryParameterSql := fmt.Sprintf(sql.QueryParameterWithScope, scope.Name)
return m.ExecWithDefaultTimeout(queryParameterSql, name, scope.Value)
}
queryParameterSql := fmt.Sprintf(sql.QueryParameterWithScope, scope.Name)
return m.ExecWithDefaultTimeout(queryParameterSql, name, scope.Value)
}

func (m *OceanbaseOperationManager) SetParameter(name string, value interface{}, scope *param.Scope) error {
if scope == nil {
setParameterSql := fmt.Sprintf(sql.SetParameter, name)
return m.ExecWithDefaultTimeout(setParameterSql, value)
} else {
setParameterSql := fmt.Sprintf(sql.SetParameterWithScope, name, scope.Name)
return m.ExecWithDefaultTimeout(setParameterSql, value, scope.Value)
}
setParameterSql := fmt.Sprintf(sql.SetParameterWithScope, name, scope.Name)
return m.ExecWithDefaultTimeout(setParameterSql, value, scope.Value)
}
27 changes: 13 additions & 14 deletions pkg/resource/obcluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,21 @@ func (m *OBClusterManager) GetTaskFlow() (*task.TaskFlow, error) {
if m.OBCluster.Status.OperationContext != nil {
m.Logger.Info("get task flow from obcluster status")
return task.NewTaskFlow(m.OBCluster.Status.OperationContext), nil
} else {
// newly created cluster
m.Logger.Info("create task flow according to obcluster status")
if m.OBCluster.Status.Status == clusterstatus.New {
taskFlow, err := task.GetRegistry().Get(flowname.CreateCluster)
if err != nil {
return nil, errors.Wrap(err, "Get create obcluster task flow")
}
return taskFlow, nil
}
// newly created cluster
m.Logger.Info("create task flow according to obcluster status")
if m.OBCluster.Status.Status == clusterstatus.New {
taskFlow, err := task.GetRegistry().Get(flowname.CreateCluster)
if err != nil {
return nil, errors.Wrap(err, "Get create obcluster task flow")
}
// scale observer
// scale obzone
// upgrade
// no need to execute task flow
return nil, nil
return taskFlow, nil
}
// scale observer
// scale obzone
// upgrade
// no need to execute task flow
return nil, nil
}

func (m *OBClusterManager) UpdateStatus() error {
Expand Down
3 changes: 1 addition & 2 deletions pkg/resource/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ func (m *OBClusterManager) WaitOBZoneBootstrapReady() error {
}
if allready {
return nil
} else {
time.Sleep(time.Second)
}
time.Sleep(time.Second)
}
return errors.New("all server still not bootstrap ready when timeout")
}
Expand Down
59 changes: 29 additions & 30 deletions pkg/resource/observer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,38 +107,37 @@ func (m *OBServerManager) GetTaskFlow() (*task.TaskFlow, error) {
if m.OBServer.Status.OperationContext != nil {
m.Logger.Info("get task flow from observer status")
return task.NewTaskFlow(m.OBServer.Status.OperationContext), nil
} else {
// newly created observer
var taskFlow *task.TaskFlow
var err error
var obcluster *cloudv2alpha1.OBCluster

m.Logger.Info("create task flow according to observer status")
if m.OBServer.Status.Status == serverstatus.New {
obcluster, err = m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster")
}
if obcluster.Status.Status == clusterstatus.New {
// created when create obcluster
m.Logger.Info("Create observer when create obcluster")
taskFlow, err = task.GetRegistry().Get(flowname.CreateServerForBootstrap)
} else {
// created normally
m.Logger.Info("Create observer when obcluster already exists")
taskFlow, err = task.GetRegistry().Get(flowname.CreateServer)
}
if err != nil {
return nil, errors.Wrap(err, "Get create observer task flow")
}
return taskFlow, nil
}
// newly created observer
var taskFlow *task.TaskFlow
var err error
var obcluster *cloudv2alpha1.OBCluster

m.Logger.Info("create task flow according to observer status")
if m.OBServer.Status.Status == serverstatus.New {
obcluster, err = m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster")
}
// scale observer
// upgrade

// no need to execute task flow
return nil, nil
if obcluster.Status.Status == clusterstatus.New {
// created when create obcluster
m.Logger.Info("Create observer when create obcluster")
taskFlow, err = task.GetRegistry().Get(flowname.CreateServerForBootstrap)
} else {
// created normally
m.Logger.Info("Create observer when obcluster already exists")
taskFlow, err = task.GetRegistry().Get(flowname.CreateServer)
}
if err != nil {
return nil, errors.Wrap(err, "Get create observer task flow")
}
return taskFlow, nil
}
// scale observer
// upgrade

// no need to execute task flow
return nil, nil
}

func (m *OBServerManager) ClearTaskInfo() {
Expand Down
59 changes: 29 additions & 30 deletions pkg/resource/obzone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,38 +62,37 @@ func (m *OBZoneManager) GetTaskFlow() (*task.TaskFlow, error) {
if m.OBZone.Status.OperationContext != nil {
m.Logger.Info("get task flow from obzone status")
return task.NewTaskFlow(m.OBZone.Status.OperationContext), nil
} else {
// newly created zone
var taskFlow *task.TaskFlow
var err error
var obcluster *cloudv2alpha1.OBCluster

m.Logger.Info("create task flow according to obzone status")
if m.OBZone.Status.Status == zonestatus.New {
obcluster, err = m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster")
}
if obcluster.Status.Status == clusterstatus.New {
// created when create obcluster
m.Logger.Info("Create obzone when create obcluster")
taskFlow, err = task.GetRegistry().Get(flowname.CreateZoneForBootstrap)
} else {
// created normally
m.Logger.Info("Create obzone when obcluster already exists")
taskFlow, err = task.GetRegistry().Get(flowname.CreateZone)
}
if err != nil {
return nil, errors.Wrap(err, "Get create obzone task flow")
}
return taskFlow, nil
}
// newly created zone
var taskFlow *task.TaskFlow
var err error
var obcluster *cloudv2alpha1.OBCluster

m.Logger.Info("create task flow according to obzone status")
if m.OBZone.Status.Status == zonestatus.New {
obcluster, err = m.getOBCluster()
if err != nil {
return nil, errors.Wrap(err, "Get obcluster")
}
// scale observer
// upgrade

// no need to execute task flow
return nil, nil
if obcluster.Status.Status == clusterstatus.New {
// created when create obcluster
m.Logger.Info("Create obzone when create obcluster")
taskFlow, err = task.GetRegistry().Get(flowname.CreateZoneForBootstrap)
} else {
// created normally
m.Logger.Info("Create obzone when obcluster already exists")
taskFlow, err = task.GetRegistry().Get(flowname.CreateZone)
}
if err != nil {
return nil, errors.Wrap(err, "Get create obzone task flow")
}
return taskFlow, nil
}
// scale observer
// upgrade

// no need to execute task flow
return nil, nil
}

func (m *OBZoneManager) UpdateStatus() error {
Expand Down
3 changes: 1 addition & 2 deletions pkg/resource/obzone_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ func (m *OBZoneManager) WaitOBServerBootstrapReady() error {
}
if allready {
return nil
} else {
time.Sleep(time.Second)
}
time.Sleep(time.Second)
}
return errors.New("all server still not bootstrap ready when timeout")
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/task/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ package task

import (
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/google/uuid"
taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status"
"github.com/pkg/errors"
"sync"
)

var taskManager *TaskManager
Expand Down Expand Up @@ -71,13 +72,12 @@ func (m *TaskManager) GetTaskResult(taskId string) (*TaskResult, error) {
if !exists {
// m.Logger.Info("Query a task id that's not exists", "task id", taskId)
return nil, errors.New(fmt.Sprintf("Task %s not exists", taskId))
} else {
select {
case result := <-retCh:
return result, nil
default:
return nil, nil
}
}
select {
case result := <-retCh:
return result, nil
default:
return nil, nil
}
}

Expand Down

0 comments on commit 8e1b7c0

Please sign in to comment.