Skip to content

Commit

Permalink
lightning: merge conflict record tables for preprocess duplicate dete…
Browse files Browse the repository at this point in the history
…ction and post-import conflict detection (pingcap#52307)

ref pingcap#52306
  • Loading branch information
lyzx2001 authored Apr 15, 2024
1 parent eff809f commit 113b633
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 98 deletions.
9 changes: 2 additions & 7 deletions br/OWNERS
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# See the OWNERS docs at https://go.k8s.io/owners
options:
no_parent_owners: true
filters:
"(tidb-lightning\\.toml)$":
approvers:
- sig-critical-approvers-tidb-lightning
".*":
approvers:
- sig-approvers-br
approvers:
- sig-approvers-br
2 changes: 1 addition & 1 deletion lightning/tests/lightning_config_max_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
run_sql 'DROP DATABASE IF EXISTS mytest'
run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`conflict_records\`" | grep -q "5"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records'
check_contains "COUNT(*): 1"
check_contains "COUNT(*): 5"

# Check conflict.threshold
run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
Expand Down

This file was deleted.

14 changes: 1 addition & 13 deletions lightning/tests/lightning_duplicate_detection_new/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,7 @@ if [ "$files_left" -ne "0" ];then
fi
rm -rf "$TEST_DIR/$TEST_NAME.sorted"

# 4. Test limit error records.
cleanup
run_lightning --backend local --config "$CUR/local-limit-error-records.toml" --log-file "$LOG_FILE"
run_sql "SELECT count(*) FROM test.dup_detect"
check_contains "count(*): 174"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records"
check_contains "count(*): 50"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records WHERE error LIKE '%PRIMARY%'"
check_contains "count(*): 49"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records WHERE error LIKE '%uniq_col6_col7%'"
check_contains "count(*): 1"

# 5. Test fail after duplicate detection.
# 4. Test fail after duplicate detection.
cleanup

export GO_FAILPOINTS="github.com/pingcap/tidb/lightning/pkg/importer/FailAfterDuplicateDetection=return()"
Expand Down
9 changes: 9 additions & 0 deletions lightning/tests/lightning_duplicate_resolution_merge/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_error_v2'
check_contains 'count(*): 4'

run_sql 'select count(*) from lightning_task_info.conflict_view'
check_contains 'count(*): 20'

run_sql 'select count(*) from lightning_task_info.conflict_view where is_precheck_conflict = 1'
check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_view where is_precheck_conflict = 0'
check_contains 'count(*): 4'
10 changes: 6 additions & 4 deletions br/tidb-lightning.toml → lightning/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,19 @@ driver = "file"
# - "replace": When encountering conflicting primary or unique key records, TiDB Lightning retains the latest data and overwrites the old data.
# The conflicting data are recorded in the `lightning_task_info.conflict_error_v2` table (recording conflicting data detected by post-import conflict detection in the physical import mode)
# and the `conflict_records` table (recording conflicting data detected by preprocess conflict detection in both logical and physical import modes) of the target TiDB cluster.
# If you turn on both preprocess and post-import conflict detection in physical import mode, the conflicting data can be checked in `lightning_task_info.conflict_view` view.
# You can manually insert the correct records into the target table based on your application requirements. Note that the target TiKV must be v5.2.0 or later versions.
# - "ignore": When encountering conflicting primary or unique key records, TiDB Lightning retains the old data and ignores the new data. This option can only be used in the logical import mode.
strategy = ""
# Controls whether to enable preprocess conflict detection, which checks conflicts in the data before importing it to TiDB. In scenarios where the ratio of conflict records is greater than or equal to 1%, it is recommended to enable preprocess conflict detection for better performance in conflict detection.
# In other scenarios, it is recommended to disable it. The default value is false, indicating that TiDB Lightning only checks conflicts after the import. If you set it to true, TiDB Lightning checks conflicts both before and after the import. This parameter is experimental, and it can be used only in the physical import mode.
# precheck-conflict-before-import = false
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 9223372036854775807, which means that almost all errors are tolerant.
# threshold = 9223372036854775807
# Controls the maximum number of records in the `conflict_records` table. The default value is 100. In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 10000.
# threshold = 10000
# Controls the maximum number of records in the `conflict_records` table. The default value is 10000. In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records cannot be recorded.
# max-record-rows = 100
# Starting from v8.1.0, max-record-rows will be assigned the value of threshold, regardless of the user input. max-record-rows will be deprecated in the future.
# max-record-rows = 10000

[tikv-importer]
# Delivery backend, can be "importer", "local" or "tidb".
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ingest

import (
"context"
"math"
"net"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -64,7 +63,7 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool, r
cfg.Checkpoint.Enable = true
if unique {
cfg.Conflict.Strategy = lightning.ErrorOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.Threshold = lightning.DefaultRecordDuplicateThreshold
} else {
cfg.Conflict.Strategy = lightning.NoneOnDup
}
Expand Down
53 changes: 18 additions & 35 deletions pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ const (
defaultLogicalImportBatchRows = 65536

// defaultMetaSchemaName is the default database name used to store lightning metadata
defaultMetaSchemaName = "lightning_metadata"
defaultTaskInfoSchemaName = "lightning_task_info"
defaultMaxRecordRows = 100
defaultMetaSchemaName = "lightning_metadata"
defaultTaskInfoSchemaName = "lightning_task_info"
DefaultRecordDuplicateThreshold = 10000

// autoDiskQuotaLocalReservedSpeed is the estimated size increase per
// millisecond per write thread the local backend may gain on all engines.
Expand Down Expand Up @@ -1339,7 +1339,7 @@ type Conflict struct {

// adjust assigns default values and check illegal values. The arguments must be
// adjusted before calling this function.
func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {
func (c *Conflict) adjust(i *TikvImporter) error {
strategyConfigFrom := "conflict.strategy"
if c.Strategy == NoneOnDup {
if i.OnDuplicate == NoneOnDup && i.Backend == BackendTiDB {
Expand Down Expand Up @@ -1378,48 +1378,31 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {

if c.Threshold < 0 {
switch c.Strategy {
case ErrorOnDup:
case ErrorOnDup, NoneOnDup:
c.Threshold = 0
case IgnoreOnDup, ReplaceOnDup:
c.Threshold = math.MaxInt64
case NoneOnDup:
c.Threshold = 0
if i.Backend == BackendLocal && c.Strategy != NoneOnDup {
c.Threshold = math.MaxInt64
}
c.Threshold = DefaultRecordDuplicateThreshold
}
}
if c.Threshold > 0 && c.Strategy == ErrorOnDup {
return common.ErrInvalidConfig.GenWithStack(
`conflict.threshold cannot be set when use conflict.strategy = "error"`)
}

if c.MaxRecordRows < 0 {
maxErr := l.MaxError
// Compatible with the old behavior that records all syntax,charset,type errors.
maxAccepted := max(maxErr.Syntax.Load(), maxErr.Charset.Load(), maxErr.Type.Load())
if maxAccepted < defaultMaxRecordRows {
maxAccepted = defaultMaxRecordRows
}
if maxAccepted > c.Threshold {
maxAccepted = c.Threshold
}
if c.Strategy == ReplaceOnDup && i.Backend == BackendTiDB {
// due to we use batch insert, we can't know which row is duplicated.
maxAccepted = 0
if c.Strategy == ReplaceOnDup && i.Backend == BackendTiDB {
// due to we use batch insert, we can't know which row is duplicated.
if c.MaxRecordRows >= 0 {
// only warn when it is set by user.
log.L().Warn(`Cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = \"tidb\" and conflict.strategy = \"replace\".
The value of conflict.max-record-rows has been converted to 0.`)
}
c.MaxRecordRows = maxAccepted
c.MaxRecordRows = 0
} else {
// only check it when it is set by user.
if c.MaxRecordRows > c.Threshold {
return common.ErrInvalidConfig.GenWithStack(
"conflict.max-record-rows (%d) cannot be larger than conflict.threshold (%d)",
c.MaxRecordRows, c.Threshold)
}
if c.Strategy == ReplaceOnDup && i.Backend == BackendTiDB {
return common.ErrInvalidConfig.GenWithStack(
`cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = "tidb" and conflict.strategy = "replace"`)
if c.MaxRecordRows >= 0 {
// only warn when it is set by user.
log.L().Warn("Setting conflict.max-record-rows does not take affect. The value of conflict.max-record-rows has been converted to conflict.threshold.")
}
c.MaxRecordRows = c.Threshold
}
return nil
}
Expand Down Expand Up @@ -1622,7 +1605,7 @@ func (cfg *Config) Adjust(ctx context.Context) error {
if err = cfg.Routes.adjust(&cfg.Mydumper); err != nil {
return err
}
return cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App)
return cfg.Conflict.adjust(&cfg.TikvImporter)
}

// AdjustForDDL acts like Adjust, but DDL will not use some functionalities so
Expand Down
33 changes: 20 additions & 13 deletions pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,16 +1042,21 @@ func TestAdjustMaxRecordRows(t *testing.T) {

cfg := NewConfig()
assignMinimalLegalValue(cfg)
cfg.Conflict.Threshold = 9999

cfg.Conflict.MaxRecordRows = -1
cfg.Conflict.Strategy = ReplaceOnDup
require.NoError(t, cfg.Adjust(ctx))
require.Equal(t, int64(100), cfg.Conflict.MaxRecordRows)
require.EqualValues(t, 10000, cfg.Conflict.MaxRecordRows)

cfg.Conflict.MaxRecordRows = -1
cfg.App.MaxError.Syntax.Store(1000)
cfg.Conflict.Threshold = 9999
require.NoError(t, cfg.Adjust(ctx))
require.EqualValues(t, 9999, cfg.Conflict.MaxRecordRows)

cfg.Conflict.MaxRecordRows = 1000
cfg.Conflict.Threshold = 100
require.NoError(t, cfg.Adjust(ctx))
require.Equal(t, int64(1000), cfg.Conflict.MaxRecordRows)
require.EqualValues(t, 100, cfg.Conflict.MaxRecordRows)
}

func TestRemoveAllowAllFiles(t *testing.T) {
Expand Down Expand Up @@ -1290,35 +1295,37 @@ func TestAdjustConflict(t *testing.T) {

require.NoError(t, dra.FromStringValue("REPLACE"))
cfg.Conflict.Strategy = dra
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App))
require.Equal(t, int64(math.MaxInt64), cfg.Conflict.Threshold)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 10000, cfg.Conflict.Threshold)

require.NoError(t, dra.FromStringValue("IGNORE"))
cfg.Conflict.Strategy = dra
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `conflict.strategy cannot be set to "ignore" when use tikv-importer.backend = "local"`)
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter), `conflict.strategy cannot be set to "ignore" when use tikv-importer.backend = "local"`)

cfg.Conflict.Strategy = ErrorOnDup
cfg.Conflict.Threshold = 1
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `conflict.threshold cannot be set when use conflict.strategy = "error"`)
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter), `conflict.threshold cannot be set when use conflict.strategy = "error"`)

cfg.TikvImporter.Backend = BackendTiDB
cfg.Conflict.Strategy = ReplaceOnDup
cfg.Conflict.MaxRecordRows = -1
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App))
require.Equal(t, int64(0), cfg.Conflict.MaxRecordRows)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 0, cfg.Conflict.MaxRecordRows)

cfg.TikvImporter.Backend = BackendLocal
cfg.Conflict.Threshold = 1
cfg.Conflict.MaxRecordRows = 1
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App))
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
cfg.Conflict.MaxRecordRows = 2
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `conflict.max-record-rows (2) cannot be larger than conflict.threshold (1)`)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 1, cfg.Conflict.MaxRecordRows)

cfg.TikvImporter.Backend = BackendTiDB
cfg.Conflict.Strategy = ReplaceOnDup
cfg.Conflict.Threshold = 1
cfg.Conflict.MaxRecordRows = 1
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = "tidb" and conflict.strategy = "replace"`)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 0, cfg.Conflict.MaxRecordRows)
}

