Skip to content

Commit

Permalink
feat: implement sharding update (arana-db#107)
Browse files Browse the repository at this point in the history
* feat: implement sharding update

* fix
  • Loading branch information
jjeffcaii authored Mar 30, 2022
1 parent 2b6582d commit 9614004
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 1 deletion.
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ header: # `header` section is configurations for source codes license header.
- '.gitignore'
- '.gitmodules'
- 'makefile'
- 'justfile'
- 'docker'
- 'pkg/resolver/mysql/constants.go' # with two license: apache and Vitess
- 'pkg/resolver/mysql/encoding.go'
Expand Down
15 changes: 15 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
set dotenv-load := true

alias r := run
alias c := cli

default:
@just --list

run:
@go run ./cmd/... start -c ./docker/conf/config.yaml

cli:
@mycli -h127.0.0.1 -P13306 -udksl employees -p123456
cli-raw:
@mycli -h127.0.0.1 -uroot employees -p123456
8 changes: 7 additions & 1 deletion pkg/executor/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ func (executor *RedirectExecutor) ExecutorComQuery(ctx *proto.Context) (proto.Re
} else {
res, warn, err = rt.Execute(ctx)
}
case *ast.UpdateStmt:
if tx, ok := executor.getTx(ctx); ok {
res, warn, err = tx.Execute(ctx)
} else {
res, warn, err = rt.Execute(ctx)
}
default:
// TODO: mark direct flag temporarily, remove when write-mode is supported for runtime
ctx.Context = rcontext.WithDirect(ctx.Context)
Expand Down Expand Up @@ -219,7 +225,7 @@ func (executor *RedirectExecutor) ExecutorComStmtExecute(ctx *proto.Context) (pr
}

switch ctx.Stmt.StmtNode.(type) {
case *ast.SelectStmt, *ast.InsertStmt:
case *ast.SelectStmt, *ast.InsertStmt, *ast.UpdateStmt:
default:
ctx.Context = rcontext.WithDirect(ctx.Context)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/runtime/ast/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ type UpdateStatement struct {
Limit *LimitNode
}

func (u *UpdateStatement) ResetTable(table string) *UpdateStatement {
ret := new(UpdateStatement)
*ret = *u

tableName := make(TableName, len(ret.Table))
copy(tableName, ret.Table)
tableName[len(tableName)-1] = table

ret.Table = tableName
return ret
}

func (u *UpdateStatement) Restore(flag RestoreFlag, sb *strings.Builder, args *[]int) error {
sb.WriteString("UPDATE ")
if u.IsEnableLowPriority() {
Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

import (
"github.com/golang/mock/gomock"

"github.com/stretchr/testify/assert"
)

Expand Down
61 changes: 61 additions & 0 deletions pkg/runtime/optimize/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (o optimizer) doOptimize(ctx context.Context, stmt rast.Statement, args ...
return o.optimizeInsert(ctx, t, args)
case *rast.DeleteStatement:
case *rast.UpdateStatement:
return o.optimizeUpdate(ctx, t, args)
}

//TODO implement all statements
Expand Down Expand Up @@ -231,6 +232,66 @@ func (o optimizer) optimizeSelect(ctx context.Context, stmt *rast.SelectStatemen
return unionPlan, nil
}

func (o optimizer) optimizeUpdate(ctx context.Context, stmt *rast.UpdateStatement, args []interface{}) (proto.Plan, error) {
var (
ru = rcontext.Rule(ctx)
table = stmt.Table
vt *rule.VTable
ok bool
)

// non-sharding update
if vt, ok = ru.VTable(table.Suffix()); !ok {
ret := plan.NewUpdatePlan(stmt)
ret.BindArgs(args)
return ret, nil
}

var (
shards rule.DatabaseTables
fullScan = true
err error
)

// compute shards
if where := stmt.Where; where != nil {
sharder := (*Sharder)(ru)
if shards, fullScan, err = sharder.Shard(table, where, args...); err != nil {
return nil, errors.Wrap(err, "failed to update")
}
}

// exit if full-scan is disabled
if fullScan && !vt.AllowFullScan() {
return nil, errDenyFullScan
}

// must be empty shards (eg: update xxx set ... where 1 = 2 and uid = 1)
if shards.IsEmpty() {
return plan.AlwaysEmptyExecPlan{}, nil
}

// compute all sharding tables
if shards.IsFullScan() {
// init shards
shards = rule.DatabaseTables{}
// compute all tables
topology := vt.Topology()
topology.Each(func(dbIdx, tbIdx int) bool {
if d, t, ok := topology.Render(dbIdx, tbIdx); ok {
shards[d] = append(shards[d], t)
}
return true
})
}

ret := plan.NewUpdatePlan(stmt)
ret.BindArgs(args)
ret.SetShards(shards)

return ret, nil
}

func (o optimizer) optimizeInsert(ctx context.Context, stmt *rast.InsertStatement, args []interface{}) (proto.Plan, error) {
var (
ru = rcontext.Rule(ctx)
Expand Down
44 changes: 44 additions & 0 deletions pkg/runtime/plan/always.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package plan

import (
"context"
)

import (
"github.com/arana-db/arana/pkg/mysql"
"github.com/arana-db/arana/pkg/proto"
)

var _ proto.Plan = (*AlwaysEmptyExecPlan)(nil)

var _emptyResult mysql.Result

// AlwaysEmptyExecPlan represents an exec plan which affects nothing.
type AlwaysEmptyExecPlan struct {
}

func (a AlwaysEmptyExecPlan) Type() proto.PlanType {
return proto.PlanTypeExec
}

func (a AlwaysEmptyExecPlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
return &_emptyResult, nil
}
File renamed without changes.
129 changes: 129 additions & 0 deletions pkg/runtime/plan/update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package plan

import (
"context"
"strings"
)

import (
"github.com/pkg/errors"

uatomic "go.uber.org/atomic"

"golang.org/x/sync/errgroup"
)

import (
"github.com/arana-db/arana/pkg/mysql"
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/proto/rule"
"github.com/arana-db/arana/pkg/runtime/ast"
"github.com/arana-db/arana/pkg/util/log"
)

var _ proto.Plan = (*UpdatePlan)(nil)

// UpdatePlan represents a plan to execute sharding-update.
type UpdatePlan struct {
basePlan
stmt *ast.UpdateStatement
shards rule.DatabaseTables
}

// NewUpdatePlan creates a sharding-update plan.
func NewUpdatePlan(stmt *ast.UpdateStatement) *UpdatePlan {
return &UpdatePlan{stmt: stmt}
}

func (up *UpdatePlan) Type() proto.PlanType {
return proto.PlanTypeExec
}

func (up *UpdatePlan) ExecIn(ctx context.Context, conn proto.VConn) (proto.Result, error) {
if up.shards == nil {
var sb strings.Builder
if err := up.stmt.Restore(ast.RestoreDefault, &sb, nil); err != nil {
return nil, err
}
return conn.Exec(ctx, "", sb.String(), up.args...)
}

var (
affects = uatomic.NewUint64(0)
cnt = uatomic.NewUint32(0)
)

var g errgroup.Group

// TODO: should wrap with tx in the future
for k, v := range up.shards {
// do copy for goroutine-safe
var (
db = k
tables = v
)
// execute concurrent for each phy database
g.Go(func() error {
var (
sb strings.Builder
args []int
res proto.Result
err error
)

sb.Grow(256)

for _, table := range tables {
if err = up.stmt.ResetTable(table).Restore(ast.RestoreDefault, &sb, &args); err != nil {
return errors.WithStack(err)
}

if res, err = conn.Exec(ctx, db, sb.String(), up.toArgs(args)...); err != nil {
return errors.WithStack(err)
}

n, _ := res.RowsAffected()
affects.Add(n)
cnt.Inc()

// cleanup
if len(args) > 0 {
args = args[:0]
}
sb.Reset()
}

return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

log.Debugf("sharding update success: batch=%d, affects=%d", cnt.Load(), affects.Load())

return &mysql.Result{AffectedRows: affects.Load()}, nil
}

func (up *UpdatePlan) SetShards(shards rule.DatabaseTables) {
up.shards = shards
}

0 comments on commit 9614004

Please sign in to comment.