Skip to content

Commit

Permalink
br: Consider partition table's placement for br (pingcap#33139)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Mar 16, 2022
1 parent 80ca342 commit 33e7632
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 47 deletions.
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func BuildBackupRangeAndSchema(
if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
tableInfo.PlacementPolicyRef = nil
tableInfo.ClearPlacement()
}

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
Expand Down Expand Up @@ -478,7 +478,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.PlacementPolicyRef = nil
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
Expand Down
14 changes: 6 additions & 8 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,21 +459,19 @@ func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error {
log.Info("skip create database", zap.Stringer("database", db.Name))
return nil
}

if !rc.supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("database", db.Name))
db.PlacementPolicyRef = nil
}
if db.PlacementPolicyRef != nil && rc.policyMap != nil {
if policy, ok := rc.policyMap.Load(db.PlacementPolicyRef.Name.L); ok {
err := rc.db.CreatePlacementPolicy(ctx, policy.(*model.PolicyInfo))
if err != nil {
return errors.Trace(err)
}
// delete policy in cache after restore succeed
rc.policyMap.Delete(db.PlacementPolicyRef.Name.L)

if db.PlacementPolicyRef != nil {
if err := rc.db.ensurePlacementPolicy(ctx, db.PlacementPolicyRef.Name, rc.policyMap); err != nil {
return errors.Trace(err)
}
}

return rc.db.CreateDatabase(ctx, db)
}

Expand Down
74 changes: 50 additions & 24 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,18 +263,13 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
m := map[string][]*model.TableInfo{}
for _, table := range tables {
m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info)
if table.Info.PlacementPolicyRef != nil && policyMap != nil {
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.PlacementPolicyRef = nil
} else if p, exists := policyMap.Load(table.Info.PlacementPolicyRef.Name.L); exists {
err := db.CreatePlacementPolicy(ctx, p.(*model.PolicyInfo))
if err != nil {
return errors.Trace(err)
}
// delete policy in cache after restore table succeed.
policyMap.Delete(table.Info.PlacementPolicyRef.Name.L)
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.ClearPlacement()
} else {
if err := db.ensureTablePlacementPolicies(ctx, table.Info, policyMap); err != nil {
return errors.Trace(err)
}
}
}
Expand All @@ -295,20 +290,16 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
// CreateTable executes a CREATE TABLE SQL.
func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error {
if table.Info.PlacementPolicyRef != nil && policyMap != nil {
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.PlacementPolicyRef = nil
} else if p, exists := policyMap.Load(table.Info.PlacementPolicyRef.Name.L); exists {
err := db.CreatePlacementPolicy(ctx, p.(*model.PolicyInfo))
if err != nil {
return errors.Trace(err)
}
// delete policy in cache after restore table succeed.
policyMap.Delete(table.Info.PlacementPolicyRef.Name.L)
if !supportPolicy {
log.Info("set placementPolicyRef to nil when target tidb not support policy",
zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name))
table.Info.ClearPlacement()
} else {
if err := db.ensureTablePlacementPolicies(ctx, table.Info, policyMap); err != nil {
return errors.Trace(err)
}
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info)
if err != nil {
log.Error("create table failed",
Expand All @@ -331,6 +322,41 @@ func (db *DB) Close() {
db.se.Close()
}

func (db *DB) ensurePlacementPolicy(ctx context.Context, policyName model.CIStr, policies *sync.Map) error {
if policies == nil {
return nil
}

if policy, ok := policies.LoadAndDelete(policyName.L); ok {
return db.CreatePlacementPolicy(ctx, policy.(*model.PolicyInfo))
}

// This means policy already created
return nil
}

func (db *DB) ensureTablePlacementPolicies(ctx context.Context, tableInfo *model.TableInfo, policies *sync.Map) error {
if tableInfo.PlacementPolicyRef != nil {
if err := db.ensurePlacementPolicy(ctx, tableInfo.PlacementPolicyRef.Name, policies); err != nil {
return err
}
}

if tableInfo.Partition != nil {
for _, def := range tableInfo.Partition.Definitions {
if def.PlacementPolicyRef == nil {
continue
}

if err := db.ensurePlacementPolicy(ctx, def.PlacementPolicyRef.Name, policies); err != nil {
return err
}
}
}

return nil
}

// FilterDDLJobs filters ddl jobs.
func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job) {
// Sort the ddl jobs by schema version in descending order.
Expand Down
49 changes: 36 additions & 13 deletions br/tests/br_tidb_placement_policy/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ rm -rf $PROGRESS_FILE

run_sql "create schema $DB;"
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"

# generate 30 tables with 1 row content with policy fivereplicas;.
i=1
while [ $i -le $TABLES_COUNT ]; do
run_sql "create table $DB.sbtest$i(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest$i(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE)"
run_sql "insert into $DB.sbtest$i values ($i, $i, '$i', '$i');"
i=$(($i+1))
done
Expand All @@ -48,20 +49,28 @@ run_br backup full --log-file $BACKUPMETAV1_LOG -s "local://$TEST_DIR/$DB" --pd
# clear data and policy fore restore.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

# restore with tidb-placement-policy
echo "restore with tidb-placement start..."
run_br restore db --db $DB -s "local://$TEST_DIR/${DB}v2" --pd $PD_ADDR

policy_name=$(run_sql "use $DB; show placement;" | grep "POLICY" | awk '{print $2}')
if [ "$policy_name" -ne "fivereplicas" ];then
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY fivereplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY tworeplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

# clear data and policy for restore.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

# restore without tidb-placement-policy
echo "restore without tidb-placement start..."
Expand All @@ -79,16 +88,18 @@ run_sql "DROP DATABASE $DB;"
echo "test backup db can ignore placement policy"
run_sql "create schema $DB;"
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"

# generate one table with one row content with policy fivereplicas;.
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE);"
run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');"

run_br backup db --db $DB -s "local://$TEST_DIR/${DB}_db" --pd $PD_ADDR

# clear data and policy for restore.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

# restore should success and no policy have been restored.
run_br restore db --db $DB -s "local://$TEST_DIR/${DB}_db" --pd $PD_ADDR
Expand All @@ -104,12 +115,13 @@ run_sql "DROP DATABASE $DB;"

echo "test only restore related placement policy..."
run_sql "create schema $DB;"
# we have two policies
# we have three policies
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"
run_sql "create placement policy foureplicas followers=3;"

# generate one table with one row content with policy fivereplicas;.
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE);"
run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');"

# backup table and policies
Expand All @@ -119,36 +131,46 @@ run_br backup full -s "local://$TEST_DIR/${DB}_related" --pd $PD_ADDR
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"
run_sql "DROP PLACEMENT POLICY foureplicas;"

# restore table
run_br restore table --db $DB --table sbtest -s "local://$TEST_DIR/${DB}_related" --pd $PD_ADDR

# verify only one policy has been restored
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l)
if [ "$policy_count" -ne "1" ];then
if [ "$policy_count" -ne "2" ];then
echo "TEST: [$TEST_NAME] failed! due to policy should be ignore"
exit 1
fi

