Skip to content

Commit

Permalink
Showing 7 changed files with 180 additions and 30 deletions.
9 changes: 8 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
@@ -935,7 +935,14 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
}
}
case model.ActionAlterTableAlterPartition:
diff.TableID = job.CtxVars[0].(int64)
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
diff.AffectedOpts = []*model.AffectedOption{
{
TableID: job.CtxVars[0].(int64),
},
}
}
default:
diff.TableID = job.TableID
}
40 changes: 25 additions & 15 deletions ddl/partition.go
Original file line number Diff line number Diff line change
@@ -1563,10 +1563,10 @@ func truncateTableByReassignPartitionIDs(t *meta.Meta, tblInfo *model.TableInfo)
return nil
}

func onAlterTablePartition(t *meta.Meta, job *model.Job) (int64, error) {
func onAlterTablePartition(t *meta.Meta, job *model.Job) (ver int64, err error) {
var partitionID int64
bundle := &placement.Bundle{}
err := job.DecodeArgs(&partitionID, bundle)
err = job.DecodeArgs(&partitionID, bundle)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
@@ -1583,20 +1583,30 @@ func onAlterTablePartition(t *meta.Meta, job *model.Job) (int64, error) {
return 0, errors.Trace(table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O))
}

err = infosync.PutRuleBundles(nil, []*placement.Bundle{bundle})
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
}

// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{partitionID}
ver, err := updateSchemaVersion(t, job)
if err != nil {
return ver, errors.Trace(err)
pstate := ptInfo.GetStateByID(partitionID)
switch pstate {
case model.StatePublic:
ptInfo.SetStateByID(partitionID, model.StateGlobalTxnOnly)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateGlobalTxnOnly
case model.StateGlobalTxnOnly:
err = infosync.PutRuleBundles(nil, []*placement.Bundle{bundle})
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
}
ptInfo.SetStateByID(partitionID, model.StatePublic)
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{partitionID}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}

job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

112 changes: 112 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/testkit"
)

@@ -523,3 +524,114 @@ add placement policy
_, err = tk.Exec("commit")
c.Assert(err, IsNil)
}

func (s *testDBSuite1) TestGlobalTxnState(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11)
);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
pid, err := tables.FindPartitionByName(tb.Meta(), "p0")
c.Assert(err, IsNil)
groupID := placement.GroupID(pid)
bundles[groupID] = &placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
GroupID: groupID,
Role: placement.Leader,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: placement.DCLabelKey,
Op: placement.In,
Values: []string{"bj"},
},
},
},
},
}
dbInfo := testGetSchemaByName(c, tk.Se, "test")
tk2 := testkit.NewTestKit(c, s.store)
var chkErr error
done := false
testcases := []struct {
name string
hook *ddl.TestDDLCallback
expectErr error
}{
{
name: "write partition p0 during StateGlobalTxnOnly",
hook: func() *ddl.TestDDLCallback {
hook := &ddl.TestDDLCallback{}
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.Type == model.ActionAlterTableAlterPartition && job.State == model.JobStateRunning &&
job.SchemaState == model.StateGlobalTxnOnly && job.SchemaID == dbInfo.ID && done == false {
done = true
tk2.MustExec("use test")
tk2.MustExec("set @@txn_scope=bj")
_, chkErr = tk2.Exec("insert into t1 (c) values (1);")
}
}
return hook
}(),
expectErr: fmt.Errorf(".*can not be written by local transactions when its placement policy is being altered.*"),
},
// FIXME: support abort read txn during StateGlobalTxnOnly
//{
// name: "read partition p0 during middle state",
// hook: func() *ddl.TestDDLCallback {
// hook := &ddl.TestDDLCallback{}
// hook.OnJobUpdatedExported = func(job *model.Job) {
// if job.Type == model.ActionAlterTableAlterPartition && job.State == model.JobStateRunning &&
// job.SchemaState == model.StateGlobalTxnOnly && job.SchemaID == dbInfo.ID && done == false {
// done = true
// tk2.MustExec("use test")
// tk2.MustExec("set @@txn_scope=bj")
// tk2.MustExec("begin;")
// tk2.MustExec("select * from t1 where c < 6;")
// _, chkErr = tk2.Exec("commit")
// }
// }
// return hook
// }(),
// expectErr: fmt.Errorf(".*can not be written by local transactions when its placement policy is being altered.*"),
//},
}
originalHook := s.dom.DDL().GetHook()
testFunc := func(name string, hook *ddl.TestDDLCallback, expectErr error) {
c.Log(name)
done = false
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
defer func() {
s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
}()
_, err = tk.Exec(`alter table t1 alter partition p0
alter placement policy
constraints='["+zone=bj"]'
role=leader
replicas=1`)
c.Assert(err, IsNil)
c.Assert(done, Equals, true)
if expectErr != nil {
c.Assert(chkErr, NotNil)
c.Assert(chkErr.Error(), Matches, expectErr.Error())
} else {
c.Assert(chkErr, IsNil)
}
}

