forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkv.go
568 lines (512 loc) · 20 KB
/
kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"context"
"crypto/tls"
"sync"
"time"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)
// Transaction options
const (
// BinlogInfo contains the binlog data and client.
BinlogInfo Option = iota + 1
// SchemaChecker is used for checking schema-validity.
SchemaChecker
// IsolationLevel sets isolation level for current transaction. The default level is SI.
IsolationLevel
// Priority marks the priority of this transaction.
Priority
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog
// KeyOnly retrieve only keys, it can be used in scan now.
KeyOnly
// Pessimistic is defined for pessimistic lock
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// Set replica read
ReplicaRead
// Set task ID
TaskID
// InfoSchema is schema version used by txn startTS.
InfoSchema
// CollectRuntimeStats is used to enable collect runtime stats.
CollectRuntimeStats
// SchemaAmender is used to amend mutations for pessimistic transactions
SchemaAmender
// SampleStep skips 'SampleStep - 1' number of keys after each returned key.
SampleStep
// CommitHook is a callback function called right after the transaction gets committed
CommitHook
// EnableAsyncCommit indicates whether async commit is enabled
EnableAsyncCommit
// Enable1PC indicates whether one-phase commit is enabled
Enable1PC
// GuaranteeLinearizability indicates whether to guarantee linearizability at the cost of an extra tso request before prewrite
GuaranteeLinearizability
// TxnScope indicates which @@txn_scope this transaction will work with.
TxnScope
// StalenessReadOnly indicates whether the transaction is staleness read only transaction
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
)
// Priority value for transaction priority.
const (
PriorityNormal = iota
PriorityLow
PriorityHigh
)
// UnCommitIndexKVFlag uses to indicate the index key/value is no need to commit.
// This is used in the situation of the index key/value was unchanged when do update.
// Usage:
// 1. For non-unique index: normally, the index value is '0'.
// Change the value to '1' indicate the index key/value is no need to commit.
// 2. For unique index: normally, the index value is the record handle ID, 8 bytes.
// Append UnCommitIndexKVFlag to the value indicate the index key/value is no need to commit.
const UnCommitIndexKVFlag byte = '1'
// MaxTxnTimeUse is the max time a Txn may use (in ms) from its begin to commit.
// We use it to abort the transaction to guarantee GC worker will not influence it.
const MaxTxnTimeUse = 24 * 60 * 60 * 1000
// IsoLevel is the transaction's isolation level.
type IsoLevel int
const (
// SI stands for 'snapshot isolation'.
SI IsoLevel = iota
// RC stands for 'read committed'.
RC
)
// ReplicaReadType is the type of replica to read data from
type ReplicaReadType byte
const (
// ReplicaReadLeader stands for 'read from leader'.
ReplicaReadLeader ReplicaReadType = 1 << iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
ReplicaReadMixed
)
// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
// In some cases the default value is 0, which should be treated as `ReplicaReadLeader`.
return r != ReplicaReadLeader && r != 0
}
// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
TxnEntrySizeLimit uint64 = config.DefTxnEntrySizeLimit
// TxnTotalSizeLimit is limit of the sum of all entry size.
TxnTotalSizeLimit uint64 = config.DefTxnTotalSizeLimit
)
// Getter is the interface for the Get method.
type Getter interface {
// Get gets the value for key k from kv store.
// If corresponding kv pair does not exist, it returns nil and ErrNotExist.
Get(ctx context.Context, k Key) ([]byte, error)
}
// Retriever is the interface wraps the basic Get and Seek methods.
type Retriever interface {
Getter
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
Iter(k Key, upperBound Key) (Iterator, error)
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
// TODO: Add lower bound limit
IterReverse(k Key) (Iterator, error)
}
// Mutator is the interface wraps the basic Set and Delete methods.
type Mutator interface {
// Set sets the value for key k as v into kv store.
// v must NOT be nil or empty, otherwise it returns ErrCannotSetNilValue.
Set(k Key, v []byte) error
// Delete removes the entry for key k from kv store.
Delete(k Key) error
}
// StagingHandle is the reference of a staging buffer.
type StagingHandle int
var (
// InvalidStagingHandle is an invalid handler, MemBuffer will check handler to ensure safety.
InvalidStagingHandle StagingHandle = 0
// LastActiveStagingHandle is an special handler which always point to the last active staging buffer.
LastActiveStagingHandle StagingHandle = -1
)
// RetrieverMutator is the interface that groups Retriever and Mutator interfaces.
type RetrieverMutator interface {
Retriever
Mutator
}
// MemBufferIterator is an Iterator with KeyFlags related functions.
type MemBufferIterator interface {
Iterator
HasValue() bool
Flags() KeyFlags
UpdateFlags(...FlagsOp)
Handle() MemKeyHandle
}
// MemBuffer is an in-memory kv collection, can be used to buffer write operations.
type MemBuffer interface {
RetrieverMutator
// RLock locks the MemBuffer for shared read.
// In the most case, MemBuffer will only used by single goroutine,
// but it will be read by multiple goroutine when combined with executor.UnionScanExec.
// To avoid race introduced by executor.UnionScanExec, MemBuffer expose read lock for it.
RLock()
// RUnlock unlocks the MemBuffer.
RUnlock()
// GetFlags returns the latest flags associated with key.
GetFlags(Key) (KeyFlags, error)
// IterWithFlags returns a MemBufferIterator.
IterWithFlags(k Key, upperBound Key) MemBufferIterator
// IterReverseWithFlags returns a reversed MemBufferIterator.
IterReverseWithFlags(k Key) MemBufferIterator
// SetWithFlags put key-value into the last active staging buffer with the given KeyFlags.
SetWithFlags(Key, []byte, ...FlagsOp) error
// UpdateFlags update the flags associated with key.
UpdateFlags(Key, ...FlagsOp)
// DeleteWithFlags delete key with the given KeyFlags
DeleteWithFlags(Key, ...FlagsOp) error
GetKeyByHandle(MemKeyHandle) []byte
GetValueByHandle(MemKeyHandle) ([]byte, bool)
// Reset reset the MemBuffer to initial states.
Reset()
// DiscardValues releases the memory used by all values.
// NOTE: any operation need value will panic after this function.
DiscardValues()
// Staging create a new staging buffer inside the MemBuffer.
// Subsequent writes will be temporarily stored in this new staging buffer.
// When you think all modifications looks good, you can call `Release` to public all of them to the upper level buffer.
Staging() StagingHandle
// Release publish all modifications in the latest staging buffer to upper level.
Release(StagingHandle)
// Cleanup cleanup the resources referenced by the StagingHandle.
// If the changes are not published by `Release`, they will be discarded.
Cleanup(StagingHandle)
// InspectStage used to inspect the value updates in the given stage.
InspectStage(StagingHandle, func(Key, KeyFlags, []byte))
// SelectValueHistory select the latest value which makes `predicate` returns true from the modification history.
SelectValueHistory(key Key, predicate func(value []byte) bool) ([]byte, error)
// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
SnapshotGetter() Getter
// SnapshotIter returns a Iterator for a snapshot of MemBuffer.
SnapshotIter(k, upperbound Key) Iterator
// Size returns sum of keys and values length.
Size() int
// Len returns the number of entries in the DB.
Len() int
// Dirty returns whether the root staging buffer is updated.
Dirty() bool
}
// Transaction defines the interface for operations inside a Transaction.
// This is not thread safe.
type Transaction interface {
RetrieverMutator
// Size returns sum of keys and values length.
Size() int
// Len returns the number of entries in the DB.
Len() int
// Reset reset the Transaction to initial states.
Reset()
// Commit commits the transaction operations to KV store.
Commit(context.Context) error
// Rollback undoes the transaction operations to KV store.
Rollback() error
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, lockCtx *LockCtx, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
// GetOption returns the option
GetOption(opt Option) interface{}
// DelOption deletes an option.
DelOption(opt Option)
// IsReadOnly checks if the transaction has only performed read operations.
IsReadOnly() bool
// StartTS returns the transaction start timestamp.
StartTS() uint64
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
Valid() bool
// GetMemBuffer return the MemBuffer binding to this transaction.
GetMemBuffer() MemBuffer
// GetSnapshot returns the Snapshot binding to this transaction.
GetSnapshot() Snapshot
// GetUnionStore returns the UnionStore binding to this transaction.
GetUnionStore() UnionStore
// SetVars sets variables to the transaction.
SetVars(vars *Variables)
// GetVars gets variables from the transaction.
GetVars() *Variables
// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
// Do not use len(value) == 0 or value == nil to represent non-exist.
// If a key doesn't exist, there shouldn't be any corresponding entry in the result map.
BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
IsPessimistic() bool
// CacheIndexName caches the index name.
// PresumeKeyNotExists will use this to help decode error message.
CacheTableInfo(id int64, info *model.TableInfo)
// GetIndexName returns the cached index name.
// If there is no such index already inserted through CacheIndexName, it will return UNKNOWN.
GetTableInfo(id int64) *model.TableInfo
}
// LockCtx contains information for LockKeys method.
type LockCtx struct {
Killed *uint32
ForUpdateTS uint64
LockWaitTime int64
WaitStartTime time.Time
PessimisticLockWaited *int32
LockKeysDuration *int64
LockKeysCount *int32
ReturnValues bool
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
Stats *execdetails.LockKeysDetails
}
// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
type ReturnedValue struct {
Value []byte
AlreadyLocked bool
}
// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
// IsRequestTypeSupported checks if reqType and subType is supported.
IsRequestTypeSupported(reqType, subType int64) bool
}
// ReqTypes.
const (
ReqTypeSelect = 101
ReqTypeIndex = 102
ReqTypeDAG = 103
ReqTypeAnalyze = 104
ReqTypeChecksum = 105
ReqSubTypeBasic = 0
ReqSubTypeDesc = 10000
ReqSubTypeGroupBy = 10001
ReqSubTypeTopN = 10002
ReqSubTypeSignature = 10003
ReqSubTypeAnalyzeIdx = 10004
ReqSubTypeAnalyzeCol = 10005
)
// StoreType represents the type of a store.
type StoreType uint8
const (
// TiKV means the type of a store is TiKV.
TiKV StoreType = iota
// TiFlash means the type of a store is TiFlash.
TiFlash
// TiDB means the type of a store is TiDB.
TiDB
// UnSpecified means the store type is unknown
UnSpecified = 255
)
// Name returns the name of store type.
func (t StoreType) Name() string {
if t == TiFlash {
return "tiflash"
} else if t == TiDB {
return "tidb"
} else if t == TiKV {
return "tikv"
}
return "unspecified"
}
// Request represents a kv request.
type Request struct {
// Tp is the request type.
Tp int64
StartTs uint64
Data []byte
KeyRanges []KeyRange
// Concurrency is 1, if it only sends the request to a single storage unit when
// ResponseIterator.Next is called. If concurrency is greater than 1, the request will be
// sent to multiple storage units concurrently.
Concurrency int
// IsolationLevel is the isolation level, default is SI.
IsolationLevel IsoLevel
// Priority is the priority of this KV request, its value may be PriorityNormal/PriorityLow/PriorityHigh.
Priority int
// memTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
// KeepOrder is true, if the response should be returned in order.
KeepOrder bool
// Desc is true, if the request is sent in descending order.
Desc bool
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache bool
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog bool
// Streaming indicates using streaming API for this request, result in that one Next()
// call would not corresponds to a whole region result.
Streaming bool
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead ReplicaReadType
// StoreType represents this request is sent to the which type of store.
StoreType StoreType
// Cacheable is true if the request can be cached. Currently only deterministic DAG requests can be cached.
Cacheable bool
// SchemaVer is for any schema-ful storage to validate schema correctness if necessary.
SchemaVar int64
// BatchCop indicates whether send batch coprocessor request to tiflash.
BatchCop bool
// TaskID is an unique ID for an execution of a statement
TaskID uint64
// TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
TiDBServerID uint64
// IsStaleness indicates whether the request read staleness data
IsStaleness bool
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels []*metapb.StoreLabel
}
// ResultSubset represents a result subset from a single storage unit.
// TODO: Find a better interface for ResultSubset that can reuse bytes.
type ResultSubset interface {
// GetData gets the data.
GetData() []byte
// GetStartKey gets the start key.
GetStartKey() Key
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
// RespTime returns the response time for the request.
RespTime() time.Duration
}
// Response represents the response returned from KV layer.
type Response interface {
// Next returns a resultSubset from a single storage unit.
// When full result set is returned, nil is returned.
Next(ctx context.Context) (resultSubset ResultSubset, err error)
// Close response.
Close() error
}
// Snapshot defines the interface for the snapshot fetched from KV store.
type Snapshot interface {
Retriever
// BatchGet gets a batch of values from snapshot.
BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option. Only ReplicaRead is supported for snapshot
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
}
// BatchGetter is the interface for BatchGet.
type BatchGetter interface {
// BatchGet gets a batch of values.
BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
}
// Driver is the interface that must be implemented by a KV storage.
type Driver interface {
// Open returns a new Storage.
// The path is the string for storage specific format.
Open(path string) (Storage, error)
}
// TransactionOption indicates the option when beginning a transaction
type TransactionOption struct {
TxnScope string
StartTS *uint64
PrevSec *uint64
}
// SetStartTs set startTS
func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption {
to.StartTS = &startTS
return to
}
// SetPrevSec set prevSec
func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption {
to.PrevSec = &prevSec
return to
}
// SetTxnScope set txnScope
func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption {
to.TxnScope = txnScope
return to
}
// Storage defines the interface for storage.
// Isolation should be at least SI(SNAPSHOT ISOLATION)
type Storage interface {
// Begin a global transaction
Begin() (Transaction, error)
// Begin a transaction with given option
BeginWithOption(option TransactionOption) (Transaction, error)
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
GetSnapshot(ver Version) Snapshot
// GetClient gets a client instance.
GetClient() Client
// GetMPPClient gets a mpp client instance.
GetMPPClient() MPPClient
// Close store
Close() error
// UUID return a unique ID which represents a Storage.
UUID() string
// CurrentVersion returns current max committed version with the given txnScope (local or global).
CurrentVersion(txnScope string) (Version, error)
// GetOracle gets a timestamp oracle client.
GetOracle() oracle.Oracle
// SupportDeleteRange gets the storage support delete range or not.
SupportDeleteRange() (supported bool)
// Name gets the name of the storage engine
Name() string
// Describe returns of brief introduction of the storage
Describe() string
// ShowStatus returns the specified status of the storage
ShowStatus(ctx context.Context, key string) (interface{}, error)
// GetMemCache return memory manager of the storage.
GetMemCache() MemManager
}
// EtcdBackend is used for judging a storage is a real TiKV.
type EtcdBackend interface {
EtcdAddrs() ([]string, error)
TLSConfig() *tls.Config
StartGCWorker() error
}
// FnKeyCmp is the function for iterator the keys
type FnKeyCmp func(key Key) bool
// Iterator is the interface for a iterator on KV store.
type Iterator interface {
Valid() bool
Key() Key
Value() []byte
Next() error
Close()
}
// SplittableStore is the kv store which supports split regions.
type SplittableStore interface {
SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool, tableID *int64) (regionID []uint64, err error)
WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
// Used for pessimistic lock wait time
// these two constants are special for lock protocol with tikv
// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds
var (
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)