Skip to content

Commit

Permalink
feat: implement simple sharding over runtime API (arana-db#74)
Browse files Browse the repository at this point in the history
* [WIP] feat: implement simple sharding over runtime API

* fix: split init sql files

* refactor: rename xxcontext and xxast

* fix: pr review
  • Loading branch information
jjeffcaii authored Mar 9, 2022
1 parent f5b6833 commit 03f1778
Show file tree
Hide file tree
Showing 52 changed files with 1,957 additions and 479 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

/coverage.txt

# Dependency directories (remove the comment below to include it)
# vendor/
.idea/
Expand Down
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ header: # `header` section is configurations for source codes license header.
- ".errcheck-exclude"
- ".golangci.yml"
- '.pre-commit-config.yaml'
- '.github'
comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.

# license-location-threshold specifies the index threshold where the license header can be located,
Expand Down
4 changes: 3 additions & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ services:
environment:
MYSQL_ROOT_PASSWORD: "123456"
volumes:
- ./scripts/:/docker-entrypoint-initdb.d/:rw
- ./scripts/init.sql:/docker-entrypoint-initdb.d/0.sql
- ./scripts/sequence.sql:/docker-entrypoint-initdb.d/1.sql
- ./scripts/sharding.sql:/docker-entrypoint-initdb.d/2.sql
command: ['mysqld', '--character-set-server=utf8mb4', '--collation-server=utf8mb4_unicode_ci']
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "127.0.0.1"]
Expand Down
15 changes: 15 additions & 0 deletions docker/scripts/sequence.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE DATABASE IF NOT EXISTS employees;
USE employees;

