Skip to content

Commit

Permalink
session: Check the partition placement constraint of local transactio…
Browse files Browse the repository at this point in the history
…ns (pingcap#21039)

Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Nov 17, 2020
1 parent 7e0821f commit 863117b
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 2 deletions.
3 changes: 3 additions & 0 deletions ddl/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ var (
// ErrInvalidPlacementSpec is returned when add/alter an invalid placement rule
ErrInvalidPlacementSpec = dbterror.ClassDDL.NewStd(mysql.ErrInvalidPlacementSpec)

// ErrInvalidPlacementPolicyCheck is returned when txn_scope and commit data changing do not meet the placement policy
ErrInvalidPlacementPolicyCheck = dbterror.ClassDDL.NewStd(mysql.ErrPlacementPolicyCheck)

// ErrMultipleDefConstInListPart returns multiple definition of same constant in list partitioning.
ErrMultipleDefConstInListPart = dbterror.ClassDDL.NewStd(mysql.ErrMultipleDefConstInListPart)

Expand Down
4 changes: 4 additions & 0 deletions ddl/placement/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ const (
// RuleIndexIndex is the index for a rule of index.
RuleIndexIndex
)

// DCLabelKey indicates the key of label which represents the dc for Store.
// FIXME: currently we assumes "zone" is the dcLabel key in Store
const DCLabelKey = "zone"
142 changes: 142 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -326,3 +327,144 @@ func (s *testDBSuite1) TestPlacementPolicyCache(c *C) {
tk.MustExec("truncate table t1")
tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows())
}

func (s *testSerialDBSuite) TestTxnScopeConstraint(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")

tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11),
PARTITION p2 VALUES LESS THAN (16),
PARTITION p3 VALUES LESS THAN (21)
);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
partDefs := tb.Meta().GetPartitionInfo().Definitions

for _, def := range partDefs {
if def.Name.String() == "p0" {
groupID := placement.GroupID(def.ID)
bundles[groupID] = &placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
GroupID: groupID,
Role: placement.Leader,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: placement.DCLabelKey,
Op: placement.In,
Values: []string{"sh"},
},
},
},
},
}
} else if def.Name.String() == "p2" {
groupID := placement.GroupID(def.ID)
bundles[groupID] = &placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
GroupID: groupID,
Role: placement.Follower,
Count: 3,
LabelConstraints: []placement.LabelConstraint{
{
Key: placement.DCLabelKey,
Op: placement.In,
Values: []string{"sh"},
},
},
},
},
}

}
}

testCases := []struct {
name string
sql string
txnScope string
disableAutoCommit bool
err error
}{
{
name: "Insert into PARTITION p0 with global txnScope",
sql: "insert into t1 (c) values (1)",
txnScope: "global",
err: nil,
},
{
name: "insert into PARTITION p0 with wrong txnScope",
sql: "insert into t1 (c) values (1)",
txnScope: "bj",
err: fmt.Errorf(".*out of txn_scope.*"),
},
{
name: "insert into PARTITION p1 with local txnScope",
sql: "insert into t1 (c) values (10)",
txnScope: "bj",
err: fmt.Errorf(".*don't have placement policies with txn_scope.*"),
},
{
name: "insert into PARTITION p1 with global txnScope",
sql: "insert into t1 (c) values (10)",
txnScope: "global",
err: nil,
},
{
name: "insert into PARTITION p2 with local txnScope",
sql: "insert into t1 (c) values (15)",
txnScope: "bj",
err: fmt.Errorf(".*leader placement policy is not defined.*"),
},
{
name: "insert into PARTITION p2 with global txnScope",
sql: "insert into t1 (c) values (15)",
txnScope: "global",
err: nil,
},
{
name: "insert into PARTITION p0 with wrong txnScope and autocommit off",
sql: "insert into t1 (c) values (1)",
txnScope: "bj",
disableAutoCommit: true,
err: fmt.Errorf(".*out of txn_scope.*"),
},
}

