-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathtransaction_payload_event.go
154 lines (132 loc) · 4.14 KB
/
transaction_payload_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package replication
import (
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/klauspost/compress/zstd"
)
// On The Wire: Field Types
//
// These are the field-type tags found in the header of a transaction payload
// event. decodeFields reads one tag at a time and dispatches on it.
//
// See also binary_log::codecs::binary::Transaction_payload::fields in MySQL
// https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b
const (
// OTW_PAYLOAD_HEADER_END_MARK (0) ends the field list; decodeFields treats
// every byte after it as the payload.
OTW_PAYLOAD_HEADER_END_MARK = iota
// OTW_PAYLOAD_SIZE_FIELD (1) carries the value stored in
// TransactionPayloadEvent.Size.
OTW_PAYLOAD_SIZE_FIELD
// OTW_PAYLOAD_COMPRESSION_TYPE_FIELD (2) carries the value stored in
// TransactionPayloadEvent.CompressionType.
OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
// OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD (3) carries the value stored in
// TransactionPayloadEvent.UncompressedSize.
OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
)
// Compression Types
//
// Values compared against TransactionPayloadEvent.CompressionType.
const (
// ZSTD is the only compression type decodePayload accepts.
ZSTD = 0
// NONE is recognised by name in compressionType, but decodePayload
// rejects it like any other non-ZSTD value.
NONE = 255
)
// TransactionPayloadEvent holds the header fields and body of a transaction
// payload event, plus the events decoded from that body.
type TransactionPayloadEvent struct {
// format supplies the format description values that decodePayload copies
// into the parser it uses for the embedded events.
format FormatDescriptionEvent
// Size is the value of the OTW_PAYLOAD_SIZE_FIELD header field.
Size uint64
// UncompressedSize is the value of the OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
// header field.
UncompressedSize uint64
// CompressionType is the value of the OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
// header field; see the ZSTD and NONE constants.
CompressionType uint64
// Payload is the raw body following the header end mark. It is kept as
// received; decodePayload decompresses it without modifying this slice.
Payload []byte
// Events are the events parsed out of the decompressed payload, in order.
Events []*BinlogEvent
}
// compressionType returns a human-readable name for e.CompressionType:
// "ZSTD", "NONE", or "Unknown" for any other value.
func (e *TransactionPayloadEvent) compressionType() string {
	if e.CompressionType == ZSTD {
		return "ZSTD"
	}
	if e.CompressionType == NONE {
		return "NONE"
	}
	return "Unknown"
}
// Dump writes a textual description of the event to w: the header fields, a
// hex dump of the raw payload, and then the dump of every decoded event
// framed by start/end marker lines.
func (e *TransactionPayloadEvent) Dump(w io.Writer) {
	fmt.Fprintf(w, "Payload Size: %d\n", e.Size)
	fmt.Fprintf(w, "Payload Uncompressed Size: %d\n", e.UncompressedSize)

	typeName := e.compressionType()
	fmt.Fprintf(w, "Payload CompressionType: %s\n", typeName)

	body := hex.Dump(e.Payload)
	fmt.Fprintf(w, "Payload Body: \n%s", body)

	fmt.Fprintln(w, "=== Start of events decoded from compressed payload ===")
	for i := range e.Events {
		e.Events[i].Dump(w)
	}
	fmt.Fprintln(w, "=== End of events decoded from compressed payload ===")
	fmt.Fprintln(w)
}
// Decode parses a transaction payload event from data. It first reads the
// header fields (which also locates the payload), then decompresses the
// payload and parses the events inside it. The first error encountered is
// returned.
func (e *TransactionPayloadEvent) Decode(data []byte) error {
	if err := e.decodeFields(data); err != nil {
		return err
	}
	return e.decodePayload()
}
// decodeFields reads the header of a transaction payload event from data.
//
// The header is a sequence of fields, each encoded as a one-byte type, a
// one-byte length and then that many value bytes. A type of
// OTW_PAYLOAD_HEADER_END_MARK ends the sequence; everything after it is
// stored in e.Payload. Known field types populate e.Size, e.CompressionType
// and e.UncompressedSize; unknown types are skipped.
//
// An error wrapping io.ErrUnexpectedEOF is returned when data ends before
// the end mark is seen or when a field's declared length runs past the end
// of data, rather than panicking on an out-of-range slice.
func (e *TransactionPayloadEvent) decodeFields(data []byte) error {
	size := uint64(len(data))
	offset := uint64(0)

	for {
		if offset >= size {
			return fmt.Errorf("transaction payload header: missing field type at offset %d: %w",
				offset, io.ErrUnexpectedEOF)
		}
		fieldType := mysql.FixedLengthInt(data[offset : offset+1])
		offset++

		if fieldType == OTW_PAYLOAD_HEADER_END_MARK {
			// offset <= size here, so this slice is valid (possibly empty).
			e.Payload = data[offset:]
			return nil
		}

		if offset >= size {
			return fmt.Errorf("transaction payload header: missing length for field type %d at offset %d: %w",
				fieldType, offset, io.ErrUnexpectedEOF)
		}
		fieldLength := mysql.FixedLengthInt(data[offset : offset+1])
		offset++

		// Compare against the remaining bytes instead of computing
		// offset+fieldLength first, so the check itself cannot overflow.
		if fieldLength > size-offset {
			return fmt.Errorf("transaction payload header: field type %d declares %d bytes at offset %d but only %d remain: %w",
				fieldType, fieldLength, offset, size-offset, io.ErrUnexpectedEOF)
		}
		value := data[offset : offset+fieldLength]

		switch fieldType {
		case OTW_PAYLOAD_SIZE_FIELD:
			e.Size = mysql.FixedLengthInt(value)
		case OTW_PAYLOAD_COMPRESSION_TYPE_FIELD:
			e.CompressionType = mysql.FixedLengthInt(value)
		case OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD:
			e.UncompressedSize = mysql.FixedLengthInt(value)
		}

		offset += fieldLength
	}
}
// decodePayload decompresses e.Payload and parses the events it contains,
// appending each parsed event to e.Events.
//
// Only ZSTD payloads are supported; any other compression type yields an
// error. The decompressed bytes are treated as a back-to-back sequence of
// events, each carrying its own total length in a little-endian uint32 at
// byte offsets 9..13 of the event. Fewer than 13 trailing bytes are ignored.
//
// An error is returned if decompression fails, if an event's declared length
// is too small to contain its own length field or extends past the end of
// the decompressed data, or if parsing an event fails.
func (e *TransactionPayloadEvent) decodePayload() error {
	if e.CompressionType != ZSTD {
		return fmt.Errorf("TransactionPayloadEvent has compression type %d (%s)",
			e.CompressionType, e.compressionType())
	}

	decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
	if err != nil {
		return err
	}
	defer decoder.Close()

	payloadUncompressed, err := decoder.DecodeAll(e.Payload, nil)
	if err != nil {
		return err
	}

	// The uncompressed data needs to be split up into individual events for Parse()
	// to work on them. We can't use e.parser directly as we need to disable checksums
	// but we still need the initialization from the FormatDescriptionEvent. We can't
	// modify e.parser as it is used elsewhere.
	parser := NewBinlogParser()
	parser.format = &FormatDescriptionEvent{
		Version:                e.format.Version,
		ServerVersion:          e.format.ServerVersion,
		CreateTimestamp:        e.format.CreateTimestamp,
		EventHeaderLength:      e.format.EventHeaderLength,
		EventTypeHeaderLengths: e.format.EventTypeHeaderLengths,
		ChecksumAlgorithm:      BINLOG_CHECKSUM_ALG_OFF,
	}

	// Byte range, relative to the start of an event, holding its total length.
	const (
		eventLengthStart = 9
		eventLengthEnd   = 13
	)

	// Offsets are tracked as uint64 so that neither len() of a large payload
	// nor offset+eventLength can wrap around the way uint32 arithmetic could.
	payloadUncompressedLength := uint64(len(payloadUncompressed))
	offset := uint64(0)

	for offset+eventLengthEnd <= payloadUncompressedLength {
		eventLength := uint64(binary.LittleEndian.Uint32(
			payloadUncompressed[offset+eventLengthStart : offset+eventLengthEnd]))

		// An event shorter than its own length field is malformed; rejecting
		// it also guarantees the loop always makes forward progress.
		if eventLength < eventLengthEnd {
			return fmt.Errorf("event length of %d with offset %d in uncompressed payload is too short to be a valid event",
				eventLength, offset)
		}
		if eventLength > payloadUncompressedLength-offset {
			return fmt.Errorf("Event length of %d with offset %d in uncompressed payload exceeds payload length of %d",
				eventLength, offset, payloadUncompressedLength)
		}

		data := payloadUncompressed[offset : offset+eventLength]
		pe, err := parser.Parse(data)
		if err != nil {
			return err
		}
		e.Events = append(e.Events, pe)

		offset += eventLength
	}

	return nil
}