Skip to content

Commit

Permalink
capture RowsQueryEvents; add annotations to DML events
Browse files Browse the repository at this point in the history
  • Loading branch information
fjordan committed Apr 20, 2020
1 parent 4ba7d1c commit 09cd8b1
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 41 deletions.
19 changes: 16 additions & 3 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (s *BinlogStreamer) Run() {
s.binlogSyncer.Close()
}()

var query []byte

currentFilename := s.lastStreamedBinlogPosition.Name
nextFilename := s.lastStreamedBinlogPosition.Name

Expand Down Expand Up @@ -210,8 +212,14 @@ func (s *BinlogStreamer) Run() {
// the mysql source. See:
// https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026
isEventPositionValid = ev.Header.LogPos != 0
case *replication.RowsQueryEvent:
// A RowsQueryEvent will always precede the corresponding RowsEvent
// if binlog_rows_query_log_events is enabled, and is used to get
// the full query that was executed on the master (with annotations)
// that is otherwise not possible to reconstruct
query = ev.Event.(*replication.RowsQueryEvent).Query
case *replication.RowsEvent:
err = s.handleRowsEvent(ev)
err = s.handleRowsEvent(ev, query)
if err != nil {
s.logger.WithError(err).Error("failed to handle rows event")
s.ErrorHandler.Fatal("binlog_streamer", err)
Expand Down Expand Up @@ -242,6 +250,11 @@ func (s *BinlogStreamer) Run() {
// interruption to EITHER the start (if using GTIDs) or the end of the
// last transaction
isEventPositionResumable = true

// Here we also clear the captured query, as we are at a transaction
// boundary (either the beginning or the end of a transaction). The query
// will be set again by the next RowsQueryEvent, which precedes its
// corresponding RowsEvent(s)
query = nil
}

if isEventPositionValid {
Expand Down Expand Up @@ -313,7 +326,7 @@ func (s *BinlogStreamer) updateLastStreamedPosAndTime(evTimestamp uint32, evPos
}
}

func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent, query []byte) error {
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
rowsEvent := ev.Event.(*replication.RowsEvent)

Expand All @@ -334,7 +347,7 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, pos, s.lastResumableBinlogPosition)
dmlEvs, err := NewBinlogDMLEvents(table, ev, pos, s.lastResumableBinlogPosition, query)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error {
func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
WaitForThrottle(b.Throttler)

queryBuffer := []byte("BEGIN;\n")
queryBuffer := []byte(sql.AnnotateStmt("BEGIN;\n", b.DB.Marginalia))

for _, ev := range events {
eventDatabaseName := ev.Database()
Expand All @@ -91,12 +91,12 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
eventTableName = targetTableName
}

sql, err := ev.AsSQLString(eventDatabaseName, eventTableName)
sqlStmt, err := ev.AsSQLString(eventDatabaseName, eventTableName)
if err != nil {
return fmt.Errorf("generating sql query at pos %v: %v", ev.BinlogPosition(), err)
}

queryBuffer = append(queryBuffer, sql...)
queryBuffer = append(queryBuffer, sql.AnnotateStmt(sqlStmt, b.DB.Marginalia)...)
queryBuffer = append(queryBuffer, ";\n"...)
}

Expand Down
53 changes: 43 additions & 10 deletions dml_events.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package ghostferry

import (
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"

Expand All @@ -13,6 +15,8 @@ import (
"github.com/siddontang/go-mysql/schema"
)

// annotationRegex matches a /* ... */ SQL comment and captures the text between
// the delimiters. The match is non-greedy, so every comment in a query is
// matched separately. The (?s) flag lets "." match newlines, so a comment that
// spans several lines is captured as well instead of being silently skipped.
var annotationRegex = regexp.MustCompile(`(?s)/\*(.*?)\*/`)

type RowData []interface{}

// The mysql driver never actually gives you a uint64 from Scan, instead you
Expand Down Expand Up @@ -45,13 +49,15 @@ type DMLEvent interface {
PaginationKey() (uint64, error)
BinlogPosition() mysql.Position
ResumableBinlogPosition() mysql.Position
Annotations() ([]string, error)
}

// The base of DMLEvent to provide the necessary methods.
//
// The concrete binlog insert/update/delete events embed a *DMLEventBase, and
// the events created from one rows event all point at the same base value.
type DMLEventBase struct {
// table is the schema of the table this event applies to.
table *TableSchema
// pos is the binlog position associated with this event.
pos mysql.Position
// resumablePos is the value returned by ResumableBinlogPosition.
resumablePos mysql.Position
// query is the raw query text captured from the RowsQueryEvent that
// preceded the rows event, or nil when no such event was seen. It is the
// input that Annotations parses.
query []byte
}

func (e *DMLEventBase) Database() string {
Expand All @@ -74,18 +80,44 @@ func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position {
return e.resumablePos
}

// Annotations extracts the text of every /* ... */ comment that
// annotationRegex finds in the query captured for this event, in the order
// they appear.
//
// It returns an error when no query was captured for the event (the query is
// nil). When the query holds no comments the returned slice is nil.
func (e *DMLEventBase) Annotations() ([]string, error) {
	if e.query == nil {
		return nil, errors.New("could not get query from DML event")
	}

	var annotations []string
	for _, groups := range annotationRegex.FindAllSubmatch(e.query, -1) {
		// groups[0] is the whole comment, groups[1] the text between the
		// comment delimiters.
		if len(groups) < 2 {
			continue
		}
		annotations = append(annotations, string(groups[1]))
	}
	return annotations, nil
}

// NewDMLEventBase builds the shared base for DML events from the table
// schema, the event's binlog position, its resumable binlog position and the
// raw query text. query may be nil when no query was captured, in which case
// Annotations on the resulting base returns an error.
func NewDMLEventBase(table *TableSchema, pos, resumablePos mysql.Position, query []byte) *DMLEventBase {
	base := new(DMLEventBase)
	base.table = table
	base.pos = pos
	base.resumablePos = resumablePos
	base.query = query
	return base
}

// BinlogInsertEvent is the DML event for a single inserted row. The methods
// shared with the other DML event types come from the embedded DMLEventBase.
type BinlogInsertEvent struct {
// newValues holds the column values of the inserted row, taken from one
// entry of the rows event's Rows.
newValues RowData
*DMLEventBase
}

func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
func NewBinlogInsertEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) {
insertEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
insertEvents[i] = &BinlogInsertEvent{
newValues: row,
DMLEventBase: &DMLEventBase{table, pos, resumablePos},
DMLEventBase: eventBase,
}
}

Expand Down Expand Up @@ -123,7 +155,7 @@ type BinlogUpdateEvent struct {
*DMLEventBase
}

func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
func NewBinlogUpdateEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) {
// UPDATE events have two rows in the RowsEvent. The first row is the
// entries of the old record (for WHERE) and the second row is the
// entries of the new record (for SET).
Expand All @@ -139,7 +171,7 @@ func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent,
updateEvents[i/2] = &BinlogUpdateEvent{
oldValues: row,
newValues: rowsEvent.Rows[i+1],
DMLEventBase: &DMLEventBase{table, pos, resumablePos},
DMLEventBase: eventBase,
}
}

Expand Down Expand Up @@ -183,13 +215,13 @@ func (e *BinlogDeleteEvent) NewValues() RowData {
return nil
}

func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDeleteEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) {
deleteEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
deleteEvents[i] = &BinlogDeleteEvent{
oldValues: row,
DMLEventBase: &DMLEventBase{table, pos, resumablePos},
DMLEventBase: eventBase,
}
}