func TestAdjustBlockSize(t *testing.T) {
Expand Down
55 changes: 39 additions & 16 deletions pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ const (
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v2"
// DupRecordTable is the table name to record duplicate data that displayed to user.
DupRecordTable = "conflict_records"
// DupRecordTableName is the name of the table that records duplicate data to be displayed to the user.
DupRecordTableName = "conflict_records"
// ConflictViewName is the name of the view that presents the union of the ConflictErrorTableName and DupRecordTableName tables.
ConflictViewName = "conflict_view"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
Expand Down Expand Up @@ -100,8 +102,8 @@ const (
);
`

createDupRecordTable = `
CREATE TABLE IF NOT EXISTS %s.` + DupRecordTable + ` (
createDupRecordTableName = `
CREATE TABLE IF NOT EXISTS %s.` + DupRecordTableName + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -114,6 +116,16 @@ const (
);
`

createConflictView = `
CREATE OR REPLACE VIEW %s.` + ConflictViewName + `
AS SELECT 0 AS is_precheck_conflict, task_id, create_time, table_name, index_name, key_data, row_data,
raw_key, raw_value, raw_handle, raw_row, is_data_kv, NULL AS path, NULL AS offset, NULL AS error, NULL AS row_id
FROM %s.` + ConflictErrorTableName + `
UNION ALL SELECT 1 AS is_precheck_conflict, task_id, create_time, table_name, NULL AS index_name, NULL AS key_data,
row_data, NULL AS raw_key, NULL AS raw_value, NULL AS raw_handle, NULL AS raw_row, NULL AS is_data_kv, path,
offset, error, row_id FROM %s.` + DupRecordTableName + `;
`

insertIntoTypeError = `
INSERT INTO %s.` + typeErrorTableName + `
(task_id, table_name, path, offset, error, row_data)
Expand Down Expand Up @@ -156,7 +168,7 @@ const (
`

insertIntoDupRecord = `
INSERT INTO %s.` + DupRecordTable + `
INSERT INTO %s.` + DupRecordTableName + `
(task_id, table_name, path, offset, error, row_id, row_data)
VALUES (?, ?, ?, ?, ?, ?, ?);
`
Expand Down Expand Up @@ -250,10 +262,10 @@ func (em *ErrorManager) Init(ctx context.Context) error {
sqls = append(sqls, [2]string{"create type error table", createTypeErrorTable})
}
if em.conflictV1Enabled {
sqls = append(sqls, [2]string{"create conflict error v2 table", createConflictErrorTable})
sqls = append(sqls, [2]string{"create conflict error table", createConflictErrorTable})
}
if em.conflictV2Enabled {
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable})
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTableName})
}

// No need to create task info schema if no error is allowed.
Expand All @@ -269,6 +281,14 @@ func (em *ErrorManager) Init(ctx context.Context) error {
}
}

// TODO: return VIEW to users regardless of the lightning configuration
if em.conflictV1Enabled && em.conflictV2Enabled {
err := exec.Exec(ctx, "create conflict view", strings.TrimSpace(common.SprintfWithIdentifiers(createConflictView, em.schema, em.schema, em.schema)))
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -904,12 +924,14 @@ func (em *ErrorManager) LogErrorDetails() {
// TODO: add charset table name
em.logger.Warn(fmtErrMsg(errCnt, "data charset", ""))
}
if errCnt := em.conflictError(); errCnt > 0 {
if em.conflictV1Enabled {
errCnt := em.conflictError()
if errCnt > 0 {
if em.conflictV1Enabled && em.conflictV2Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", ConflictViewName))
} else if em.conflictV1Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", ConflictErrorTableName))
}
if em.conflictV2Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", DupRecordTable))
} else if em.conflictV2Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", DupRecordTableName))
}
}
}
Expand Down Expand Up @@ -952,11 +974,12 @@ func (em *ErrorManager) Output() string {
}
if errCnt := em.conflictError(); errCnt > 0 {
count++
if em.conflictV1Enabled {
if em.conflictV1Enabled && em.conflictV2Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictViewName)})
} else if em.conflictV1Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
}
if em.conflictV2Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(DupRecordTable)})
} else if em.conflictV2Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(DupRecordTableName)})
}
}

Expand Down
Loading

0 comments on commit 113b633

Please sign in to comment.