forked from rusuly/mysql_cdc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbinlog_reader.rs
76 lines (64 loc) · 2.33 KB
/
binlog_reader.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use crate::constants;
use crate::errors::Error;
use crate::events::binlog_event::BinlogEvent;
use crate::events::event_header::EventHeader;
use crate::events::event_parser::EventParser;
use constants::EVENT_HEADER_SIZE;
use std::fs::File;
use std::io::{ErrorKind, Read};
/// Expected leading bytes of a binary log file: `0xfe` followed by ASCII `"bin"`.
/// `BinlogReader::new` compares the start of the stream against this value.
const MAGIC_NUMBER: [u8; constants::FIRST_EVENT_POSITION] = [0xfe, 0x62, 0x69, 0x6e];
/// Reads binlog events from a stream.
///
/// Events are pulled one at a time with `read_event`, or by iterating over
/// the reader itself (it implements `Iterator`).
pub struct BinlogReader {
/// File the events are read from.
stream: File,
/// Parser that turns an event header plus raw payload bytes into a `BinlogEvent`.
parser: EventParser,
/// Reusable buffer of `constants::PAYLOAD_BUFFER_SIZE` bytes, used for
/// payloads that fit so they do not need a fresh allocation per event.
payload_buffer: Vec<u8>,
}
impl BinlogReader {
pub fn new(mut stream: File) -> Result<Self, Error> {
let mut header = [0; constants::FIRST_EVENT_POSITION];
stream.read_exact(&mut header)?;
if header != MAGIC_NUMBER {
return Err(Error::String("Invalid binary log file header".to_string()));
}
Ok(Self {
stream,
parser: EventParser::new(),
payload_buffer: vec![0; constants::PAYLOAD_BUFFER_SIZE],
})
}
pub fn read_events(self) -> Self {
self
}
pub fn read_event(&mut self) -> Result<(EventHeader, BinlogEvent), Error> {
// Parse header
let mut header_buffer = [0; EVENT_HEADER_SIZE];
self.stream.read_exact(&mut header_buffer)?;
let header = EventHeader::parse(&header_buffer)?;
let payload_length = header.event_length as usize - EVENT_HEADER_SIZE;
if payload_length as usize > constants::PAYLOAD_BUFFER_SIZE {
let mut vec: Vec<u8> = vec![0; payload_length];
self.stream.read_exact(&mut vec)?;
let binlog_event = self.parser.parse_event(&header, &vec)?;
Ok((header, binlog_event))
} else {
let slice = &mut self.payload_buffer[0..payload_length];
self.stream.read_exact(slice)?;
let binlog_event = self.parser.parse_event(&header, slice)?;
Ok((header, binlog_event))
}
}
}
impl Iterator for BinlogReader {
type Item = Result<(EventHeader, BinlogEvent), Error>;
fn next(&mut self) -> Option<Self::Item> {
let result = self.read_event();
if let Err(error) = &result {
if let Error::IoError(io_error) = error {
if let ErrorKind::UnexpectedEof = io_error.kind() {
return None;
}
}
}
Some(result)
}
}