for _, testcase := range testcases {
testFunc(testcase.name, testcase.hook, testcase.expectErr)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20201201081851-e13818a9916a
github.com/pingcap/parser v0.0.0-20201203085211-44f6be1df1c4
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966
github.com/pingcap/tidb-lightning v4.0.9-0.20201106041742-a1ac97827a27+incompatible
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -691,8 +691,8 @@ github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vc
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20200422082501-7329d80eaf2c/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
github.com/pingcap/parser v0.0.0-20201201081851-e13818a9916a h1:1ew23DwaNc1rJb85q3W5IpeQDQp3X+Fojv0qBo18oLk=
github.com/pingcap/parser v0.0.0-20201201081851-e13818a9916a/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/parser v0.0.0-20201203085211-44f6be1df1c4 h1:D1JuGq6UWQbqknDa6VI/6S9+i9PTrGmGe0qBChqHE7k=
github.com/pingcap/parser v0.0.0-20201203085211-44f6be1df1c4/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/pd v2.1.5+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
@@ -781,7 +781,6 @@ github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrY
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v2.20.3+incompatible h1:0JVooMPsT7A7HqEYdydp/OfjSOYSjhXV7w1hkKj/NPQ=
13 changes: 10 additions & 3 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
@@ -50,9 +50,6 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
return tblIDs, nil
case model.ActionModifySchemaCharsetAndCollate:
return nil, b.applyModifySchemaCharsetAndCollate(m, diff)
case model.ActionAlterTableAlterPartition:
// TODO: enhancement: If the leader Placement Policy isn't updated, maybe we can omit the diff.
return []int64{diff.TableID}, b.applyPlacementUpdate(placement.GroupID(diff.TableID))
}
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
if !ok {
@@ -140,6 +137,10 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
// While session 1 performs the DML operation associated with partition 1,
// the TRUNCATE operation of session 2 on partition 2 does not cause the operation of session 1 to fail.
switch diff.Type {
case model.ActionAlterTableAlterPartition:
partitionID := opt.TableID
// TODO: enhancement: If the leader Placement Policy isn't updated, maybe we can omit the diff.
return []int64{partitionID}, b.applyPlacementUpdate(placement.GroupID(partitionID))
case model.ActionTruncateTablePartition:
tblIDs = append(tblIDs, opt.OldTableID)
b.applyPlacementDelete(placement.GroupID(opt.OldTableID))
@@ -180,6 +181,12 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
}
tblIDs = append(tblIDs, affectedIDs...)
}
} else {
switch diff.Type {
case model.ActionAlterTableAlterPartition:
// If there is no AffectedOpts, It means the job is in Public -> GlobalTxnState phase
return []int64{}, nil
}
}
return tblIDs, nil
}
29 changes: 22 additions & 7 deletions session/session.go
Original file line number Diff line number Diff line change
@@ -427,6 +427,10 @@ func (s *session) doCommit(ctx context.Context) error {
if s.txn.IsReadOnly() {
return nil
}
err := s.checkPlacementPolicyBeforeCommit()
if err != nil {
return err
}

// mockCommitError and mockGetTSErrorInRetry use to test PR #8743.
failpoint.Inject("mockCommitError", func(val failpoint.Value) {
@@ -485,10 +489,6 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
return nil
}
var err error
err = s.checkPlacementPolicyBeforeCommit()
if err != nil {
return err
}
txnSize := s.txn.Size()
isPessimistic := s.txn.IsPessimistic()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
@@ -2511,11 +2511,12 @@ func (s *session) checkPlacementPolicyBeforeCommit() error {
// Get the txnScope of the transaction we're going to commit.
txnScope := s.txn.GetUnionStore().GetOption(kv.TxnScope)
if txnScope == "" {
txnScope = config.DefTxnScope
txnScope = oracle.GlobalTxnScope
}
if txnScope != config.DefTxnScope {
if txnScope != oracle.GlobalTxnScope {
is := infoschema.GetInfoSchema(s)
for physicalTableID := range s.GetSessionVars().TxnCtx.TableDeltaMap {
deltaMap := s.GetSessionVars().TxnCtx.TableDeltaMap
for physicalTableID := range deltaMap {
bundle, ok := is.BundleByName(placement.GroupID(physicalTableID))
if !ok {
err = ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
@@ -2534,6 +2535,20 @@ func (s *session) checkPlacementPolicyBeforeCommit() error {
fmt.Sprintf("table or partition %v's leader location %v is out of txn_scope %v", physicalTableID, dcLocation, txnScope))
break
}
// FIXME: currently we assume the physicalTableID is the partition ID. In future, we should consider the situation
// if the physicalTableID belongs to a Table.
partitionID := physicalTableID
tbl, _, partitionDefInfo := is.FindTableByPartitionID(partitionID)
if tbl != nil {
tblInfo := tbl.Meta()
state := tblInfo.Partition.GetStateByID(partitionID)
if state == model.StateGlobalTxnOnly {
err = ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
fmt.Sprintf("Partition %s of table %s can not be written by local transactions when its placement policy is being altered",
tblInfo.Name, partitionDefInfo.Name))
break
}
}
}
}
return err

0 comments on commit 0f9d77b

Please sign in to comment.