Skip to content

Commit

Permalink
continue searching for newline characters
Browse files Browse the repository at this point in the history
  • Loading branch information
elwerene committed Dec 10, 2021
1 parent 38dd74e commit 79cf5a9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "changes-stream2"
description = "couchdb follower"
version = "0.2.2"
version = "0.2.3"
authors = [
"Ashley Williams <[email protected]>",
"René Rössler <[email protected]>",
Expand Down
9 changes: 8 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct ChangesStream {
buffer: Vec<u8>,
/// Source of http chunks provided by reqwest
source: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>,
/// Search pos for newline
newline_search_pos: usize,
}

impl ChangesStream {
Expand Down Expand Up @@ -56,6 +58,7 @@ impl ChangesStream {
Ok(ChangesStream {
buffer: vec![],
source,
newline_search_pos: 0,
})
}
}
Expand All @@ -71,11 +74,13 @@ impl Stream for ChangesStream {
let line_break_pos = self
.buffer
.iter()
.skip(self.newline_search_pos)
.enumerate()
.find(|(_pos, b)| **b == 0x0A) // search for \n
.map(|(pos, _b)| pos);
.map(|(pos, _b)| self.newline_search_pos + pos);
if let Some(line_break_pos) = line_break_pos {
let mut line = self.buffer.drain(0..=line_break_pos).collect::<Vec<_>>();
self.newline_search_pos = 0;

if line.len() < 15 {
// skip prologue, epilogue and empty lines (continuous mode)
Expand Down Expand Up @@ -104,6 +109,8 @@ impl Stream for ChangesStream {

return Poll::Ready(Some(result));
} else {
self.newline_search_pos = self.buffer.len();

match Stream::poll_next(self.source.as_mut(), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Expand Down

0 comments on commit 79cf5a9

Please sign in to comment.