Skip to content

Commit

Permalink
*: let TempIndex support encode/decode partitionID flag (pingcap#57017)
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 authored Nov 1, 2024
1 parent 999befe commit 5da9d1a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(

// Extract the operations on the original index and replay them later.
for _, elem := range tempIdxVal {
if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete {
if elem.KeyVer == tablecodec.TempIndexKeyTypeMerge || elem.KeyVer == tablecodec.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 21,
shard_count = 22,
deps = [
"//pkg/config",
"//pkg/ddl/ingest/testutil",
Expand Down
27 changes: 27 additions & 0 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,3 +470,30 @@ func TestAddGlobalIndexInIngest(t *testing.T) {
require.Equal(t, rsGlobalIndex1.String(), rsTable.String())
require.Equal(t, rsGlobalIndex1.String(), rsGlobalIndex2.String())
}

func TestAddGlobalIndexInIngestWithUpdate(t *testing.T) {
store := testkit.CreateMockStore(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5")
tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3)")
var i atomic.Int32
i.Store(3)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
tk2 := testkit.NewTestKit(t, store)
tmp := i.Add(1)
_, err := tk2.Exec(fmt.Sprintf("insert into test.t values (%d, %d)", tmp, tmp))
assert.Nil(t, err)

_, err = tk2.Exec(fmt.Sprintf("update test.t set b = b + 11, a = b where b = %d", tmp-1))
assert.Nil(t, err)
})
tk.MustExec("alter table t add unique index idx(b) global")
rsGlobalIndex := tk.MustQuery("select *,_tidb_rowid from t use index(idx)").Sort()
rsTable := tk.MustQuery("select *,_tidb_rowid from t use index()").Sort()
require.Equal(t, rsGlobalIndex.String(), rsTable.String())
}
4 changes: 2 additions & 2 deletions pkg/kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,9 @@ func (m *MemAwareHandleMap[V]) Range(fn func(h Handle, val V) bool) {
return
}
}
for _, v := range m.partitionInts {
for pid, v := range m.partitionInts {
for h, val := range v.M {
if !fn(IntHandle(h), val) {
if !fn(NewPartitionHandle(pid, IntHandle(h)), val) {
return
}
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
)
if !opt.FromBackFill() {
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
if keyVer == tablecodec.TempIndexKeyTypeBackfill || keyVer == tablecodec.TempIndexKeyTypeDelete {
key, tempKey = tempKey, nil
keyIsTempIdxKey = true
}
Expand Down Expand Up @@ -409,6 +409,10 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
}

tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct}
if c.idxInfo.Global {
tempValElem.Global = true
tempValElem.Handle = kv.NewPartitionHandle(c.phyTblID, h)
}
if distinct {
if len(key) > 0 {
okToDelete := true
Expand Down Expand Up @@ -479,40 +483,29 @@ func (c *index) GenIndexKVIter(ec errctx.Context, loc *time.Location, indexedVal
return table.NewPlainIndexKVGenerator(c, ec, loc, h, handleRestoreData, indexedValue)
}

const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
TempIndexKeyTypeDelete byte = 'd'
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
TempIndexKeyTypeBackfill byte = 'b'
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
TempIndexKeyTypeMerge byte = 'm'
)

// GenTempIdxKeyByState is used to get the key version and the temporary key.
// The tempKeyVer means the temp index key/value version.
func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
if indexInfo.State != model.StatePublic {
switch indexInfo.BackfillState {
case model.BackfillStateInapplicable:
return indexKey, nil, TempIndexKeyTypeNone
return indexKey, nil, tablecodec.TempIndexKeyTypeNone
case model.BackfillStateRunning:
// Write to the temporary index.
tablecodec.IndexKey2TempIndexKey(indexKey)
if indexInfo.State == model.StateDeleteOnly {
return nil, indexKey, TempIndexKeyTypeDelete
return nil, indexKey, tablecodec.TempIndexKeyTypeDelete
}
return nil, indexKey, TempIndexKeyTypeBackfill
return nil, indexKey, tablecodec.TempIndexKeyTypeBackfill
case model.BackfillStateReadyToMerge, model.BackfillStateMerging:
// Double write
tmp := make([]byte, len(indexKey))
copy(tmp, indexKey)
tablecodec.IndexKey2TempIndexKey(tmp)
return indexKey, tmp, TempIndexKeyTypeMerge
return indexKey, tmp, tablecodec.TempIndexKeyTypeMerge
}
}
return indexKey, nil, TempIndexKeyTypeNone
return indexKey, nil, tablecodec.TempIndexKeyTypeNone
}

func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
Expand Down
40 changes: 37 additions & 3 deletions pkg/tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,16 +1307,32 @@ func (v TempIndexValue) FilterOverwritten() TempIndexValue {
// A temp index value element is encoded as one of:
// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal}
// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal}
// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted}
// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [partitionIdFlag 1 byte] [partitionID 8 bytes] [key_version 1 byte] {distinct deleted}
// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted}
type TempIndexValueElem struct {
Value []byte
Handle kv.Handle
KeyVer byte
Delete bool
Distinct bool

// Global means it's a global Index, for partitioned tables. Currently only used in `distinct` + `deleted` scenarios.
Global bool
}

const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
TempIndexKeyTypeDelete byte = 'd'
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
TempIndexKeyTypeBackfill byte = 'b'
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
TempIndexKeyTypeMerge byte = 'm'
// TempIndexKeyTypePartitionIDFlag indicates the following value is partition id.
TempIndexKeyTypePartitionIDFlag byte = 'p'
)

// Encode encodes the temp index value.
func (v *TempIndexValueElem) Encode(buf []byte) []byte {
if v.Delete {
Expand All @@ -1331,13 +1347,21 @@ func (v *TempIndexValueElem) Encode(buf []byte) []byte {
hEncoded = handle.Encoded()
hLen = uint16(len(hEncoded))
}
// flag + handle length + handle + temp key version
// flag + handle length + handle + [partition id] + temp key version
if buf == nil {
buf = make([]byte, 0, hLen+4)
l := hLen + 4
if v.Global {
l += 9
}
buf = make([]byte, 0, l)
}
buf = append(buf, byte(TempIndexValueFlagDeleted))
buf = append(buf, byte(hLen>>8), byte(hLen))
buf = append(buf, hEncoded...)
if v.Global {
buf = append(buf, TempIndexKeyTypePartitionIDFlag)
buf = append(buf, codec.EncodeInt(nil, v.Handle.(kv.PartitionHandle).PartitionID)...)
}
buf = append(buf, v.KeyVer)
return buf
}
Expand Down Expand Up @@ -1415,6 +1439,16 @@ func (v *TempIndexValueElem) DecodeOne(b []byte) (remain []byte, err error) {
v.Handle, _ = kv.NewCommonHandle(b[:hLen])
}
b = b[hLen:]
if b[0] == TempIndexKeyTypePartitionIDFlag {
v.Global = true
var pid int64
_, pid, err = codec.DecodeInt(b[1:9])
if err != nil {
return nil, err
}
v.Handle = kv.NewPartitionHandle(pid, v.Handle)
b = b[9:]
}
v.KeyVer = b[0]
b = b[1:]
v.Distinct = true
Expand Down

0 comments on commit 5da9d1a

Please sign in to comment.