Expand All @@ -211,7 +243,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) {
return paginationKeyFromEventData(e.table, e.oldValues)
}

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -242,13 +274,14 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re
}
}

eventBase := NewDMLEventBase(table, pos, resumablePos, query)
switch ev.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
return NewBinlogInsertEvents(table, rowsEvent, pos, resumablePos)
return NewBinlogInsertEvents(eventBase, rowsEvent)
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
return NewBinlogDeleteEvents(table, rowsEvent, pos, resumablePos)
return NewBinlogDeleteEvents(eventBase, rowsEvent)
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
return NewBinlogUpdateEvents(table, rowsEvent, pos, resumablePos)
return NewBinlogUpdateEvents(eventBase, rowsEvent)
default:
return nil, fmt.Errorf("unrecognized rows event: %s", ev.Header.EventType.String())
}
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mysql-1:
--collation-server=utf8mb4_unicode_ci
--max-connections=1000
--read-only=OFF
--binlog-rows-query-log-events=ON
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
volumes:
Expand All @@ -32,6 +33,7 @@ mysql-2:
--character-set-server=utf8mb4
--collation-server=utf8mb4_unicode_ci
--max-connections=1000
--binlog-rows-query-log-events=ON
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
volumes:
Expand All @@ -52,6 +54,7 @@ mysql-3:
--character-set-server=utf8mb4
--collation-server=utf8mb4_unicode_ci
--max-connections=1000
--binlog-rows-query-log-events=ON
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
volumes:
Expand Down
18 changes: 16 additions & 2 deletions sharding/test/copy_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,30 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() {
int64(1), int32(1), int16(1), int8(1), int(1),
}