for _, testcase := range testCases {
c.Log(testcase.name)
se, err := session.CreateSession4Test(s.store)
c.Check(err, IsNil)
tk.Se = se
tk.MustExec("use test")
tk.MustExec(fmt.Sprintf("set @@txn_scope = %v", testcase.txnScope))
if testcase.disableAutoCommit {
tk.MustExec("set @@autocommit = 0")
tk.MustExec(testcase.sql)
_, err = tk.Exec("commit")
} else {
_, err = tk.Exec(testcase.sql)
}
if testcase.err == nil {
c.Assert(err, IsNil)
} else {
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, testcase.err.Error())
fmt.Println(err.Error())
}
}
}
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ const (
ErrTableOptionInsertMethodUnsupported = 8233
ErrInvalidPlacementSpec = 8234
ErrDDLReorgElementNotExist = 8235
ErrPlacementPolicyCheck = 8236

// TiKV/PD errors.
ErrPDServerTimeout = 9001
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrBRIEExportFailed: mysql.Message("Export failed: %s", nil),

ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil),
ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil),

// TiKV/PD errors.
ErrPDServerTimeout: mysql.Message("PD server timeout", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,11 @@ error = '''
Invalid placement policy '%s': %s
'''

["ddl:8236"]
error = '''
Placement policy didn't meet the constraint, reason: %s
'''

["domain:8027"]
error = '''
Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV
Expand Down
44 changes: 42 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -473,14 +475,19 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
// If the transaction is invalid, maybe it has already been rolled back by the client.
return nil
}
var err error
err = s.checkPlacementPolicyBeforeCommit()
if err != nil {
return err
}
txnSize := s.txn.Size()
isPessimistic := s.txn.IsPessimistic()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("session.doCommitWitRetry", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
err := s.doCommit(ctx)
err = s.doCommit(ctx)
if err != nil {
commitRetryLimit := s.sessionVars.RetryLimit
if !s.sessionVars.TxnCtx.CouldRetry {
Expand Down Expand Up @@ -550,7 +557,6 @@ func (s *session) CommitTxn(ctx context.Context) error {
failpoint.Return(err)
}
})

s.sessionVars.TxnCtx.Cleanup()
return err
}
Expand Down Expand Up @@ -1624,6 +1630,7 @@ func (s *session) NewTxn(ctx context.Context) error {
CreateTime: time.Now(),
StartTS: txn.StartTS(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
TxnScope: s.GetSessionVars().TxnScope,
}
return nil
}
Expand Down Expand Up @@ -2298,6 +2305,7 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
SchemaVersion: is.SchemaMetaVersion(),
CreateTime: time.Now(),
ShardStep: int(s.sessionVars.ShardAllocateStep),
TxnScope: s.GetSessionVars().TxnScope,
}
if !s.sessionVars.IsAutocommit() || s.sessionVars.RetryInfo.Retrying {
if s.sessionVars.TxnMode == ast.Pessimistic {
Expand Down Expand Up @@ -2433,3 +2441,35 @@ func (s *session) recordOnTransactionExecution(err error, counter int, duration
}
}
}

func (s *session) checkPlacementPolicyBeforeCommit() error {
var err error
txnScope := s.GetSessionVars().TxnCtx.TxnScope
if txnScope == "" {
txnScope = config.DefTxnScope
}
if txnScope != config.DefTxnScope {
is := infoschema.GetInfoSchema(s)
for physicalTableID := range s.GetSessionVars().TxnCtx.TableDeltaMap {
bundle, ok := is.BundleByName(placement.GroupID(physicalTableID))
if !ok {
err = ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
fmt.Sprintf("table or partition %v don't have placement policies with txn_scope %v",
physicalTableID, txnScope))
break
}
dcLocation, ok := placement.GetLeaderDCByBundle(bundle, placement.DCLabelKey)
if !ok {
err = ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
fmt.Sprintf("table or partition %v's leader placement policy is not defined", physicalTableID))
break
}
if dcLocation != txnScope {
err = ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
fmt.Sprintf("table or partition %v's leader location %v is out of txn_scope %v", physicalTableID, dcLocation, txnScope))
break
}
}
}
return err
}
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ type TransactionContext struct {
Isolation string
LockExpire uint32
ForUpdate uint32

// TxnScope stores the value of 'txn_scope'.
TxnScope string
}

// GetShard returns the shard prefix for the next `count` rowids.
Expand Down

0 comments on commit 863117b

Please sign in to comment.