Skip to content

Commit

Permalink
optimization code
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Dec 15, 2021
1 parent 7d18bf1 commit 0b5cf11
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 72 deletions.
11 changes: 9 additions & 2 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main

import (
"encoding/json"
"github.com/dubbogo/arana/pkg/proto"
"os"
)

Expand Down Expand Up @@ -59,12 +60,18 @@ var (

Run: func(cmd *cobra.Command, args []string) {
conf := config.Load(configPath)

executors := make(map[string]proto.Executor)
for _, executorConf := range conf.Executors {
executor := executor.NewRedirectExecutor(executorConf)
executors[executorConf.Name] = executor
}

listener, err := mysql.NewListener(conf.Listeners[0])
if err != nil {
panic(err)
}
exec := executor.NewRedirectExecutor()
listener.SetExecutor(exec)
listener.SetExecutor(executors[conf.Listeners[0].Executor])

resource.InitDataSourceManager(conf.DataSources, func(config json.RawMessage) pools.Factory {
collector, err := mysql.NewConnector(config)
Expand Down
9 changes: 7 additions & 2 deletions docker/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ listeners:
users:
dksl: "123456"
server_version: 5.7.0
master_data_source:
- employees
executor: redirect

executors:
- name: redirect
mode: singledb
data_sources:
- master: employees

data_source_cluster:
- role: master
Expand Down
5 changes: 1 addition & 4 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ integration-test: build docker-build
@sleep 30
@go clean -testcache
go test -tags integration -v ./test/...
docker-compose -f docker/docker-compose.yaml down
@rm -rf dist
@rm -rf docker/data
@rm -rf docker/mysqld

clean:
docker-compose -f docker/docker-compose.yaml down
@rm -rf coverage.txt
@rm -rf dist
@rm -rf docker/data
Expand Down
56 changes: 17 additions & 39 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/dubbogo/arana/pkg/proto"
"io/ioutil"
"path/filepath"
"time"
Expand All @@ -41,16 +42,17 @@ import (
type Configuration struct {
Listeners []*Listener `yaml:"listeners" json:"listeners"`

Executors []*Executor `yaml:"executors" json:"executors"`

Filters []*Filter `yaml:"filters" json:"filters"`

DataSources []*DataSource `yaml:"data_source_cluster" json:"data_source_cluster"`
}

type (
// ProtocolType protocol type enum
ProtocolType int32

// ExecutorMode executor mode enum
ExecutorMode int32

// SocketAddress specify either a logical or physical address and port, which are
// used to tell server where to bind/listen, connect to upstream and find
// management servers
Expand All @@ -64,18 +66,25 @@ type (
Config json.RawMessage `json:"config,omitempty"`
}

DataSourceGroup struct {
Master string `yaml:"master" json:"master"`
Slaves []string `yaml:"slaves,omitempty" json:"slaves,omitempty"`
}

Executor struct {
Mode string `json:"mode,omitempty"`
ProcessDistributedTransaction bool `json:"process_distributed_transaction,omitempty"`
Config json.RawMessage `json:"config,omitempty"`
Name string `yaml:"name" json:"name"`
Mode proto.ExecuteMode `yaml:"mode" json:"mode"`
DataSources []*DataSourceGroup `yaml:"data_sources" json:"data_sources"`
Filters []string `yaml:"filters" json:"filters"`
ProcessDistributedTransaction bool `yaml:"process_distributed_transaction,omitempty" json:"process_distributed_transaction,omitempty"`
}

Listener struct {
ProtocolType ProtocolType `yaml:"protocol_type" json:"protocol_type"`
SocketAddress SocketAddress `yaml:"socket_address" json:"socket_address"`
Filters []*Filter `yaml:"filters" json:"filters"`
Filters []string `yaml:"filters" json:"filters"`
Config json.RawMessage `yaml:"config" json:"config"`
Executor Executor `yaml:"executor" json:"executor"`
Executor string `yaml:"executor" json:"executor"`
}
)

Expand All @@ -84,12 +93,6 @@ const (
Mysql
)

const (
SingleDB ExecutorMode = iota
ReadWriteSplitting
Sharding
)

func (t *ProtocolType) UnmarshalText(text []byte) error {
if t == nil {
return errors.New("can't unmarshal a nil *ProtocolType")
Expand All @@ -113,31 +116,6 @@ func (t *ProtocolType) unmarshalText(text []byte) bool {
return true
}

func (m *ExecutorMode) UnmarshalText(text []byte) error {
if m == nil {
return errors.New("can't unmarshal a nil *ExecutorMode")
}
if !m.unmarshalText(bytes.ToLower(text)) {
return fmt.Errorf("unrecognized executor mode: %q", text)
}
return nil
}

func (m *ExecutorMode) unmarshalText(text []byte) bool {
executorMode := string(text)
switch executorMode {
case "singledb":
*m = SingleDB
case "readwritesplitting":
*m = ReadWriteSplitting
case "sharding":
*m = Sharding
default:
return false
}
return true
}

func parse(path string) *Configuration {
log.Infof("load config from : %s", path)
content, err := ioutil.ReadFile(path)
Expand Down
29 changes: 18 additions & 11 deletions pkg/executor/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
)

import (
"github.com/dubbogo/arana/pkg/config"
"github.com/dubbogo/arana/pkg/mysql"
"github.com/dubbogo/arana/pkg/proto"
"github.com/dubbogo/arana/pkg/resource"
Expand All @@ -38,33 +39,39 @@ import (
)

type RedirectExecutor struct {
mode proto.ExecuteMode
preFilters []proto.PreFilter
postFilters []proto.PostFilter
dataSources []*config.DataSourceGroup
localTransactionMap map[uint32]pools.Resource
}

func NewRedirectExecutor() proto.Executor {
func NewRedirectExecutor(conf *config.Executor) proto.Executor {
return &RedirectExecutor{
mode: conf.Mode,
dataSources: conf.DataSources,
localTransactionMap: make(map[uint32]pools.Resource, 0),
}
}

func (executor *RedirectExecutor) AddPreFilter(filter proto.PreFilter) {

executor.preFilters = append(executor.preFilters, filter)
}

func (executor *RedirectExecutor) AddPostFilter(filter proto.PostFilter) {

executor.postFilters = append(executor.postFilters, filter)
}

func (executor *RedirectExecutor) GetPreFilters() []proto.PreFilter {
return nil
return executor.preFilters
}

func (executor *RedirectExecutor) GetPostFilter() []proto.PostFilter {
return nil
return executor.postFilters
}

func (executor *RedirectExecutor) ExecuteMode() proto.ExecuteMode {
return 0
return executor.mode
}

func (executor *RedirectExecutor) ProcessDistributedTransaction() bool {
Expand All @@ -81,7 +88,7 @@ func (executor *RedirectExecutor) InGlobalTransaction(ctx *proto.Context) bool {
}

func (executor *RedirectExecutor) ExecuteUseDB(ctx *proto.Context) error {
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(ctx.MasterDataSource[0])
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(executor.dataSources[0].Master)
r, err := resourcePool.Get(ctx)
defer func() {
resourcePool.Put(r)
Expand All @@ -98,7 +105,7 @@ func (executor *RedirectExecutor) ExecuteFieldList(ctx *proto.Context) ([]proto.
index := bytes.IndexByte(ctx.Data, 0x00)
table := string(ctx.Data[0:index])
wildcard := string(ctx.Data[index+1:])
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(ctx.MasterDataSource[0])
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(executor.dataSources[0].Master)
r, err := resourcePool.Get(ctx)
defer func() {
resourcePool.Put(r)
Expand Down Expand Up @@ -127,7 +134,7 @@ func (executor *RedirectExecutor) ExecutorComQuery(ctx *proto.Context) (proto.Re
}
log.Debugf("ComQuery: %s", query)

resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(ctx.MasterDataSource[0])
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(executor.dataSources[0].Master)
switch act.(type) {
case *ast.BeginStmt:
r, err = resourcePool.Get(ctx)
Expand Down Expand Up @@ -167,7 +174,7 @@ func (executor *RedirectExecutor) ExecutorComPrepareExecute(ctx *proto.Context)
var err error
r, ok := executor.localTransactionMap[ctx.ConnectionID]
if !ok {
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(ctx.MasterDataSource[0])
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(executor.dataSources[0].Master)
r, err = resourcePool.Get(ctx)
defer func() {
resourcePool.Put(r)
Expand All @@ -188,7 +195,7 @@ func (executor *RedirectExecutor) ExecutorComPrepareExecute(ctx *proto.Context)
}

func (executor *RedirectExecutor) ConnectionClose(ctx *proto.Context) {
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(ctx.MasterDataSource[0])
resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(executor.dataSources[0].Master)
r, ok := executor.localTransactionMap[ctx.ConnectionID]
if ok {
defer func() {
Expand Down
9 changes: 0 additions & 9 deletions pkg/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ const initClientConnStatus = mysql.ServerStatusAutocommit
type ServerConfig struct {
Users map[string]string `yaml:"users" json:"users"`
ServerVersion string `yaml:"server_version" json:"server_version"`
MetaDataSource string `yaml:"meta_data_source" json:"meta_data_source"`
MasterDataSource []string `yaml:"master_data_source" json:"master_data_source"`
SlaveDataSource []string `yaml:"slave_data_source" json:"slave_data_source"`
}

type Listener struct {
Expand Down Expand Up @@ -159,9 +156,6 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32) {
l.executor.ConnectionClose(&proto.Context{
Context: context.Background(),
ConnectionID: l.connectionID,
MasterDataSource: l.conf.MasterDataSource,
SlaveDataSource: l.conf.SlaveDataSource,
MetaDataSource: l.conf.MetaDataSource,
})
}()

Expand Down Expand Up @@ -193,9 +187,6 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32) {
Context: context.Background(),
ConnectionID: l.connectionID,
Data: data,
MasterDataSource: l.conf.MasterDataSource,
SlaveDataSource: l.conf.SlaveDataSource,
MetaDataSource: l.conf.MetaDataSource,
}
err = l.ExecuteCommand(c, ctx)
if err != nil {
Expand Down
39 changes: 34 additions & 5 deletions pkg/proto/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package proto

import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
)

import (
Expand All @@ -40,10 +43,6 @@ type (
Data []byte

Stmt *Stmt

MetaDataSource string
MasterDataSource []string
SlaveDataSource []string
}

Listener interface {
Expand All @@ -61,7 +60,7 @@ type (

// PostFilter
PostFilter interface {
PostHandle(ctx Context)
PostHandle(ctx Context, result Result)
}

// Executor
Expand Down Expand Up @@ -101,3 +100,33 @@ type (
GetMetaResourcePool(name string) *pools.ResourcePool
}
)

const (
SingleDB ExecuteMode = iota
ReadWriteSplitting
Sharding
)

func (m *ExecuteMode) UnmarshalText(text []byte) error {
if m == nil {
return errors.New("can't unmarshal a nil *ExecuteMode")
}
if !m.unmarshalText(bytes.ToLower(text)) {
return fmt.Errorf("unrecognized execute mode: %q", text)
}
return nil
}

func (m *ExecuteMode) unmarshalText(text []byte) bool {
switch string(text) {
case "singledb":
*m = SingleDB
case "readwritesplitting":
*m = ReadWriteSplitting
case "sharding":
*m = Sharding
default:
return false
}
return true
}

0 comments on commit 0b5cf11

Please sign in to comment.