Skip to content

Commit

Permalink
feat: Lorry support PolarX (apecloud#5073)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuriwuyun authored Sep 11, 2023
1 parent 786d9c4 commit 633d1d7
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cmd/lorry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/apecloud/kubeblocks/lorry/binding/kafka"
"github.com/apecloud/kubeblocks/lorry/binding/mongodb"
"github.com/apecloud/kubeblocks/lorry/binding/mysql"
"github.com/apecloud/kubeblocks/lorry/binding/polardbx"
"github.com/apecloud/kubeblocks/lorry/binding/postgres"
"github.com/apecloud/kubeblocks/lorry/binding/redis"
"github.com/apecloud/kubeblocks/lorry/highavailability"
Expand All @@ -67,6 +68,7 @@ var (
func init() {
viper.AutomaticEnv()
bindingsLoader.DefaultRegistry.RegisterOutputBinding(mysql.NewMysql, "mysql")
bindingsLoader.DefaultRegistry.RegisterOutputBinding(polardbx.NewPolardbx, "polardbx")
bindingsLoader.DefaultRegistry.RegisterOutputBinding(etcd.NewEtcd, "etcd")
bindingsLoader.DefaultRegistry.RegisterOutputBinding(mongodb.NewMongoDB, "mongodb")
bindingsLoader.DefaultRegistry.RegisterOutputBinding(redis.NewRedis, "redis")
Expand Down
12 changes: 12 additions & 0 deletions config/lorry/components/binding_polarx.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: polardbx
spec:
type: bindings.polardbx
version: v1
metadata:
- name: url # Required, define DB connection in DSN format
value: "root:@tcp(127.0.0.1:3306)/mysql?multiStatements=true"
- name: maxOpenConns
value: "5"
127 changes: 127 additions & 0 deletions lorry/binding/polardbx/polardbx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
Copyright (C) 2022-2023 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package polardbx

import (
"context"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"

. "github.com/apecloud/kubeblocks/lorry/binding"
"github.com/apecloud/kubeblocks/lorry/component"
"github.com/apecloud/kubeblocks/lorry/component/polardbx"
. "github.com/apecloud/kubeblocks/lorry/util"
)

// PolardbxOperations represents polardbx output bindings.
type PolardbxOperations struct {
BaseOperations
}

type QueryRes []map[string]interface{}

// NewPolardbx returns a new polardbx output binding.
func NewPolardbx(logger logger.Logger) bindings.OutputBinding {
return &PolardbxOperations{BaseOperations: BaseOperations{Logger: logger}}
}

// Init initializes the polardbx binding.
func (polardbxOps *PolardbxOperations) Init(metadata bindings.Metadata) error {
polardbxOps.Logger.Debug("Initializing polardbx binding")
polardbxOps.BaseOperations.Init(metadata)
config, err := polardbx.NewConfig(metadata.Properties)
if err != nil {
polardbxOps.Logger.Errorf("polardbx config initialize failed: %v", err)
return err
}

var manager component.DBManager

manager, err = polardbx.NewManager(polardbxOps.Logger)
if err != nil {
polardbxOps.Logger.Errorf("polardbx DB Manager initialize failed: %v", err)
return err
}

polardbxOps.Manager = manager
polardbxOps.DBType = "polardbx"
// polardbxOps.InitIfNeed = polardbxOps.initIfNeed
polardbxOps.BaseOperations.GetRole = polardbxOps.GetRole
polardbxOps.DBPort = config.GetDBPort()

polardbxOps.RegisterOperationOnDBReady(GetRoleOperation, polardbxOps.GetRoleOps, manager)
polardbxOps.RegisterOperationOnDBReady(ExecOperation, polardbxOps.ExecOps, manager)
polardbxOps.RegisterOperationOnDBReady(QueryOperation, polardbxOps.QueryOps, manager)

return nil
}

func (polardbxOps *PolardbxOperations) GetRole(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (string, error) {
manager := polardbxOps.Manager.(*polardbx.Manager)
return manager.GetRole(ctx)
}

func (polardbxOps *PolardbxOperations) GetRunningPort() int {
return 0
}

func (polardbxOps *PolardbxOperations) ExecOps(ctx context.Context, req *bindings.InvokeRequest, resp *bindings.InvokeResponse) (OpsResult, error) {
result := OpsResult{}
sql, ok := req.Metadata["sql"]
if !ok || sql == "" {
result["event"] = "ExecFailed"
result["message"] = ErrNoSQL
return result, nil
}

manager, _ := polardbxOps.Manager.(*polardbx.Manager)
count, err := manager.Exec(ctx, sql)
if err != nil {
polardbxOps.Logger.Infof("exec error: %v", err)
result["event"] = OperationFailed
result["message"] = err.Error()
} else {
result["event"] = OperationSuccess
result["count"] = count
}
return result, nil
}

func (polardbxOps *PolardbxOperations) QueryOps(ctx context.Context, req *bindings.InvokeRequest, resp *bindings.InvokeResponse) (OpsResult, error) {
result := OpsResult{}
sql, ok := req.Metadata["sql"]
if !ok || sql == "" {
result["event"] = OperationFailed
result["message"] = "no sql provided"
return result, nil
}
manager, _ := polardbxOps.Manager.(*polardbx.Manager)
data, err := manager.Query(ctx, sql)
if err != nil {
polardbxOps.Logger.Infof("Query error: %v", err)
result["event"] = OperationFailed
result["message"] = err.Error()
} else {
result["event"] = OperationSuccess
result["message"] = string(data)
}
return result, nil
}
2 changes: 0 additions & 2 deletions lorry/component/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ func NewConfig(properties map[string]string) (*Config, error) {

if viper.IsSet("KB_SERVICE_USER") {
config.username = viper.GetString("KB_SERVICE_USER")
} else {
config.username = "root"
}

if viper.IsSet("KB_SERVICE_PASSWORD") {
Expand Down
41 changes: 41 additions & 0 deletions lorry/component/polardbx/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright (C) 2022-2023 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package polardbx

import (
"github.com/apecloud/kubeblocks/lorry/component/mysql"
)

type Config struct {
*mysql.Config
}

var config *Config

func NewConfig(properties map[string]string) (*Config, error) {
mysqlConfig, err := mysql.NewConfig(properties)
if err != nil {
return nil, err
}
config = &Config{
Config: mysqlConfig,
}
return config, nil
}
79 changes: 79 additions & 0 deletions lorry/component/polardbx/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright (C) 2022-2023 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package polardbx

import (
"context"

"github.com/dapr/kit/logger"
"github.com/pkg/errors"

"github.com/apecloud/kubeblocks/lorry/component"
"github.com/apecloud/kubeblocks/lorry/component/mysql"
"github.com/apecloud/kubeblocks/lorry/util"
)

type Manager struct {
mysql.Manager
}

var _ component.DBManager = &Manager{}

func NewManager(logger logger.Logger) (*Manager, error) {
mysqlMgr, err := mysql.NewManager(logger)
if err != nil {
return nil, err
}
mgr := &Manager{
Manager: *mysqlMgr,
}

component.RegisterManager("polardbx", util.Consensus, mgr)
return mgr, nil
}

func (mgr *Manager) GetRole(ctx context.Context) (string, error) {
sql := "select role from information_schema.alisql_cluster_local"

rows, err := mgr.DB.QueryContext(ctx, sql)
if err != nil {
mgr.Logger.Infof("error executing %s: %v", sql, err)
return "", errors.Wrapf(err, "error executing %s", sql)
}

defer func() {
_ = rows.Close()
_ = rows.Err()
}()

var role string
var isReady bool
for rows.Next() {
if err = rows.Scan(&role); err != nil {
mgr.Logger.Errorf("Role query error: %v", err)
return role, err
}
isReady = true
}
if isReady {
return role, nil
}
return "", errors.Errorf("exec sql %s failed: no data returned", sql)
}

0 comments on commit 633d1d7

Please sign in to comment.