Skip to content

Commit 05af90b

Browse files
authored
many bug fixes (moiot#150)
* os.Exit(1) when configuration file changed * introduce official mongo driver. mgo is not maintained anymore, and mgo have bugs when using find for range query. (sometimes it does not return any result when the query is right) * Change metric name from EnterInput to Start, and calculate Start before initiate query in mongobatch * Fix mysql batch scan when primary key contains non-acii char. * setup mongo replica set in docker test * Some minor refactoring.
1 parent acfe1a9 commit 05af90b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+694
-334
lines changed

.gitignore

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ cmd/scratch
77
profile
88
*.idea
99
*.meta
10-
config.toml
11-
master_config.toml
10+
11+
1212
*_dev.toml
1313
*_dev.json
1414

1515
config_dev
16-
nuclear/config_dev
17-
scanner/config_dev
16+
17+
1818

1919
*.log
2020
.schema-store

Dockerfile.mongo.setup

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
FROM mongo:4.1
2+
3+
RUN echo "rs.initiate();" > /docker-entrypoint-initdb.d/replica-init.js

cmd/verifier/main.go

+13-11
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"strconv"
99
"time"
1010

11+
"github.com/moiot/gravity/pkg/config"
12+
1113
_ "github.com/go-sql-driver/mysql"
1214

1315
"sort"
@@ -31,17 +33,17 @@ import (
3133
)
3234

3335
type Config struct {
34-
Source *utils.DBConfig `toml:"source"`
35-
Target *utils.DBConfig `toml:"target"`
36-
SourceTable string `toml:"source-table"`
37-
TargetTable string `toml:"target-table"`
38-
Shading bool `toml:"shading"`
39-
ColName string `toml:"col-name"`
40-
ColType string `toml:"col-type"`
41-
MinString string `toml:"min-string"`
42-
MaxString string `toml:"max-string"`
43-
ValPrefix string `toml:"prefix"`
44-
Parallel int `toml:"parallel"`
36+
Source *config.DBConfig `toml:"source"`
37+
Target *config.DBConfig `toml:"target"`
38+
SourceTable string `toml:"source-table"`
39+
TargetTable string `toml:"target-table"`
40+
Shading bool `toml:"shading"`
41+
ColName string `toml:"col-name"`
42+
ColType string `toml:"col-type"`
43+
MinString string `toml:"min-string"`
44+
MaxString string `toml:"max-string"`
45+
ValPrefix string `toml:"prefix"`
46+
Parallel int `toml:"parallel"`
4547
}
4648

4749
func main() {

dcp/barrier/barrier.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"database/sql"
55
"time"
66

7+
"github.com/moiot/gravity/pkg/config"
8+
79
"github.com/moiot/gravity/pkg/utils"
810
log "github.com/sirupsen/logrus"
911
)
@@ -17,7 +19,7 @@ const (
1719
)
1820

1921
type Config struct {
20-
Db utils.DBConfig
22+
Db config.DBConfig
2123
TickerSeconds int
2224
TestDB *sql.DB
2325
}

docker-compose-gravity-test.yml

+9-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ services:
2424
- ./mycnf:/etc/mysql/conf.d
2525

2626
mongo:
27-
image: mongo:4.1
2827
container_name: mongo
28+
build:
29+
context: ./
30+
dockerfile: Dockerfile.mongo.setup
2931
ports:
3032
- 27017
3133
logging:
@@ -71,4 +73,9 @@ services:
7173
- MONGO_HOST=mongo
7274
- KAFKA_BROKER=kafka:9092
7375
- ELASTICSEARCH_URLS=http://elasticsearch:9200
74-
command: ["bash", "./wait-for-it.sh", "source-db:3306","-t", "0", "--", "./wait-for-it.sh", "target-db:3306","-t", "0", "--", "./wait-for-it.sh", "mongo:27017", "-t", "0", "--", "./wait-for-it.sh", "kafka:9092", "-t", "0", "--", "./wait-for-it.sh", "elasticsearch:9200", "-t", "0", "--", "make", "go-test"]
76+
command: ["bash", "./wait-for-it.sh", "source-db:3306","-t", "0",
77+
"--", "./wait-for-it.sh", "target-db:3306","-t", "0",
78+
"--", "./wait-for-it.sh", "mongo:27017", "-t", "0",
79+
"--", "./wait-for-it.sh", "kafka:9092", "-t", "0",
80+
"--", "./wait-for-it.sh", "elasticsearch:9200", "-t", "0",
81+
"--", "make", "go-test"]

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ require (
5151
github.com/siddontang/go-mysql v0.0.0-20190312052122-c6ab05a85eb8
5252
github.com/sirupsen/logrus v1.2.0
5353
github.com/stretchr/testify v1.3.0
54+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
55+
github.com/xdg/stringprep v1.0.0 // indirect
56+
go.mongodb.org/mongo-driver v1.0.1
5457
golang.org/x/net v0.0.0-20190311183353-d8887717615a
5558
google.golang.org/grpc v1.19.0
5659
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0

go.sum

+7
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ github.com/go-ole/go-ole v1.2.2/go.mod h1:pnvuG7BrDMZ8ifMurTQmxwhQM/odqm9sSqNe5B
7676
github.com/go-sql-driver/mysql v0.0.0-20170715192408-3955978caca4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
7777
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
7878
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
79+
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
7980
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
8081
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
8182
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -300,8 +301,14 @@ github.com/ulikunitz/xz v0.5.5 h1:pFrO0lVpTBXLpYw+pnLj6TbvHuyjXMfjGeCwSqCVwok=
300301
github.com/ulikunitz/xz v0.5.5/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8=
301302
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d h1:ggUgChAeyge4NZ4QUw6lhHsVymzwSDJOZcE0s2X8S20=
302303
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg=
304+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
305+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
306+
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
307+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
303308
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs=
304309
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
310+
go.mongodb.org/mongo-driver v1.0.1 h1:r2xNB8juGGrZVcIjX2TpY7HUfz+pNYq+GIuC9h6URZg=
311+
go.mongodb.org/mongo-driver v1.0.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
305312
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
306313
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
307314
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=

integration_test/mongokafka/main_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"os"
55
"testing"
66

7+
"github.com/moiot/gravity/pkg/utils"
8+
79
"github.com/Shopify/sarama"
810

911
"github.com/moiot/gravity/pkg/consts"
@@ -13,7 +15,6 @@ import (
1315

1416
mgo "gopkg.in/mgo.v2"
1517

16-
"github.com/moiot/gravity/pkg/mongo"
1718
"github.com/moiot/gravity/pkg/mongo_test"
1819
)
1920

@@ -24,7 +25,7 @@ var kafkaAdmin sarama.ClusterAdmin
2425

2526
func TestMain(m *testing.M) {
2627
mongoCfg := mongo_test.TestConfig()
27-
s, err := mongo.CreateMongoSession(&mongoCfg)
28+
s, err := utils.CreateMongoSession(&mongoCfg)
2829
if err != nil {
2930
panic(err)
3031
}

integration_test/mongomysql/main_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"os"
55
"testing"
66

7+
"github.com/moiot/gravity/pkg/utils"
8+
79
mgo "gopkg.in/mgo.v2"
810

911
"github.com/moiot/gravity/pkg/consts"
1012

11-
"github.com/moiot/gravity/pkg/mongo"
1213
"github.com/moiot/gravity/pkg/mongo_test"
1314
)
1415

@@ -17,7 +18,7 @@ var db *mgo.Database
1718

1819
func TestMain(m *testing.M) {
1920
mongoCfg := mongo_test.TestConfig()
20-
s, err := mongo.CreateMongoSession(&mongoCfg)
21+
s, err := utils.CreateMongoSession(&mongoCfg)
2122
if err != nil {
2223
panic(err)
2324
}

padder/config/config.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package config
33
import (
44
"flag"
55

6+
"github.com/moiot/gravity/pkg/config"
7+
68
"github.com/BurntSushi/toml"
79
"github.com/juju/errors"
810

911
"github.com/moiot/gravity/pkg/logutil"
10-
"github.com/moiot/gravity/pkg/utils"
1112
)
1213

1314
type Config struct {
@@ -43,8 +44,8 @@ func (cfg *Config) ParseCmd(arguments []string) error {
4344
}
4445

4546
type MySQLConfig struct {
46-
Target *utils.DBConfig `toml:"target" json:"target"`
47-
StartPosition *utils.MySQLBinlogPosition `toml:"start-position" json:"start-position"`
47+
Target *config.DBConfig `toml:"target" json:"target"`
48+
StartPosition *config.MySQLBinlogPosition `toml:"start-position" json:"start-position"`
4849
}
4950

5051
func (c *Config) CreateConfigFromFile(path string) error {

pkg/config/config.go

+46-9
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"gopkg.in/mgo.v2/bson"
1313

1414
"github.com/moiot/gravity/pkg/logutil"
15-
"github.com/moiot/gravity/pkg/utils"
1615
)
1716

1817
var DefaultBinlogSyncerTimeout = "10s"
@@ -69,7 +68,7 @@ type PipelineConfig struct {
6968

7069
TableConfig []*TableConfig `toml:"table-config" json:"table-config"`
7170

72-
TargetMySQL *utils.DBConfig `toml:"target-mysql" json:"target-mysql"`
71+
TargetMySQL *DBConfig `toml:"target-mysql" json:"target-mysql"`
7372

7473
TargetMySQLWorkerCfg *TargetMySQLWorkerConfig `toml:"target-mysql-worker" json:"target-mysql-worker"`
7574

@@ -93,8 +92,8 @@ type SourceKafkaConfig struct {
9392
}
9493

9594
type SourceProbeCfg struct {
96-
SourceMySQL *utils.DBConfig `mapstructure:"mysql" toml:"mysql" json:"mysql"`
97-
Annotation string `mapstructure:"annotation" toml:"annotation" json:"annotation"`
95+
SourceMySQL *DBConfig `mapstructure:"mysql" toml:"mysql" json:"mysql"`
96+
Annotation string `mapstructure:"annotation" toml:"annotation" json:"annotation"`
9897
}
9998

10099
type MongoPosition bson.MongoTimestamp
@@ -110,14 +109,14 @@ type MongoConfigs struct {
110109
}
111110

112111
type MySQLConfig struct {
113-
IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
114-
Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"`
115-
SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`
116-
StartPosition *utils.MySQLBinlogPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"`
112+
IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
113+
Source *DBConfig `mapstructure:"source" toml:"source" json:"source"`
114+
SourceSlave *DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`
115+
StartPosition *MySQLBinlogPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"`
117116
}
118117

119118
type SourceTiDBConfig struct {
120-
SourceDB *utils.DBConfig `mapstructure:"source-db" toml:"source-db" json:"source-db"`
119+
SourceDB *DBConfig `mapstructure:"source-db" toml:"source-db" json:"source-db"`
121120
SourceKafka *SourceKafkaConfig `mapstructure:"source-kafka" toml:"source-kafka" json:"source-kafka"`
122121
// OffsetStoreConfig *SourceProbeCfg `mapstructure:"offset-store" toml:"offset-store" json:"offset-store"`
123122
PositionRepo *GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`
@@ -140,6 +139,44 @@ type MongoConnConfig struct {
140139
Direct bool `mapstructure:"direct" toml:"direct" json:"direct"`
141140
}
142141

142+
func (cfg MongoConnConfig) URI() string {
143+
username := ""
144+
if cfg.Username != "" {
145+
username = cfg.Username
146+
}
147+
148+
password := ""
149+
if cfg.Password != "" {
150+
password = cfg.Password
151+
}
152+
153+
host := "localhost"
154+
if cfg.Host != "" {
155+
host = cfg.Host
156+
}
157+
158+
port := 27017
159+
if cfg.Port != 0 {
160+
port = cfg.Port
161+
}
162+
163+
db := cfg.Database
164+
165+
var url string
166+
if username == "" || password == "" {
167+
url = fmt.Sprintf("mongodb://%s:%d/%s", host, port, db)
168+
} else {
169+
url = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s", username, password, host, port, db)
170+
}
171+
172+
//If not specified, the connection will timeout, probably because the replica set has not been initialized yet.
173+
if cfg.Direct {
174+
url += "?connect=direct"
175+
}
176+
177+
return url
178+
}
179+
143180
type TargetMySQLWorkerConfig struct {
144181
EnableDDL bool `toml:"enable-ddl" json:"enable-ddl"`
145182
UseBidirection bool `toml:"use-bidirection" json:"use-bidirection"`

pkg/config/mysql.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
*
3+
* // Copyright 2019 , Beijing Mobike Technology Co., Ltd.
4+
* //
5+
* // Licensed under the Apache License, Version 2.0 (the "License");
6+
* // you may not use this file except in compliance with the License.
7+
* // You may obtain a copy of the License at
8+
* //
9+
* // http://www.apache.org/licenses/LICENSE-2.0
10+
* //
11+
* // Unless required by applicable law or agreed to in writing, software
12+
* // distributed under the License is distributed on an "AS IS" BASIS,
13+
* // See the License for the specific language governing permissions and
14+
* // limitations under the License.
15+
*/
16+
17+
package config
18+
19+
import (
20+
"time"
21+
22+
"github.com/juju/errors"
23+
)
24+
25+
// DBConfig is the DB configuration.
26+
type DBConfig struct {
27+
Host string `toml:"host" json:"host" mapstructure:"host"`
28+
Location string `toml:"location" json:"location" mapstructure:"location"`
29+
Username string `toml:"username" json:"username" mapstructure:"username"`
30+
Password string `toml:"password" json:"password" mapstructure:"password"`
31+
Port int `toml:"port" json:"port" mapstructure:"port"`
32+
Schema string `toml:"schema" json:"schema" mapstructure:"schema"`
33+
MaxIdle int `toml:"max-idle" json:"max-idle" mapstructure:"max-idle"`
34+
MaxOpen int `toml:"max-open" json:"max-open" mapstructure:"max-open"`
35+
MaxLifeTimeDurationStr string `toml:"max-life-time-duration" json:"max-life-time-duration" mapstructure:"max-life-time-duration"`
36+
MaxLifeTimeDuration time.Duration `toml:"-" json:"-" mapstructure:"-"`
37+
}
38+
39+
func (dbc *DBConfig) ValidateAndSetDefault() error {
40+
// Sets the location for time.Time values (when using parseTime=true). "Local" sets the system's location. See time.LoadLocation for details.
41+
// Note that this sets the location for time.Time values but does not change MySQL's time_zone setting.
42+
// For that see the time_zone system variable, which can also be set as a DSN parameter.
43+
if dbc.Location == "" {
44+
dbc.Location = time.Local.String()
45+
}
46+
47+
// set default values of connection related settings
48+
// assume the response time of db is 2ms, then
49+
// then a single connection can have tps of 500 TPS
50+
if dbc.MaxOpen == 0 {
51+
dbc.MaxOpen = 200
52+
}
53+
54+
if dbc.MaxIdle == 0 {
55+
dbc.MaxIdle = dbc.MaxOpen
56+
}
57+
58+
var err error
59+
if dbc.MaxLifeTimeDurationStr == "" {
60+
dbc.MaxLifeTimeDurationStr = "15m"
61+
dbc.MaxLifeTimeDuration = 15 * time.Minute
62+
} else {
63+
dbc.MaxLifeTimeDuration, err = time.ParseDuration(dbc.MaxLifeTimeDurationStr)
64+
if err != nil {
65+
return errors.Trace(err)
66+
}
67+
}
68+
69+
return nil
70+
}
71+
72+
type MySQLBinlogPosition struct {
73+
BinLogFileName string `toml:"binlog-name" json:"binlog-name" mapstructure:"binlog-name"`
74+
BinLogFilePos uint32 `toml:"binlog-pos" json:"binlog-pos" mapstructure:"binlog-pos"`
75+
BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid" mapstructure:"binlog-gtid"`
76+
}

pkg/config/table.go

-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package config
22

3-
import "github.com/moiot/gravity/pkg/utils"
4-
53
type TableConfig struct {
64
Schema string `toml:"schema" json:"schema"`
75
Table string `toml:"table" json:"table"`
@@ -14,12 +12,3 @@ type TableConfig struct {
1412
ScanColumn string `toml:"scan-column" json:"scan-column"`
1513
ScanType string `toml:"scan-type" json:"scan-type"`
1614
}
17-
18-
func GetTableConfig(tableConfig []TableConfig, schema string, table string) *TableConfig {
19-
for i := range tableConfig {
20-
if utils.Glob(tableConfig[i].Schema, schema) && utils.Glob(tableConfig[i].Table, table) {
21-
return &tableConfig[i]
22-
}
23-
}
24-
return nil
25-
}

0 commit comments

Comments
 (0)