eventBase := ghostferry.NewDMLEventBase(
t.normalTable,
mysql.Position{},
mysql.Position{},
nil,
)

for _, tenantId := range tenantIds {
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}, mysql.Position{})
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(eventBase, t.newRowsEvent([]interface{}{1001, tenantId, "data"}))
applicable, err := t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Nil(err)
t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId))
}
}

func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() {
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}, mysql.Position{})
eventBase := ghostferry.NewDMLEventBase(
t.normalTable,
mysql.Position{},
mysql.Position{},
nil,
)

dmlEvents, err := ghostferry.NewBinlogInsertEvents(eventBase, t.newRowsEvent([]interface{}{1001, string("1"), "data"}))
_, err = t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error())
}
Expand Down
27 changes: 16 additions & 11 deletions sqlwrapper/ghostferry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type DB struct {
*sqlorig.DB
marginalia string
Marginalia string
}

type Tx struct {
Expand All @@ -22,23 +22,23 @@ func Open(driverName, dataSourceName, marginalia string) (*DB, error) {
}

func (db DB) PrepareContext(ctx context.Context, query string) (*sqlorig.Stmt, error) {
return db.DB.PrepareContext(ctx, Annotate(query, db.marginalia))
return db.DB.PrepareContext(ctx, AnnotateStmt(query, db.Marginalia))
}

func (db DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sqlorig.Result, error) {
return db.DB.ExecContext(ctx, Annotate(query, db.marginalia), args...)
return db.DB.ExecContext(ctx, AnnotateStmt(query, db.Marginalia), args...)
}

// QueryContext forwards the query to the embedded database handle. The query
// text is passed through exactly as given; no marginalia comment is
// prepended here.
func (db DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sqlorig.Rows, error) {
	rows, err := db.DB.QueryContext(ctx, query, args...)
	return rows, err
}

func (db DB) Exec(query string, args ...interface{}) (sqlorig.Result, error) {
return db.DB.Exec(Annotate(query, db.marginalia), args...)
return db.DB.Exec(AnnotateStmt(query, db.Marginalia), args...)
}

func (db DB) Prepare(query string) (*sqlorig.Stmt, error) {
return db.DB.Prepare(Annotate(query, db.marginalia))
return db.DB.Prepare(AnnotateStmt(query, db.Marginalia))
}

func (db DB) Query(query string, args ...interface{}) (*sqlorig.Rows, error) {
Expand All @@ -55,23 +55,23 @@ func (db DB) QueryRowContext(ctx context.Context, query string, args ...interfac

func (db DB) Begin() (*Tx, error) {
tx, err := db.DB.Begin()
return &Tx{tx, db.marginalia}, err
return &Tx{tx, db.Marginalia}, err
}

func (tx Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sqlorig.Result, error) {
return tx.Tx.ExecContext(ctx, Annotate(query, tx.marginalia), args...)
return tx.Tx.ExecContext(ctx, AnnotateStmt(query, tx.marginalia), args...)
}

func (tx Tx) Exec(query string, args ...interface{}) (sqlorig.Result, error) {
return tx.Tx.Exec(Annotate(query, tx.marginalia), args...)
return tx.Tx.Exec(AnnotateStmt(query, tx.marginalia), args...)
}

func (tx Tx) Prepare(query string) (*sqlorig.Stmt, error) {
return tx.Tx.Prepare(Annotate(query, tx.marginalia))
return tx.Tx.Prepare(AnnotateStmt(query, tx.marginalia))
}

func (tx Tx) PrepareContext(ctx context.Context, query string) (*sqlorig.Stmt, error) {
return tx.Tx.PrepareContext(ctx, Annotate(query, tx.marginalia))
return tx.Tx.PrepareContext(ctx, AnnotateStmt(query, tx.marginalia))
}

func (tx Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sqlorig.Rows, error) {
Expand All @@ -90,6 +90,11 @@ func (tx Tx) QueryRow(query string, args ...interface{}) *sqlorig.Row {
return tx.Tx.QueryRow(query, args...)
}

func Annotate(query, marginalia string) string {
// AnnotateStmt prefixes one SQL statement with the marginalia wrapped in a
// /* ... */ comment, separated from the statement by a single space.
//
// *NOTE*
// This is NOT SAFE for a string holding several SQL statements: the input is
// never parsed, so only the very start of the string receives the comment.
func AnnotateStmt(query, marginalia string) string {
	comment := "/*" + marginalia + "*/"
	return fmt.Sprintf("%s %s", comment, query)
}
Loading

0 comments on commit 09cd8b1

Please sign in to comment.