-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathriver_extra_test.go
114 lines (85 loc) · 2.71 KB
/
river_extra_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package river
import (
"fmt"
//"net/http"
//"net/url"
"os"
"time"
. "github.com/pingcap/check"
)
// setupExtra recreates the test_river_extra and test_river_parent tables and
// returns a River configured to sync both of them from the "test" schema.
//
// cfg.DataDir is removed before the River is constructed (presumably so that
// files left by an earlier run are not reused; confirm against NewRiver).
// A failure to clear the directory or to construct the River fails the test
// through c.Assert.
func (s *riverTestSuite) setupExtra(c *C) (r *River) {
	// Child table: each row carries a pid column alongside its own id.
	schema := `
CREATE TABLE IF NOT EXISTS %s (
id INT,
title VARCHAR(256),
pid INT,
PRIMARY KEY(id)) ENGINE=INNODB;
`

	s.testExecute(c, "DROP TABLE IF EXISTS test_river_extra")
	s.testExecute(c, fmt.Sprintf(schema, "test_river_extra"))

	// Parent table: primary key only.
	schema = `
CREATE TABLE IF NOT EXISTS %s (
id INT,
PRIMARY KEY(id)) ENGINE=INNODB;
`

	s.testExecute(c, "DROP TABLE IF EXISTS test_river_parent")
	s.testExecute(c, fmt.Sprintf(schema, "test_river_parent"))

	cfg := new(Config)
	cfg.MyAddr = *myAddr
	cfg.MyUser = "root"
	cfg.MyPassword = ""
	// cfg.RedisAddr = *esAddr
	cfg.ServerID = 1001
	cfg.Flavor = "mysql"

	cfg.DataDir = "./test_river_extra"
	cfg.DumpExec = "mysqldump"

	cfg.StatAddr = "127.0.0.1:12800"
	cfg.FlushBulkTime = TomlDuration{3 * time.Millisecond}

	// os.RemoveAll returns nil when the path does not exist, so a non-nil
	// error here means the directory really could not be cleared. Stop the
	// test instead of continuing with whatever is left in it.
	err := os.RemoveAll(cfg.DataDir)
	c.Assert(err, IsNil)

	cfg.Sources = []SourceConfig{
		{Schema: "test", Tables: []string{"test_river_extra", "test_river_parent"}},
	}

	cfg.Rules = []*Rule{
		{Schema: "test", Table: "test_river_parent"},
		{Schema: "test", Table: "test_river_extra"},
	}

	r, err = NewRiver(cfg)
	c.Assert(err, IsNil)

	return r
}
// testPrepareExtraData inserts one row into test_river_parent (id 1) and then
// four rows into test_river_extra (ids 1 through 4), each with pid set to 1.
func (s *riverTestSuite) testPrepareExtraData(c *C) {
	// Untyped constant: reaches testExecute as an int, like the original literal 1.
	const parentID = 1

	s.testExecute(c, "INSERT INTO test_river_parent (id) VALUES (?)", parentID)

	// Child rows, inserted in ascending id order.
	children := []struct {
		id    int
		title string
	}{
		{1, "first"},
		{2, "second"},
		{3, "third"},
		{4, "fourth"},
	}
	for _, child := range children {
		s.testExecute(c, "INSERT INTO test_river_extra (id, title, pid) VALUES (?, ?, ?)", child.id, child.title, parentID)
	}
}
/**
func (s *riverTestSuite) testElasticExtraExists(c *C, id string, parent string, exist bool) {
index := "river"
docType := "river_extra"
reqURL := fmt.Sprintf("http://%s/%s/%s/%s?parent=%s", s.r.es.Addr,
url.QueryEscape(index),
url.QueryEscape(docType),
url.QueryEscape(id),
url.QueryEscape(parent))
r, err := s.r.es.Do("HEAD", reqURL, nil)
c.Assert(err, IsNil)
if exist {
c.Assert(r.Code, Equals, http.StatusOK)
} else {
c.Assert(r.Code, Equals, http.StatusNotFound)
}
}
func (s *riverTestSuite) TestRiverWithParent(c *C) {
river := s.setupExtra(c)
defer river.Close()
s.testPrepareExtraData(c)
go func() { river.Run() }()
testWaitSyncDone(c, river)
s.testElasticExtraExists(c, "1", "1", true)
s.testExecute(c, "DELETE FROM test_river_extra WHERE id = ?", 1)
testWaitSyncDone(c, river)
s.testElasticExtraExists(c, "1", "1", false)
}
*/