lightning: allow configuring the desired size and number of rows of each INSERT statement for logical mode (pingcap#46997)

close pingcap#46607
kennytm authored Feb 18, 2024
1 parent f47c310 commit 2d57455
Showing 13 changed files with 125 additions and 126 deletions.
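
As a rough usage illustration (not taken from this commit's diff), the two new knobs could be wired into the logical-mode backend as sketched below. The field, function, and package names come from the changes in this commit; the chosen values, the helper name, and the exact import paths are assumptions inferred from the file layout shown:

import (
	"context"
	"database/sql"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
)

// newLogicalBackend is a hypothetical helper showing how the new options are consumed.
func newLogicalBackend(ctx context.Context, db *sql.DB) backend.Backend {
	cfg := config.NewConfig()
	// Target byte size of the VALUES list in each INSERT statement (value is illustrative).
	cfg.TikvImporter.LogicalImportBatchSize = 48 * 1024
	// Cap on the number of rows in each INSERT statement (value is illustrative).
	cfg.TikvImporter.LogicalImportBatchRows = 512
	// NewTiDBBackend now receives the whole *config.Config instead of only config.Conflict.
	return tidb.NewTiDBBackend(ctx, db, cfg, errormanager.New(db, cfg, log.L()))
}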
5 changes: 0 additions & 5 deletions br/pkg/lightning/backend/encode/encode.go
@@ -63,11 +63,6 @@ type SessionOptions struct {

// Rows represents a collection of encoded rows.
type Rows interface {
// SplitIntoChunks splits the rows into multiple consecutive parts, each
// part having total byte size less than `splitSize`. The meaning of "byte
// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
SplitIntoChunks(splitSize int) []Rows

// Clear returns a new collection with empty content. It may share the
// capacity with the current instance. The typical usage is `x = x.Clear()`.
Clear() Rows
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/BUILD.bazel
@@ -56,7 +56,7 @@ go_test(
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 19,
shard_count = 18,
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
31 changes: 0 additions & 31 deletions br/pkg/lightning/backend/kv/sql2kv.go
@@ -326,37 +326,6 @@ func (kvs *Pairs) ClassifyAndAppend(
*indices = indexKVs
}

// SplitIntoChunks splits the key-value pairs into chunks.
func (kvs *Pairs) SplitIntoChunks(splitSize int) []encode.Rows {
if len(kvs.Pairs) == 0 {
return nil
}

res := make([]encode.Rows, 0, 1)
i := 0
cumSize := 0
for j, pair := range kvs.Pairs {
size := len(pair.Key) + len(pair.Val)
if i < j && cumSize+size > splitSize {
res = append(res, &Pairs{Pairs: kvs.Pairs[i:j]})
i = j
cumSize = 0
}
cumSize += size
}

if i == 0 {
res = append(res, kvs)
} else {
res = append(res, &Pairs{
Pairs: kvs.Pairs[i:],
BytesBuf: kvs.BytesBuf,
MemBuf: kvs.MemBuf,
})
}
return res
}

// Clear clears the key-value pairs.
func (kvs *Pairs) Clear() encode.Rows {
if kvs.BytesBuf != nil {
47 changes: 0 additions & 47 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
@@ -569,53 +569,6 @@ func TestShardRowId(t *testing.T) {
require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder).GetSessionVars()).Get(autoid.RowIDAllocType).Base(), int64(32))
}

func TestSplitIntoChunks(t *testing.T) {
pairs := []common.KvPair{
{
Key: []byte{1, 2, 3},
Val: []byte{4, 5, 6},
},
{
Key: []byte{7, 8},
Val: []byte{9, 0},
},
{
Key: []byte{1, 2, 3, 4},
Val: []byte{5, 6, 7, 8},
},
{
Key: []byte{9, 0},
Val: []byte{1, 2},
},
}

splitBy10 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(10)
require.Equal(t, splitBy10, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:2]),
lkv.MakeRowsFromKvPairs(pairs[2:3]),
lkv.MakeRowsFromKvPairs(pairs[3:4]),
})

splitBy12 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(12)
require.Equal(t, splitBy12, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:2]),
lkv.MakeRowsFromKvPairs(pairs[2:4]),
})

splitBy1000 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1000)
require.Equal(t, splitBy1000, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:4]),
})

splitBy1 := lkv.MakeRowsFromKvPairs(pairs).SplitIntoChunks(1)
require.Equal(t, splitBy1, []encode.Rows{
lkv.MakeRowsFromKvPairs(pairs[0:1]),
lkv.MakeRowsFromKvPairs(pairs[1:2]),
lkv.MakeRowsFromKvPairs(pairs[2:3]),
lkv.MakeRowsFromKvPairs(pairs[3:4]),
})
}

func TestClassifyAndAppend(t *testing.T) {
kvs := lkv.MakeRowFromKvPairs([]common.KvPair{
{
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/BUILD.bazel
@@ -37,7 +37,7 @@ go_test(
timeout = "short",
srcs = ["tidb_test.go"],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
":tidb",
"//br/pkg/lightning/backend",
40 changes: 19 additions & 21 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -272,6 +272,11 @@ type tidbBackend struct {
// view should be the same.
onDuplicate string
errorMgr *errormanager.ErrorManager
// maxChunkSize and maxChunkRows are the target size and number of rows of each INSERT SQL
// statement to be sent to downstream. Sometimes we want to reduce the txn size to avoid
// affecting the cluster too much.
maxChunkSize uint64
maxChunkRows int
}

var _ backend.Backend = (*tidbBackend)(nil)
@@ -283,9 +288,10 @@ var _ backend.Backend = (*tidbBackend)(nil)
func NewTiDBBackend(
ctx context.Context,
db *sql.DB,
conflict config.Conflict,
cfg *config.Config,
errorMgr *errormanager.ErrorManager,
) backend.Backend {
conflict := cfg.Conflict
var onDuplicate string
switch conflict.Strategy {
case config.ErrorOnDup:
@@ -305,10 +311,12 @@ func NewTiDBBackend(
onDuplicate = config.ErrorOnDup
}
return &tidbBackend{
db: db,
conflictCfg: conflict,
onDuplicate: onDuplicate,
errorMgr: errorMgr,
db: db,
conflictCfg: conflict,
onDuplicate: onDuplicate,
errorMgr: errorMgr,
maxChunkSize: uint64(cfg.TikvImporter.LogicalImportBatchSize),
maxChunkRows: cfg.TikvImporter.LogicalImportBatchRows,
}
}

@@ -329,18 +337,17 @@ func (row tidbRow) ClassifyAndAppend(data *encode.Rows, checksum *verification.K
checksum.Add(&cs)
}

func (rows tidbRows) SplitIntoChunks(splitSizeInt int) []encode.Rows {
func (rows tidbRows) splitIntoChunks(splitSize uint64, splitRows int) []tidbRows {
if len(rows) == 0 {
return nil
}

res := make([]encode.Rows, 0, 1)
res := make([]tidbRows, 0, 1)
i := 0
cumSize := uint64(0)
splitSize := uint64(splitSizeInt)

for j, row := range rows {
if i < j && cumSize+row.Size() > splitSize {
if i < j && (cumSize+row.Size() > splitSize || j-i >= splitRows) {
res = append(res, rows[i:j])
i = j
cumSize = 0
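
The hunk above is truncated by the diff view. For reference, here is a self-contained sketch of the full greedy splitting rule, under the assumption that the truncated tail follows the same pattern as the removed Pairs.SplitIntoChunks shown earlier; the row type and Size() method below are stand-ins, not the commit's own types:

package main

import "fmt"

// row is a stand-in for tidbRow; Size() plays the role of the encoded SQL value length.
type row string

func (r row) Size() uint64 { return uint64(len(r)) }

// splitIntoChunks starts a new chunk once adding the next row would exceed
// splitSize bytes, or once the current chunk already holds splitRows rows.
func splitIntoChunks(rows []row, splitSize uint64, splitRows int) [][]row {
	if len(rows) == 0 {
		return nil
	}
	res := make([][]row, 0, 1)
	i := 0
	cumSize := uint64(0)
	for j, r := range rows {
		if i < j && (cumSize+r.Size() > splitSize || j-i >= splitRows) {
			res = append(res, rows[i:j])
			i = j
			cumSize = 0
		}
		cumSize += r.Size()
	}
	return append(res, rows[i:])
}

func main() {
	chunks := splitIntoChunks([]row{"(1)", "(22)", "(333)", "(4444)"}, 8, 2)
	fmt.Println(chunks) // [[(1) (22)] [(333)] [(4444)]]
}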
@@ -581,13 +588,6 @@ func (*tidbBackend) RetryImportDelay() time.Duration {
return 0
}

func (*tidbBackend) MaxChunkSize() int {
failpoint.Inject("FailIfImportedSomeRows", func() {
failpoint.Return(1)
})
return 1048576
}

func (*tidbBackend) ShouldPostProcess() bool {
return true
}
@@ -611,7 +611,7 @@ func (*tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error
func (be *tidbBackend) WriteRows(ctx context.Context, tableName string, columnNames []string, rows encode.Rows) error {
var err error
rowLoop:
for _, r := range rows.SplitIntoChunks(be.MaxChunkSize()) {
for _, r := range rows.(tidbRows).splitIntoChunks(be.maxChunkSize, be.maxChunkRows) {
for i := 0; i < writeRowsMaxRetryTimes; i++ {
// Write in the batch mode first.
err = be.WriteBatchRowsToDB(ctx, tableName, columnNames, r)
@@ -648,8 +648,7 @@ type stmtTask struct {
// WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this:
//
// insert into t1 values (111), (222), (333), (444);
func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error {
rows := r.(tidbRows)
func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error {
insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
if insertStmt == nil {
return nil
@@ -682,8 +681,7 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column
// insert into t1 values (444);
//
// See more details in br#1366: https://github.com/pingcap/br/issues/1366
func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r encode.Rows) error {
rows := r.(tidbRows)
func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, rows tidbRows) error {
insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
if insertStmt == nil {
return nil
90 changes: 71 additions & 19 deletions br/pkg/lightning/backend/tidb/tidb_test.go
@@ -73,7 +73,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
cfg.Conflict.Strategy = config.ReplaceOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.MaxRecordRows = 100
backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg.Conflict, errormanager.New(nil, cfg, log.L()))
backendObj := tidb.NewTiDBBackend(context.Background(), db, cfg, errormanager.New(nil, cfg, log.L()))
return &mysqlSuite{
dbHandle: db,
mockDB: mock,
@@ -166,7 +166,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
cfg.Conflict.Strategy = config.IgnoreOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

@@ -193,7 +193,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
// test conflict.strategy == ignore and not 0 conflict.max-record-rows will use ErrorOnDup

cfg.Conflict.MaxRecordRows = 10
ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
ignoreBackend = tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
engine, err = backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

@@ -246,7 +246,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg.Conflict, errormanager.New(nil, cfg, logger))
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

@@ -536,9 +536,7 @@ func TestWriteRowsErrorNoRetry(t *testing.T) {
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.Conflict.Threshold = 0
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -602,9 +600,7 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
cfg.Conflict.MaxRecordRows = 10
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
cfg.App.MaxError.Type = *atomic.NewInt64(10)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -657,9 +653,7 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
cfg.Conflict.MaxRecordRows = 10
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
cfg.App.MaxError.Type = *atomic.NewInt64(3)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -699,9 +693,7 @@ func TestWriteRowsRecordOneError(t *testing.T) {
cfg.Conflict.Threshold = 0
cfg.Conflict.MaxRecordRows = 0
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -728,9 +720,7 @@ func TestDuplicateThreshold(t *testing.T) {
cfg.Conflict.Strategy = config.IgnoreOnDup
cfg.Conflict.Threshold = 5
cfg.Conflict.MaxRecordRows = 0
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L()))
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
@@ -851,3 +841,65 @@ func TestEncodeRowForRecord(t *testing.T) {
}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, 3, -1, -1, -1})
require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)")
}

// TestLogicalImportBatch tests that each INSERT statement is limited by both
// logical-import-batch-size and logical-import-batch-rows configurations. Here
// we ensure each INSERT statement has up to 5 rows *and* ~30 bytes of values.
func TestLogicalImportBatch(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)

s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(4),(8),(16)\\E").
WillReturnResult(sqlmock.NewResult(5, 5))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(32),(64),(128),(256),(512)\\E").
WillReturnResult(sqlmock.NewResult(5, 5))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1024),(2048),(4096),(8192)\\E").
WillReturnResult(sqlmock.NewResult(4, 4))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(16384),(32768),(65536),(131072)\\E").
WillReturnResult(sqlmock.NewResult(4, 4))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(262144)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))
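// Note (descriptive, not part of the commit): with logical-import-batch-size = 30 and
// logical-import-batch-rows = 5 (configured below), the short early values (1 .. 512)
// hit the 5-row cap first, while the longer later values (1024 .. 262144) exhaust the
// ~30-byte size budget after 4 rows, hence the 5/5/4/4/1 grouping expected above.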

ctx := context.Background()
logger := log.L()

cfg := config.NewConfig()
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.TikvImporter.LogicalImportBatchSize = 30
cfg.TikvImporter.LogicalImportBatchRows = 5
ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger))
encBuilder := tidb.NewEncodingBuilder()
encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{
Path: "1.csv",
Table: s.tbl,
Logger: log.L(),
})
require.NoError(t, err)

dataRows := encBuilder.MakeEmptyRows()
dataChecksum := verification.MakeKVChecksum(0, 0, 0)
indexRows := encBuilder.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)
for i := int64(0); i < 19; i++ { // encode rows 1, 2, 4, 8, ..., 262144.
row, err := encoder.Encode(
[]types.Datum{types.NewIntDatum(1 << i)},
i,
[]int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1},
8*i,
)
require.NoError(t, err)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
}

engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"})
require.NoError(t, err)
err = writer.AppendRows(ctx, []string{"a"}, dataRows)
require.NoError(t, err)
}