Skip to content

Commit

Permalink
Updated stream buffer to recalculate framing index after clearing str…
Browse files Browse the repository at this point in the history
…eam buffer on malformed packet
  • Loading branch information
ajmcquilkin committed Mar 10, 2024
1 parent ede2efb commit c360a82
Showing 1 changed file with 57 additions and 30 deletions.
87 changes: 57 additions & 30 deletions src/connections/stream_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,30 +119,23 @@ impl StreamBuffer {
});
}

// All valid packets start with the sequence [0x94 0xc3 size_msb size_lsb], where
// size_msb and size_lsb collectively give the size of the incoming packet
// Note that the maximum packet size currently stands at 240 bytes, meaning an MSB is not needed
let framing_index = match self.buffer.iter().position(|&b| b == 0x94) {
Some(idx) => idx,
None => {
warn!("Could not find index of 0x94, purging buffer");
self.buffer.clear(); // Clear buffer since no packets exist
return Err(StreamBufferError::MissingHeaderByte);
}
};
let mut framing_index = self.get_framing_index()?;

// Drop beginning of buffer if the framing byte is found later in the buffer
// It is not possible to make a valid packet if the framing byte is not at the beginning
// Drop beginning of buffer if the framing byte is found later in the buffer.
// It is not possible to make a valid packet if the framing byte is not at the beginning.
// We do this because the framing byte is the only way to determine the start of a packet,
// and it needs to be updated after each removal of a malformed packet.
// ! This needs to be done before the framing byte is accessed, as not doing so blocks
// ! the processing of valid packets when an invalid packet is at the beginning of the buffer
// ! For example, having 0xc3 as the first byte in the buffer followed by a valid packet will break
if framing_index > 0 {
while framing_index > 0 {
debug!(
"Found framing byte at index {}, shifting buffer",
framing_index
);

self.buffer = self.buffer[framing_index..].to_vec();
framing_index = self.get_framing_index()?;
}

// Get the "framing byte" after the start of the packet header, or fail if not found
Expand All @@ -152,7 +145,7 @@ impl StreamBuffer {
debug!("Could not find framing byte, waiting for more data");
return Err(StreamBufferError::IncompletePacket {
buffer_size: self.buffer.len(),
packet_size: 4, // ? Why 4?
packet_size: 4,
});
}
};
Expand Down Expand Up @@ -235,18 +228,45 @@ impl StreamBuffer {

Ok(decoded_packet)
}

// All valid packets start with the sequence [0x94 0xc3 size_msb size_lsb], where
// size_msb and size_lsb collectively give the size of the incoming packet
// Note that the maximum packet size currently stands at 240 bytes, meaning an MSB is not needed
fn get_framing_index(&mut self) -> Result<usize, StreamBufferError> {
match self.buffer.iter().position(|&b| b == 0x94) {
Some(idx) => Ok(idx),
None => {
warn!("Could not find index of 0x94, purging buffer");
self.buffer.clear(); // Clear buffer since no packets exist
Err(StreamBufferError::MissingHeaderByte)
}
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use crate::{protobufs, utils_internal::format_data_packet};
use futures_util::FutureExt;
use prost::Message;
use tokio::sync::mpsc::unbounded_channel;

use super::*;

async fn timeout_test<F, T>(future: F, timeout: impl Into<Option<Duration>>) -> T
where
F: FutureExt<Output = T> + Send,
{
let timeout_opt: Option<Duration> = timeout.into();
let timeout_duration = timeout_opt.unwrap_or(Duration::from_millis(100));

tokio::time::timeout(timeout_duration, future)
.await
.expect("Future timed out")
}

fn mock_encoded_from_radio_packet(
id: u32,
payload_variant: protobufs::from_radio::PayloadVariant,
Expand Down Expand Up @@ -279,7 +299,7 @@ mod tests {
let mut buffer = StreamBuffer::new(mock_tx);
buffer.process_incoming_bytes(encoded_packet.unwrap().data().into());

assert_eq!(mock_rx.recv().await.unwrap(), packet);
assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(packet));
assert_eq!(buffer.buffer.len(), 0);
}

Expand Down Expand Up @@ -313,37 +333,44 @@ mod tests {
buffer.buffer.append(&mut encoded_packet1.data_vec());
buffer.process_incoming_bytes(encoded_packet2.data().into());

assert_eq!(mock_rx.recv().await.unwrap(), packet1);
assert_eq!(mock_rx.recv().await.unwrap(), packet2);
assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(packet1));
assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(packet2));

assert_eq!(buffer.buffer.len(), 0);
}

#[tokio::test]
async fn should_purge_buffer_before_testing_framing_byte() {
let mut malformed_packet_encoding = vec![0xc3, 0x00, 0x01, 0xff];
let payload_variant1 =
protobufs::from_radio::PayloadVariant::MyInfo(protobufs::MyNodeInfo {
my_node_num: 1,
..Default::default()
});

let payload_variant =
let payload_variant2 =
protobufs::from_radio::PayloadVariant::MyInfo(protobufs::MyNodeInfo {
my_node_num: 1,
..Default::default()
});

let (_packet1, packet_data1) = mock_encoded_from_radio_packet(1, payload_variant1);
let (valid_packet, valid_packet_encoding) =
mock_encoded_from_radio_packet(1, payload_variant);
mock_encoded_from_radio_packet(1, payload_variant2);

let encoded_packet1 = format_data_packet(packet_data1.into()).unwrap();
let encoded_packet2 = format_data_packet(valid_packet_encoding.into()).unwrap();

let encoded_packet = format_data_packet(valid_packet_encoding.into()).unwrap();
// Remove first byte from encoded_packet1 to simulate a malformed packet
let mut malformed_packet_encoding = encoded_packet1.data_vec();
malformed_packet_encoding.remove(0);

let (mock_tx, mut mock_rx) = unbounded_channel::<protobufs::FromRadio>();

let mut buffer = StreamBuffer::new(mock_tx);
buffer.buffer.append(&mut malformed_packet_encoding);
buffer.process_incoming_bytes(encoded_packet.data().into());

assert_eq!(
tokio::time::timeout(Duration::from_millis(100), mock_rx.recv())
.await
.unwrap(),
Some(valid_packet)
);

buffer.process_incoming_bytes(encoded_packet2.data().into());

assert_eq!(timeout_test(mock_rx.recv(), None).await, Some(valid_packet));
}
}

0 comments on commit c360a82

Please sign in to comment.