Skip to content

Commit

Permalink
ddl, store: Remove placement rules when the GC worker removes partiti…
Browse files Browse the repository at this point in the history
…ons (pingcap#20575)
  • Loading branch information
djshow832 authored Oct 22, 2020
1 parent 66b241e commit 67f78d0
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 26 deletions.
27 changes: 3 additions & 24 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ddl
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -46,7 +45,6 @@ import (
"github.com/pingcap/tidb/types"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
Expand Down Expand Up @@ -1019,19 +1017,13 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo {
return nt
}

func buildPlacementDropBundle(partitionID int64) *placement.Bundle {
return &placement.Bundle{
ID: placement.GroupID(partitionID),
}
}

func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error {
if d.infoHandle != nil {
bundles := make([]*placement.Bundle, 0, len(physicalTableIDs))
for _, ID := range physicalTableIDs {
oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID))
if ok && !oldBundle.IsEmpty() {
bundles = append(bundles, buildPlacementDropBundle(ID))
bundles = append(bundles, placement.BuildPlacementDropBundle(ID))
}
}
err := infosync.PutRuleBundles(nil, bundles)
Expand Down Expand Up @@ -1154,19 +1146,6 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
return ver, errors.Trace(err)
}

func buildPlacementTruncateBundle(oldBundle *placement.Bundle, newID int64) *placement.Bundle {
newBundle := oldBundle.Clone()
newBundle.ID = placement.GroupID(newID)
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID+1)))
for _, rule := range newBundle.Rules {
rule.GroupID = newBundle.ID
rule.StartKeyHex = startKey
rule.EndKeyHex = endKey
}
return newBundle
}

// onTruncateTablePartition truncates old partition meta.
func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
var ver int64
Expand Down Expand Up @@ -1226,8 +1205,8 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
for i, oldID := range oldIDs {
oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldID))
if ok && !oldBundle.IsEmpty() {
bundles = append(bundles, buildPlacementDropBundle(oldID))
bundles = append(bundles, buildPlacementTruncateBundle(oldBundle, newPartitions[i].ID))
bundles = append(bundles, placement.BuildPlacementDropBundle(oldID))
bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newPartitions[i].ID))
}
}

Expand Down
24 changes: 24 additions & 0 deletions ddl/placement/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package placement

import (
"encoding/hex"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
)

func checkLabelConstraint(label string) (LabelConstraint, error) {
Expand Down Expand Up @@ -75,3 +78,24 @@ func CheckLabelConstraints(labels []string) ([]LabelConstraint, error) {
func GroupID(id int64) string {
return fmt.Sprintf("TIDB_DDL_%d", id)
}

// BuildPlacementDropBundle builds the bundle to drop placement rules.
func BuildPlacementDropBundle(partitionID int64) *Bundle {
return &Bundle{
ID: GroupID(partitionID),
}
}

// BuildPlacementTruncateBundle builds the bundle to copy placement rules from old id to new id.
func BuildPlacementTruncateBundle(oldBundle *Bundle, newID int64) *Bundle {
newBundle := oldBundle.Clone()
newBundle.ID = GroupID(newID)
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID+1)))
for _, rule := range newBundle.Rules {
rule.GroupID = newBundle.ID
rule.StartKeyHex = startKey
rule.EndKeyHex = endKey
}
return newBundle
}
4 changes: 2 additions & 2 deletions ddl/placement_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (s *testPlacementSuite) TestPlacementBuildDrop(c *C) {
},
}
for _, t := range tests {
out := buildPlacementDropBundle(t.input)
out := placement.BuildPlacementDropBundle(t.input)
c.Assert(t.output, DeepEquals, out)
}
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func (s *testPlacementSuite) TestPlacementBuildTruncate(c *C) {
},
}
for _, t := range tests {
out := buildPlacementTruncateBundle(bundle, t.input)
out := placement.BuildPlacementTruncateBundle(bundle, t.input)
c.Assert(t.output, DeepEquals, out)
c.Assert(bundle.ID, Equals, placement.GroupID(-1))
c.Assert(bundle.Rules, HasLen, 1)
Expand Down
71 changes: 71 additions & 0 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"container/heap"
"context"
"encoding/json"
"fmt"
"math"
"os"
Expand All @@ -31,10 +32,13 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/session"
Expand All @@ -43,6 +47,7 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -655,6 +660,17 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64, concurren
zap.Error(err))
metrics.GCUnsafeDestroyRangeFailuresCounterVec.WithLabelValues("save").Inc()
}

