-
Notifications
You must be signed in to change notification settings - Fork 8
/
main.py
55 lines (48 loc) · 1.48 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import json
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
from utils import concat_sql_from_binlog_event
import pymysql
import os
import sys
import logging
# Logging
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(levelname)s %(message)s")
def main(mysqlConfigs):
conn = pymysql.connect(**mysqlConfigs)
cursor = conn.cursor()
stream = BinLogStreamReader(
connection_settings = mysqlConfigs,
server_id=100,
blocking=True,
resume_stream=True,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
for binlogevent in stream:
e_start_pos, last_pos = stream.log_pos, stream.log_pos
for row in binlogevent.rows:
event = {"schema": binlogevent.schema,
"table": binlogevent.table,
"type": type(binlogevent).__name__,
"row": row
}
#if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
# e_start_pos = last_pos
print("/*", json.dumps(event), "*/")
print(concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlogevent, row=row, e_start_pos=e_start_pos))
print()
if __name__ == "__main__":
mysqlConfigs = {
"host": os.getenv('MYSQL_HOST'),
"port": int(os.getenv('MYSQL_PORT')),
"user": os.getenv('MYSQL_USER'),
"passwd": os.getenv('MYSQL_PASSWORD'),
'db': os.getenv('MYSQL_DATABASE'),
}
main(mysqlConfigs)