Skip to content

Commit

Permalink
fixed: data scan miss expresion, close #15
Browse files Browse the repository at this point in the history
  • Loading branch information
wentaojin committed Jun 26, 2024
1 parent 49db1d2 commit 5608d6b
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 55 deletions.
30 changes: 23 additions & 7 deletions database/oracle/taskflow/csv_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,10 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
return err
}
var (
includeTables []string
excludeTables []string
databaseTables []string // task tables
globalScn string
includeTables []string
excludeTables []string
databaseTaskTables []string // task tables
globalScn string
)
databaseTableTypeMap := make(map[string]string)

Expand All @@ -515,11 +515,27 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
}
}

databaseTables, err = databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
databaseFilterTables, err := databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
if err != nil {
return err
}

// rule case field
for _, t := range databaseFilterTables {
var tabName string
// the according target case field rule convert
if strings.EqualFold(cmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleLower) {
tabName = stringutil.StringLower(t)
}
if strings.EqualFold(cmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleUpper) {
tabName = stringutil.StringUpper(t)
}
if strings.EqualFold(cmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleOrigin) {
tabName = t
}
databaseTaskTables = append(databaseTaskTables, tabName)
}

// clear the csv migrate task table
migrateGroupTasks, err := model.GetIDataMigrateTaskRW().FindDataMigrateTaskGroupByTaskSchemaTable(cmt.Ctx, cmt.Task.TaskName)
if err != nil {
Expand All @@ -529,7 +545,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe

if len(migrateGroupTasks) > 0 {
taskTablesMap := make(map[string]struct{})
for _, t := range databaseTables {
for _, t := range databaseTaskTables {
taskTablesMap[t] = struct{}{}
}
for _, smt := range migrateGroupTasks {
Expand Down Expand Up @@ -622,7 +638,7 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
g, gCtx := errgroup.WithContext(cmt.Ctx)
g.SetLimit(int(cmt.TaskParams.TableThread))

for _, taskJob := range databaseTables {
for _, taskJob := range databaseTaskTables {
sourceTable := taskJob
g.Go(func() error {
select {
Expand Down
32 changes: 24 additions & 8 deletions database/oracle/taskflow/data_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,11 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
return err
}
var (
includeTables []string
excludeTables []string
databaseTables []string // task tables
globalScnS string
globalScnT string
includeTables []string
excludeTables []string
databaseTaskTables []string // task tables
globalScnS string
globalScnT string
)
databaseTableTypeMap := make(map[string]string)

Expand All @@ -439,11 +439,27 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
}
}

databaseTables, err = databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
databaseFilterTables, err := databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
if err != nil {
return err
}

// rule case field
for _, t := range databaseFilterTables {
var tabName string
// the according target case field rule convert
if strings.EqualFold(dmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleLower) {
tabName = stringutil.StringLower(t)
}
if strings.EqualFold(dmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleUpper) {
tabName = stringutil.StringUpper(t)
}
if strings.EqualFold(dmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleOrigin) {
tabName = t
}
databaseTaskTables = append(databaseTaskTables, tabName)
}

// clear the data compare task table
// repeatInitTableMap used for store the struct_migrate_task table name has be finished, avoid repeated initialization
migrateGroupTasks, err := model.GetIDataCompareTaskRW().FindDataCompareTaskGroupByTaskSchemaTable(dmt.Ctx, dmt.Task.TaskName)
Expand All @@ -454,7 +470,7 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID

if len(migrateGroupTasks) > 0 {
taskTablesMap := make(map[string]struct{})
for _, t := range databaseTables {
for _, t := range databaseTaskTables {
taskTablesMap[t] = struct{}{}
}
for _, smt := range migrateGroupTasks {
Expand Down Expand Up @@ -558,7 +574,7 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
g, gCtx := errgroup.WithContext(dmt.Ctx)
g.SetLimit(int(dmt.TaskParams.TableThread))

for _, taskJob := range databaseTables {
for _, taskJob := range databaseTaskTables {
sourceTable := taskJob
g.Go(func() error {
select {
Expand Down
36 changes: 29 additions & 7 deletions database/oracle/taskflow/data_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio
return err
}
var (
includeTables []string
excludeTables []string
databaseTables []string // task tables
globalScn string
includeTables []string
excludeTables []string
databaseTaskTables []string // task tables
globalScn string
)
databaseTableTypeMap := make(map[string]string)

Expand All @@ -419,11 +419,27 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio
}
}

databaseTables, err = databaseS.FilterDatabaseTable(schemaNameS, includeTables, excludeTables)
databaseFilterTables, err := databaseS.FilterDatabaseTable(schemaNameS, includeTables, excludeTables)
if err != nil {
return err
}

// rule case field
for _, t := range databaseFilterTables {
var tabName string
// the according target case field rule convert
if strings.EqualFold(dst.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleLower) {
tabName = stringutil.StringLower(t)
}
if strings.EqualFold(dst.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleUpper) {
tabName = stringutil.StringUpper(t)
}
if strings.EqualFold(dst.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleOrigin) {
tabName = t
}
databaseTaskTables = append(databaseTaskTables, tabName)
}

// clear the data scan task table
// repeatInitTableMap used for store the data_scan_task table name has be finished, avoid repeated initialization
migrateGroupTasks, err := model.GetIDataScanTaskRW().FindDataScanTaskGroupByTaskSchemaTable(dst.Ctx, dst.Task.TaskName)
Expand All @@ -434,7 +450,7 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio

if len(migrateGroupTasks) > 0 {
taskTablesMap := make(map[string]struct{})
for _, t := range databaseTables {
for _, t := range databaseTaskTables {
taskTablesMap[t] = struct{}{}
}
for _, mt := range migrateGroupTasks {
Expand Down Expand Up @@ -528,7 +544,7 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio
g, gCtx := errgroup.WithContext(dst.Ctx)
g.SetLimit(int(dst.TaskParams.TableThread))

for _, taskJob := range databaseTables {
for _, taskJob := range databaseTaskTables {
sourceTable := taskJob
g.Go(func() error {
select {
Expand Down Expand Up @@ -570,6 +586,12 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio
return err
}

// If the database table ColumnDetailS and GroupColumnS return ""
// it means that the database table does not have a number data type field, ignore and skip init
if strings.EqualFold(attsRule.ColumnDetailS, "") && strings.EqualFold(attsRule.GroupColumnS, "") {
return nil
}

var whereRange string
size, err := stringutil.StrconvFloatBitSize(attsRule.TableSamplerateS, 64)
if err != nil {
Expand Down
30 changes: 23 additions & 7 deletions database/oracle/taskflow/stmt_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,10 @@ func (stm *StmtMigrateTask) initStmtMigrateTask(databaseS, databaseT database.ID
return err
}
var (
includeTables []string
excludeTables []string
databaseTables []string // task tables
globalScn string
includeTables []string
excludeTables []string
databaseTaskTables []string // task tables
globalScn string
)
databaseTableTypeMap := make(map[string]string)

Expand All @@ -490,11 +490,27 @@ func (stm *StmtMigrateTask) initStmtMigrateTask(databaseS, databaseT database.ID
}
}

databaseTables, err = databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
databaseFilterTables, err := databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
if err != nil {
return err
}

// rule case field
for _, t := range databaseFilterTables {
var tabName string
// the according target case field rule convert
if strings.EqualFold(stm.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleLower) {
tabName = stringutil.StringLower(t)
}
if strings.EqualFold(stm.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleUpper) {
tabName = stringutil.StringUpper(t)
}
if strings.EqualFold(stm.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleOrigin) {
tabName = t
}
databaseTaskTables = append(databaseTaskTables, tabName)
}

// clear the stmt migrate task table
// repeatInitTableMap used for store the struct_migrate_task table name has be finished, avoid repeated initialization
migrateGroupTasks, err := model.GetIDataMigrateTaskRW().FindDataMigrateTaskGroupByTaskSchemaTable(stm.Ctx, stm.Task.TaskName)
Expand All @@ -505,7 +521,7 @@ func (stm *StmtMigrateTask) initStmtMigrateTask(databaseS, databaseT database.ID

if len(migrateGroupTasks) > 0 {
taskTablesMap := make(map[string]struct{})
for _, t := range databaseTables {
for _, t := range databaseTaskTables {
taskTablesMap[t] = struct{}{}
}
for _, mt := range migrateGroupTasks {
Expand Down Expand Up @@ -599,7 +615,7 @@ func (stm *StmtMigrateTask) initStmtMigrateTask(databaseS, databaseT database.ID
g, gCtx := errgroup.WithContext(stm.Ctx)
g.SetLimit(int(stm.TaskParams.TableThread))

for _, taskJob := range databaseTables {
for _, taskJob := range databaseTaskTables {
sourceTable := taskJob
g.Go(func() error {
select {
Expand Down
33 changes: 25 additions & 8 deletions database/oracle/taskflow/struct_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,9 @@ func (dmt *StructCompareTask) initStructCompareTask(databaseS, databaseT databas
return err
}
var (
includeTables []string
excludeTables []string
databaseTables []string // task tables
includeTables []string
excludeTables []string
databaseTaskTables []string // task tables
)
databaseTableTypeMap := make(map[string]string)

Expand All @@ -535,10 +535,27 @@ func (dmt *StructCompareTask) initStructCompareTask(databaseS, databaseT databas
}
}

databaseTables, err = databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
databaseFilterTables, err := databaseS.FilterDatabaseTable(schemaRoute.SchemaNameS, includeTables, excludeTables)
if err != nil {
return err
}

// rule case field
for _, t := range databaseFilterTables {
var tabName string
// the according target case field rule convert
if strings.EqualFold(dmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleLower) {
tabName = stringutil.StringLower(t)
}
if strings.EqualFold(dmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleUpper) {
tabName = stringutil.StringUpper(t)
}
if strings.EqualFold(dmt.Task.CaseFieldRuleS, constant.ParamValueStructMigrateCaseFieldRuleOrigin) {
tabName = t
}
databaseTaskTables = append(databaseTaskTables, tabName)
}

databaseTableTypeMap, err = databaseS.GetDatabaseTableType(schemaRoute.SchemaNameS)
if err != nil {
return err
Expand Down Expand Up @@ -571,7 +588,7 @@ func (dmt *StructCompareTask) initStructCompareTask(databaseS, databaseT databas
}

var panicTables []string
for _, t := range databaseTables {
for _, t := range databaseTaskTables {
if _, ok := tableRouteRuleTNew[t]; !ok {
panicTables = append(panicTables, t)
}
Expand All @@ -590,7 +607,7 @@ func (dmt *StructCompareTask) initStructCompareTask(databaseS, databaseT databas
repeatInitTableMap := make(map[string]struct{})
if len(migrateTasks) > 0 {
taskTablesMap := make(map[string]struct{})
for _, t := range databaseTables {
for _, t := range databaseTaskTables {
taskTablesMap[t] = struct{}{}
}
for _, smt := range migrateTasks {
Expand All @@ -613,7 +630,7 @@ func (dmt *StructCompareTask) initStructCompareTask(databaseS, databaseT databas
// database tables
// init database table
// get table column route rule
for _, sourceTable := range databaseTables {
for _, sourceTable := range databaseTaskTables {
initStructInfos, err := model.GetIStructCompareTaskRW().GetStructCompareTaskTable(dmt.Ctx, &task.StructCompareTask{
TaskName: dmt.Task.TaskName,
SchemaNameS: schemaRoute.SchemaNameS,
Expand Down Expand Up @@ -665,7 +682,7 @@ func (dmt *StructCompareTask) initStructCompareTask(databaseS, databaseT databas
&task.StructCompareSummary{
TaskName: dmt.Task.TaskName,
SchemaNameS: schemaRoute.SchemaNameS,
TableTotals: uint64(len(databaseTables)),
TableTotals: uint64(len(databaseTaskTables)),
})
if err != nil {
return err
Expand Down
2 changes: 0 additions & 2 deletions example/csv_migrate_task.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ case-field-rule-t = "0"
schema-name-s = "MARVIN"
schema-name-t = "STEVEN"
include-table-s = ["MARVIN00","MARVIN01","MARVIN_COLUMN_T"]
#include-table-s = ["PM_TC_PROCESS_CODE","MARVIN00","MARVIN01"]
exclude-table-s = []
#exclude-table-s = ["P4","WUCHAO_TEST_BLOB"]

[[schema-route-rule.table-route-rules]]
table-name-s = "MARVIN00"
Expand Down
1 change: 0 additions & 1 deletion example/data_compare_task00.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ include-table-s = ["MARVIN_COLUMN_T"]
#include-table-s = ["MARVIN00","MARVIN01","MARVIN05"]
#include-table-s = ["PM_TC_PROCESS_CODE","MARVIN00","MARVIN01"]
exclude-table-s = []
#exclude-table-s = ["P4","WUCHAO_TEST_BLOB"]

[[schema-route-rule.table-route-rules]]
table-name-s = "MARVIN00"
Expand Down
1 change: 0 additions & 1 deletion example/data_compare_task01.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ include-table-s = ["MARVIN_COLUMN_T"]
#include-table-s = ["MARVIN00","MARVIN01","MARVIN05"]
#include-table-s = ["PM_TC_PROCESS_CODE","MARVIN00","MARVIN01"]
exclude-table-s = []
#exclude-table-s = ["P4","WUCHAO_TEST_BLOB"]

[[schema-route-rule.table-route-rules]]
table-name-s = "MARVIN00"
Expand Down
8 changes: 3 additions & 5 deletions example/data_scan_task.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ case-field-rule-s = "0"

[schema-route-rule]
schema-name-s = "MARVIN"
#include-table-s = ["MARVIN00","MARVIN01","MARVIN05","MARVIN_COLUMN_T"]
include-table-s = ["MARVIN00","MARVIN01"]
#include-table-s = ["PM_TC_PROCESS_CODE","MARVIN00","MARVIN01"]
include-table-s = ["MARVIN10"]
#include-table-s = ["MARVIN00","MARVIN01"]
exclude-table-s = []
#exclude-table-s = ["P4","WUCHAO_TEST_BLOB"]

[[data-scan-rules]]
table-name-s = "MARVIN00"
Expand All @@ -27,7 +25,7 @@ table-samplerate-s = 100
# 采样扫描下用于表执行并发数
table-thread = 100
# 数据写入批量大小
batch-size = 1
batch-size = 500
# 全表扫描
# 任务 chunk 数,固定动作,一旦确认,不能更改,除非设置 enable-checkpoint = false,重新导出导入
# 1、代表每张表每并发处理多少行数
Expand Down
1 change: 0 additions & 1 deletion example/stmt_migrate_task.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ schema-name-t = "STEVEN"
include-table-s = ["MARVIN_COLUMN_T"]
#include-table-s = ["PM_TC_PROCESS_CODE","MARVIN00","MARVIN01"]
exclude-table-s = []
#exclude-table-s = ["P4","WUCHAO_TEST_BLOB"]

[[schema-route-rule.table-route-rules]]
table-name-s = "MARVIN00"
Expand Down
Loading

0 comments on commit 5608d6b

Please sign in to comment.