Skip to content

Commit

Permalink
*: Support set dist sql concurrency by set system variable (pingcap#1795
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shenli authored Oct 17, 2016
1 parent 6842efd commit efc96a7
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 170 deletions.
132 changes: 114 additions & 18 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package tidb
import (
"fmt"
"runtime/debug"
"strconv"
"strings"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/types"
)

const (
Expand Down Expand Up @@ -122,15 +124,28 @@ func bootstrap(s Session) {
log.Fatal(err)
}
if b {
err = upgrade(s)
if err != nil {
log.Fatal(err)
}
return
}
doDDLWorks(s)
doDMLWorks(s)
}

const (
bootstrappedVar = "bootstrapped"
// The variable name in mysql.TiDB table.
// It is used for checking if the store is boostrapped by any TiDB server.
bootstrappedVar = "bootstrapped"
// The variable value in mysql.TiDB table for bootstrappedVar.
// If the value true, the store is already boostrapped by a TiDB server.
bootstrappedVarTrue = "True"
// The variable name in mysql.TiDB table.
// It is used for getting the version of the TiDB server which bootstrapped the store.
tidbServerVersionVar = "tidb_server_version" //
// Const for TiDB server version 2.
version2 = 2
)

func checkBootstrapped(s Session) (bool, error) {
Expand All @@ -140,43 +155,119 @@ func checkBootstrapped(s Session) (bool, error) {
log.Fatal(err)
}
// Check bootstrapped variable value in TiDB table.
v, err := checkBootstrappedVar(s)
d, err := getTiDBVar(s, bootstrappedVar)
if err != nil {
if infoschema.ErrTableNotExists.Equal(err) {
return false, nil
}
return false, errors.Trace(err)
}
return v, nil
isBootstrapped := d.GetString() == bootstrappedVarTrue
if isBootstrapped {
// Make sure that doesn't affect the following operations.
if err = s.CommitTxn(); err != nil {
return false, errors.Trace(err)
}
}
return isBootstrapped, nil
}

func checkBootstrappedVar(s Session) (bool, error) {
// Get variable value from mysql.tidb table.
// Those variables are used by TiDB server.
func getTiDBVar(s Session, name string) (types.Datum, error) {
sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM %s.%s WHERE VARIABLE_NAME="%s"`,
mysql.SystemDB, mysql.TiDBTable, bootstrappedVar)
mysql.SystemDB, mysql.TiDBTable, name)
rs, err := s.Execute(sql)
if err != nil {
if infoschema.ErrTableNotExists.Equal(err) {
return false, nil
}
return false, errors.Trace(err)
return types.Datum{}, errors.Trace(err)
}

if len(rs) != 1 {
return false, errors.New("Wrong number of Recordset")
return types.Datum{}, errors.New("Wrong number of Recordset")
}
r := rs[0]
row, err := r.Next()
if err != nil || row == nil {
return false, errors.Trace(err)
return types.Datum{}, errors.Trace(err)
}
return row.Data[0], nil
}

isBootstrapped := row.Data[0].GetString() == bootstrappedVarTrue
if isBootstrapped {
// Make sure that doesn't affect the following operations.
// When the system is boostrapped by low version TiDB server, we should do some upgrade works.
// For example, add new system variables into mysql.global_variables table.
func upgrade(s Session) error {
ver, err := getBootstrapVersion(s)
if err != nil {
return errors.Trace(err)
}
if ver >= currentBootstrapVersion {
// It is already bootstrapped/upgraded by a higher version TiDB server.
if err1 := s.CommitTxn(); err1 != nil {
// Make sure that doesn't affect the following operations.
return errors.Trace(err1)
}
return nil
}
// Do upgrade works then update bootstrap version.
if ver < version2 {
upgradeToVer2(s)
}
updateBootstrapVer(s)
_, err = s.Execute("COMMIT")

if err = s.CommitTxn(); err != nil {
return false, errors.Trace(err)
if err != nil {
time.Sleep(1 * time.Second)
// Check if TiDB is already upgraded.
v, err1 := getBootstrapVersion(s)
if err1 != nil {
log.Fatal(err1)
}
if v >= currentBootstrapVersion {
// It is already bootstrapped/upgraded by a higher version TiDB server.
if err1 := s.CommitTxn(); err1 != nil {
// Make sure that doesn't affect the following operations.
return errors.Trace(err1)
}
return nil
}
log.Errorf("[Upgrade] upgrade from %d to %d error", ver, currentBootstrapVersion)
log.Fatal(err)
}
return nil
}

return isBootstrapped, nil
// Update to version 2.
func upgradeToVer2(s Session) {
// Version 2 add two system variable for DistSQL concurrency controlling.
// Insert distsql related system variable.
distSQLVars := []string{variable.DistSQLScanConcurrencyVar, variable.DistSQLJoinConcurrencyVar}
values := make([]string, 0, len(distSQLVars))
for _, v := range distSQLVars {
value := fmt.Sprintf(`("%s", "%s")`, v, variable.SysVars[v].Value)
values = append(values, value)
}
sql := fmt.Sprintf("INSERT IGNORE INTO %s.%s VALUES %s;", mysql.SystemDB, mysql.GlobalVariablesTable,
strings.Join(values, ", "))
mustExecute(s, sql)
}

// Update boostrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES ("%s", "%d", "TiDB bootstrap version.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%d"`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion, currentBootstrapVersion)
mustExecute(s, sql)
}

// Gets bootstrap version from mysql.tidb table;
func getBootstrapVersion(s Session) (int64, error) {
d, err := getTiDBVar(s, tidbServerVersionVar)
if err != nil {
return 0, errors.Trace(err)
}
if d.IsNull() {
return 0, nil
}
return strconv.ParseInt(d.GetString(), 10, 64)
}

// Execute DDL statements in bootstrap stage.
Expand Down Expand Up @@ -222,6 +313,11 @@ func doDMLWorks(s Session) {
ON DUPLICATE KEY UPDATE VARIABLE_VALUE="%s"`,
mysql.SystemDB, mysql.TiDBTable, bootstrappedVar, bootstrappedVarTrue, bootstrappedVarTrue)
mustExecute(s, sql)

sql = fmt.Sprintf(`INSERT INTO %s.%s VALUES("%s", "%d", "Bootstrap version. Do not delete")`,
mysql.SystemDB, mysql.TiDBTable, tidbServerVersionVar, currentBootstrapVersion)
mustExecute(s, sql)

_, err := s.Execute("COMMIT")
if err != nil {
time.Sleep(1 * time.Second)
Expand Down
Loading

0 comments on commit efc96a7

Please sign in to comment.