# which is fivereplicas...
policy_name=$(run_sql "use $DB; show placement;" | grep "POLICY" | awk '{print $2}')
if [ "$policy_name" = "fivereplicas" ];then
# which have fivereplicas...
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | grep "fivereplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

# which have tworeplicas...
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | grep "tworeplicas" | wc -l)
if [ "$policy_count" -ne "1" ];then
echo "TEST: [$TEST_NAME] failed! due to policy restore failed"
exit 1
fi

# clear data and policies for next case.
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"

echo "test restore all placement policies..."
run_sql "create schema $DB;"
# we have two policies
# we have three policies
run_sql "create placement policy fivereplicas followers=4;"
run_sql "create placement policy tworeplicas followers=1;"
run_sql "create placement policy foureplicas followers=3;"

# generate one table with one row content with policy fivereplicas;.
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;"
run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas partition by range(id) (partition p0 values less than (10) placement policy tworeplicas, partition p1 values less than MAXVALUE);"
run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');"

# backup table and policies
Expand All @@ -158,13 +180,14 @@ run_br backup full -s "local://$TEST_DIR/${DB}_all" --pd $PD_ADDR
run_sql "DROP DATABASE $DB;"
run_sql "DROP PLACEMENT POLICY fivereplicas;"
run_sql "DROP PLACEMENT POLICY tworeplicas;"
run_sql "DROP PLACEMENT POLICY foureplicas;"

# restore table
run_br restore full -f "$DB.sbtest" -s "local://$TEST_DIR/${DB}_all" --pd $PD_ADDR

# verify all policies have been restored even we only restore one table during tableFilter.
policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l)
if [ "$policy_count" -ne "2" ];then
if [ "$policy_count" -ne "3" ];then
echo "TEST: [$TEST_NAME] failed! due to policy should be ignore"
exit 1
fi
11 changes: 11 additions & 0 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,17 @@ func (t *TableInfo) IsLocked() bool {
return t.Lock != nil && len(t.Lock.Sessions) > 0
}

// ClearPlacement clears all table and partitions' placement settings
func (t *TableInfo) ClearPlacement() {
t.PlacementPolicyRef = nil
if t.Partition != nil {
for i := range t.Partition.Definitions {
def := &t.Partition.Definitions[i]
def.PlacementPolicyRef = nil
}
}
}

// NewExtraHandleColInfo mocks a column info for extra handle column.
func NewExtraHandleColInfo() *ColumnInfo {
colInfo := &ColumnInfo{
Expand Down

0 comments on commit 33e7632

Please sign in to comment.