Skip to content

Commit

Permalink
ddl: scatter the regions of table when creating them (pingcap#10980)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored Jul 10, 2019
1 parent ac0b9d3 commit 89baed8
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 149 deletions.
31 changes: 18 additions & 13 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1323,25 +1324,29 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

err = d.doDDLJob(ctx, job)
if err == nil {
var preSplitAndScatter func()
// do pre-split and scatter.
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
preSplitAndScatter = func() {
preSplitTableShardRowIDBitsRegion(d.store, tbInfo, ctx.GetSessionVars().WaitSplitRegionFinish)
sp, ok := d.store.(kv.SplitableStore)
if ok && atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
var (
preSplit func()
scatterRegion bool
)
val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBScatterRegion)
if err != nil {
logutil.BgLogger().Warn("[ddl] won't scatter region", zap.Error(err))
} else {
scatterRegion = variable.TiDBOptOn(val)
}
} else if atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
pi := tbInfo.GetPartitionInfo()
if pi != nil {
preSplitAndScatter = func() { splitPartitionTableRegion(d.store, pi) }
preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
} else {
preSplitAndScatter = func() { splitTableRegion(d.store, tbInfo.ID) }
preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
}
}
if preSplitAndScatter != nil {
if ctx.GetSessionVars().WaitSplitRegionFinish {
preSplitAndScatter()
if scatterRegion {
preSplit()
} else {
go preSplitAndScatter()
go preSplit()
}
}

Expand Down Expand Up @@ -3008,7 +3013,7 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
// The session will release all table locks it holds, if we don't add the new locking table id here,
// the session may forget to release the new locked table id when this ddl job was executed successfully
// but the session was killed before return.
ctx.AddTableLock(([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID, Tp: tb.Meta().Lock.Tp}}))
ctx.AddTableLock([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID, Tp: tb.Meta().Lock.Tp}})
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
Expand Down
119 changes: 119 additions & 0 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func splitPartitionTableRegion(store kv.SplitableStore, pi *model.PartitionInfo, scatter bool) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
regionIDs := make([]uint64, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
}
if scatter {
waitScatterRegionFinish(store, regionIDs)
}
}

func splitTableRegion(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
regionIDs := make([]uint64, 0, len(tbInfo.Indices)+1)
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tbInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
} else {
regionIDs = append(regionIDs, splitRecordRegion(store, tbInfo.ID, scatter))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
waitScatterRegionFinish(store, regionIDs)
}
}

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
}
return regionIDs
}

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs []uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}
95 changes: 0 additions & 95 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down Expand Up @@ -340,99 +338,6 @@ func checkSafePoint(w *worker, snapshotTS uint64) error {
return gcutil.ValidateSnapshot(ctx, snapshotTS)
}

type splitableStore interface {
SplitRegion(splitKey kv.Key) error
SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
WaitScatterRegionFinish(regionID uint64) error
}

func splitPartitionTableRegion(store kv.Storage, pi *model.PartitionInfo) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
for _, def := range pi.Definitions {
splitTableRegion(store, def.ID)
}
}

func splitTableRegion(store kv.Storage, tableID int64) {
s, ok := store.(splitableStore)
if !ok {
return
}
tableStartKey := tablecodec.GenTablePrefix(tableID)
if err := s.SplitRegion(tableStartKey); err != nil {
// It will be automatically split by TiKV later.
logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err))
}
}

func preSplitTableShardRowIDBitsRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
s, ok := store.(splitableStore)
if !ok {
return
}
regionIDs := make([]uint64, 0, 1<<(tblInfo.PreSplitRegions-1)+len(tblInfo.Indices))

// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tblInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tblInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tblInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := s.SplitRegionAndScatter(key)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}

// Split index region.
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := s.SplitRegionAndScatter(indexPrefix)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table index region failed", zap.String("index", idx.Name.L), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
if !waitTableSplitFinish {
return
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.BgLogger().Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}

func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
alloc := autoid.NewAllocator(store, tblInfo.GetDBID(schemaID), tblInfo.IsAutoIncColUnsigned())
tbl, err := table.TableFromMeta(alloc, tblInfo)
Expand Down
11 changes: 11 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,17 @@ func (s *testSuite2) TestSetVar(c *C) {
tk.MustExec("set tidb_wait_split_region_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("0"))

// test for tidb_scatter_region
tk.MustQuery(`select @@global.tidb_scatter_region;`).Check(testkit.Rows("0"))
tk.MustExec("set global tidb_scatter_region = 1")
tk.MustQuery(`select @@global.tidb_scatter_region;`).Check(testkit.Rows("1"))
tk.MustExec("set global tidb_scatter_region = 0")
tk.MustQuery(`select @@global.tidb_scatter_region;`).Check(testkit.Rows("0"))
_, err = tk.Exec("set session tidb_scatter_region = 0")
c.Assert(err, NotNil)
_, err = tk.Exec(`select @@session.tidb_scatter_region;`)
c.Assert(err, NotNil)

// test for tidb_wait_split_region_timeout
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows(strconv.Itoa(variable.DefWaitSplitRegionTimeout)))
tk.MustExec("set tidb_wait_split_region_timeout = 1")
Expand Down
13 changes: 4 additions & 9 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,10 @@ type SplitIndexRegionExec struct {
valueLists [][]types.Datum
}

type splitableStore interface {
SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
WaitScatterRegionFinish(regionID uint64) error
}

// Next implements the Executor Next interface.
func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
store := e.ctx.GetStore()
s, ok := store.(splitableStore)
s, ok := store.(kv.SplitableStore)
if !ok {
return nil
}
Expand All @@ -67,7 +62,7 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
regionID, err := s.SplitRegionAndScatter(idxKey)
regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.BgLogger().Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
Expand Down Expand Up @@ -231,7 +226,7 @@ type SplitTableRegionExec struct {
// Next implements the Executor Next interface.
func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
store := e.ctx.GetStore()
s, ok := store.(splitableStore)
s, ok := store.(kv.SplitableStore)
if !ok {
return nil
}
Expand All @@ -245,7 +240,7 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
regionID, err := s.SplitRegionAndScatter(key)
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.BgLogger().Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
Expand Down
6 changes: 6 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,9 @@ type Iterator interface {
Next() error
Close()
}

// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
WaitScatterRegionFinish(regionID uint64) error
}
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)},
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
{ScopeSession, TiDBSlowQueryFile, ""},
{ScopeGlobal, TiDBScatterRegion, BoolToIntStr(DefTiDBScatterRegion)},
{ScopeSession, TiDBWaitSplitRegionFinish, BoolToIntStr(DefTiDBWaitSplitRegionFinish)},
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// tidb_scatter_region will scatter the regions for DDLs when it is ON.
TiDBScatterRegion = "tidb_scatter_region"

// TiDBWaitSplitRegionFinish defines the split region behaviour is sync or async.
TiDBWaitSplitRegionFinish = "tidb_wait_split_region_finish"

Expand Down Expand Up @@ -343,6 +346,7 @@ const (
DefTiDBUseFastAnalyze = false
DefTiDBSkipIsolationLevelCheck = false
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefTiDBScatterRegion = false
DefTiDBWaitSplitRegionFinish = true
DefWaitSplitRegionTimeout = 300 // 300s
DefTiDBEnableNoopFuncs = false
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction,
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs:
TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs,
TiDBScatterRegion:
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
return value, nil
}
Expand Down
Loading

0 comments on commit 89baed8

Please sign in to comment.