CREATE TABLE IF NOT EXISTS `sequence`
(
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`name` VARCHAR(64) NOT NULL,
`value` BIGINT NOT NULL,
`step` INT NOT NULL DEFAULT 10000,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`modified_at` TIMESTAMP NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name` (`name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
36 changes: 36 additions & 0 deletions docker/scripts/sharding.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
CREATE DATABASE IF NOT EXISTS employees;
USE employees;

DELIMITER //
CREATE PROCEDURE sp_create_tab()
BEGIN
SET @str = ' (
`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
`uid` BIGINT(20) UNSIGNED NOT NULL,
`name` VARCHAR(255) NOT NULL,
`score` DECIMAL(6,2) DEFAULT ''0'',
`nickname` VARCHAR(255) DEFAULT NULL,
`gender` TINYINT(4) NULL,
`birth_year` SMALLINT(5) UNSIGNED DEFAULT ''0'',
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`modified_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_uid` (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
';

SET @j = 0;
WHILE @j < 32
DO
SET @table = CONCAT('student_', LPAD(@j, 4, '0'));
SET @ddl = CONCAT('CREATE TABLE IF NOT EXISTS ', @table, @str);
PREPARE ddl FROM @ddl;
EXECUTE ddl;
SET @j = @j + 1;
END WHILE;
END
//

DELIMITER ;
CALL sp_create_tab;
DROP PROCEDURE sp_create_tab;
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/kr/pretty v0.3.0 // indirect
github.com/lestrrat-go/strftime v1.0.5
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/spf13/cobra v1.2.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down Expand Up @@ -403,6 +405,8 @@ github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtb
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
Expand Down
58 changes: 22 additions & 36 deletions pkg/executor/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package executor

import (
"bytes"
"fmt"
)

import (
Expand All @@ -34,6 +33,7 @@ import (
"github.com/dubbogo/arana/pkg/mysql"
"github.com/dubbogo/arana/pkg/proto"
"github.com/dubbogo/arana/pkg/resource"
"github.com/dubbogo/arana/pkg/runtime"
"github.com/dubbogo/arana/pkg/selector"
"github.com/dubbogo/arana/pkg/util/log"
"github.com/dubbogo/arana/third_party/pools"
Expand Down Expand Up @@ -170,13 +170,17 @@ func (executor *RedirectExecutor) ExecutorComQuery(ctx *proto.Context) (proto.Re
var err error

p := parser.New()
query := string(ctx.Data[1:])
query := ctx.GetQuery()
act, err := p.ParseOneStmt(query, "", "")
if err != nil {
return nil, 0, err
}
log.Debugf("ComQuery: %s", query)

ctx.Stmt = &proto.Stmt{
StmtNode: act,
}

resourcePool := resource.GetDataSourceManager().GetMasterResourcePool(executor.dataSources[0].Master.Name)
switch act.(type) {
case *ast.BeginStmt:
Expand Down Expand Up @@ -296,49 +300,31 @@ func (executor *RedirectExecutor) doPostFilter(ctx *proto.Context, result proto.
}

func (executor *RedirectExecutor) slaveComQueryExecute(ctx *proto.Context, query string) (proto.Result, uint16, error) {

dsNo := executor.dbSelector.GetDataSourceNo()
resourcePool := resource.GetDataSourceManager().GetSlaveResourcePool(executor.dataSources[0].Slaves[dsNo].Name)
r, err := resourcePool.Get(ctx)
defer func() {
resourcePool.Put(r)
}()
if err != nil {
return nil, 0, err
}

backendConn := r.(*mysql.BackendConnection)

var (
rt runtime.Runtime
result proto.Result
warn uint16
err error
)
executor.doPreFilter(ctx)
result, warn, err := backendConn.ExecuteWithWarningCount(query, true)
if rt, err = runtime.Load(ctx.Schema); err == nil {
result, warn, err = rt.Execute(ctx)
}
executor.doPostFilter(ctx, result)
return result, warn, err
}

func (executor *RedirectExecutor) slaveComStmtExecute(ctx *proto.Context) (proto.Result, uint16, error) {
var (
r pools.Resource
backendConn *mysql.BackendConnection
err error
rt runtime.Runtime
result proto.Result
warn uint16
err error
)
dsNo := executor.dbSelector.GetDataSourceNo()
fmt.Println(dsNo)
resourcePool := resource.GetDataSourceManager().GetSlaveResourcePool(executor.dataSources[0].Slaves[dsNo].Name)
r, err = resourcePool.Get(ctx)
defer func() {
resourcePool.Put(r)
}()
if err != nil {
return nil, 0, err
}

backendConn = r.(*mysql.BackendConnection)

query := ctx.Stmt.StmtNode.Text()
log.Infof(query)

executor.doPreFilter(ctx)
result, warn, err := backendConn.PrepareQuery(query, ctx.Data)
if rt, err = runtime.Load(ctx.Schema); err == nil {
result, warn, err = rt.Execute(ctx)
}
executor.doPostFilter(ctx, result)
return result, warn, err
}
6 changes: 3 additions & 3 deletions pkg/mysql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
)

import (
"github.com/dubbogo/arana/pkg/constants/mysql"
"github.com/dubbogo/arana/pkg/mysql/errors"
"github.com/stretchr/testify/assert"
)

import (
"github.com/stretchr/testify/assert"
"github.com/dubbogo/arana/pkg/constants/mysql"
"github.com/dubbogo/arana/pkg/mysql/errors"
)

func TestDSNParse(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type Conn struct {
// If there are any ongoing reads or writes, they may get interrupted.
conn net.Conn

// Schema is the current database name.
Schema string

// ConnectionID is set:
// - at Connect() time for clients, with the value returned by
// the server.
Expand Down
6 changes: 3 additions & 3 deletions pkg/mysql/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
)

import (
"github.com/dubbogo/arana/pkg/constants/mysql"
errors2 "github.com/dubbogo/arana/pkg/mysql/errors"
"github.com/stretchr/testify/assert"
)

import (
"github.com/stretchr/testify/assert"
"github.com/dubbogo/arana/pkg/constants/mysql"
errors2 "github.com/dubbogo/arana/pkg/mysql/errors"
)

var (
Expand Down
8 changes: 8 additions & 0 deletions pkg/mysql/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ type Result struct {
Rows []proto.Row
}

func (res *Result) GetFields() []proto.Field {
return res.Fields
}

func (res *Result) GetRows() []proto.Row {
return res.Rows
}

func (res *Result) LastInsertId() (uint64, error) {
return res.InsertId, nil
}
Expand Down
Loading

0 comments on commit 03f1778

Please sign in to comment.