pid, err := w.doGCPlacementRules(r)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] gc placement rules failed on range",
zap.String("uuid", w.uuid),
zap.Int64("jobID", r.JobID),
zap.Int64("elementID", r.ElementID),
zap.Int64("pid", pid),
zap.Error(err))
continue
}
}
logutil.Logger(ctx).Info("[gc worker] finish delete ranges",
zap.String("uuid", w.uuid),
Expand Down Expand Up @@ -1773,6 +1789,61 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error {
return errors.Trace(err)
}

// GC placement rules when the partitions are removed by the GC worker.
// Placement rules cannot be removed immediately after drop table / truncate table,
// because the tables can be flashed back or recovered.
func (w *GCWorker) doGCPlacementRules(dr util.DelRangeTask) (pid int64, err error) {
// Get the job from the job history
var historyJob *model.Job
failpoint.Inject("mockHistoryJobForGC", func(v failpoint.Value) {
args, err1 := json.Marshal([]interface{}{kv.Key{}, []int64{int64(v.(int))}})
if err1 != nil {
return
}
historyJob = &model.Job{
ID: dr.JobID,
Type: model.ActionDropTable,
RawArgs: args,
}
})
if historyJob == nil {
err = kv.RunInNewTxn(w.store, false, func(txn kv.Transaction) error {
var err1 error
t := meta.NewMeta(txn)
historyJob, err1 = t.GetHistoryDDLJob(dr.JobID)
return err1
})
if err != nil {
return
}
if historyJob == nil {
return 0, admin.ErrDDLJobNotFound.GenWithStackByArgs(dr.JobID)
}
}

// Get the partition ID from the job and DelRangeTask.
switch historyJob.Type {
case model.ActionDropTable, model.ActionTruncateTable:
var physicalTableIDs []int64
var startKey kv.Key
if err = historyJob.DecodeArgs(&startKey, &physicalTableIDs); err != nil {
return
}
// If it's a partitioned table, then the element ID is the partition ID.
if len(physicalTableIDs) > 0 {
pid = dr.ElementID
}
}
// Not drop table / truncate table or not a partitioned table, no need to GC placement rules.
if pid == 0 {
return
}
// Notify PD to drop the placement rules, even if there may be no placement rules.
bundles := []*placement.Bundle{placement.BuildPlacementDropBundle(pid)}
err = infosync.PutRuleBundles(nil, bundles)
return
}

// RunGCJob sends GC command to KV. It is exported for kv api, do not use it with GCWorker at the same time.
func RunGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string, concurrency int) error {
gcWorker := &GCWorker{
Expand Down
17 changes: 17 additions & 0 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,11 @@ const (
)

func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/mockHistoryJobForGC", "return(1)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/mockHistoryJobForGC"), IsNil)
}()

// Put some delete range tasks.
se := createSession(s.gcWorker.store)
defer se.Close()
Expand Down Expand Up @@ -1469,3 +1474,15 @@ func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) {
c.Fatal("physicalScanAndResolveLocks blocks")
}
}

func (s *testGCWorkerSuite) TestGCPlacementRules(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/mockHistoryJobForGC", "return(1)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/mockHistoryJobForGC"), IsNil)
}()

dr := util.DelRangeTask{JobID: 1, ElementID: 1}
pid, err := s.gcWorker.doGCPlacementRules(dr)
c.Assert(pid, Equals, int64(1))
c.Assert(err, IsNil)
}

0 comments on commit 67f78d0

Please sign in to comment.