Skip to content

Commit

Permalink
Move Streams type into stream module
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Feb 27, 2019
1 parent 694b94a commit 92deb61
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 72 deletions.
73 changes: 1 addition & 72 deletions quinn-proto/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::packet::{
PartialDecode, SpaceId, LONG_RESERVED_BITS, SHORT_RESERVED_BITS,
};
use crate::range_set::RangeSet;
use crate::stream::{self, ReadError, Stream, WriteError};
use crate::stream::{self, ReadError, Stream, Streams, WriteError};
use crate::transport_parameters::{self, TransportParameters};
use crate::{
frame, Directionality, EndpointConfig, Frame, Side, StreamId, Transmit, TransportError,
Expand Down Expand Up @@ -2991,77 +2991,6 @@ where
buf.into()
}

struct Streams {
// Set of streams that are currently open, or could be immediately opened by the peer
streams: FnvHashMap<StreamId, Stream>,
next_uni: u64,
next_bi: u64,
// Locally initiated
max_uni: u64,
max_bi: u64,
// Maximum that can be remotely initiated
max_remote_uni: u64,
max_remote_bi: u64,
// Lowest that hasn't actually been opened
next_remote_uni: u64,
next_remote_bi: u64,
// Next to report to the application, once opened
next_reported_remote_uni: u64,
next_reported_remote_bi: u64,
}

impl Streams {
fn read(&mut self, id: StreamId, buf: &mut [u8]) -> Result<(usize, bool), ReadError> {
let rs = self.get_recv_mut(id).ok_or(ReadError::UnknownStream)?;
Ok((rs.read(buf)?, rs.receiving_unknown_size()))
}

fn read_unordered(&mut self, id: StreamId) -> Result<(Bytes, u64, bool), ReadError> {
let rs = self.get_recv_mut(id).ok_or(ReadError::UnknownStream)?;
let (buf, len) = rs.read_unordered()?;
Ok((buf, len, rs.receiving_unknown_size()))
}

fn get_recv_stream(
&mut self,
side: Side,
id: StreamId,
) -> Result<Option<&mut Stream>, TransportError> {
if side == id.initiator() {
match id.directionality() {
Directionality::Uni => {
return Err(TransportError::STREAM_STATE_ERROR(
"illegal operation on send-only stream",
));
}
Directionality::Bi if id.index() >= self.next_bi => {
return Err(TransportError::STREAM_STATE_ERROR(
"operation on unopened stream",
));
}
Directionality::Bi => {}
};
} else {
let limit = match id.directionality() {
Directionality::Bi => self.max_remote_bi,
Directionality::Uni => self.max_remote_uni,
};
if id.index() >= limit {
return Err(TransportError::STREAM_LIMIT_ERROR(""));
}
}
Ok(self.streams.get_mut(&id))
}

fn get_recv_mut(&mut self, id: StreamId) -> Option<&mut stream::Recv> {
self.streams.get_mut(&id)?.recv_mut()
}

fn get_send_mut(&mut self, id: StreamId) -> Option<&mut stream::Send> {
self.streams.get_mut(&id)?.send_mut()
}
}

/// Retransmittable data queue
#[derive(Debug, Clone)]
struct Retransmits {
Expand Down
73 changes: 73 additions & 0 deletions quinn-proto/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,81 @@ use std::collections::VecDeque;

use bytes::Bytes;
use err_derive::Error;
use fnv::FnvHashMap;

use crate::range_set::RangeSet;
use crate::{Directionality, Side, StreamId, TransportError};

pub struct Streams {
// Set of streams that are currently open, or could be immediately opened by the peer
pub streams: FnvHashMap<StreamId, Stream>,
pub next_uni: u64,
pub next_bi: u64,
// Locally initiated
pub max_uni: u64,
pub max_bi: u64,
// Maximum that can be remotely initiated
pub max_remote_uni: u64,
pub max_remote_bi: u64,
// Lowest that hasn't actually been opened
pub next_remote_uni: u64,
pub next_remote_bi: u64,
// Next to report to the application, once opened
pub next_reported_remote_uni: u64,
pub next_reported_remote_bi: u64,
}

impl Streams {
pub fn read(&mut self, id: StreamId, buf: &mut [u8]) -> Result<(usize, bool), ReadError> {
let rs = self.get_recv_mut(id).ok_or(ReadError::UnknownStream)?;
Ok((rs.read(buf)?, rs.receiving_unknown_size()))
}

pub fn read_unordered(&mut self, id: StreamId) -> Result<(Bytes, u64, bool), ReadError> {
let rs = self.get_recv_mut(id).ok_or(ReadError::UnknownStream)?;
let (buf, len) = rs.read_unordered()?;
Ok((buf, len, rs.receiving_unknown_size()))
}

pub fn get_recv_stream(
&mut self,
side: Side,
id: StreamId,
) -> Result<Option<&mut Stream>, TransportError> {
if side == id.initiator() {
match id.directionality() {
Directionality::Uni => {
return Err(TransportError::STREAM_STATE_ERROR(
"illegal operation on send-only stream",
));
}
Directionality::Bi if id.index() >= self.next_bi => {
return Err(TransportError::STREAM_STATE_ERROR(
"operation on unopened stream",
));
}
Directionality::Bi => {}
};
} else {
let limit = match id.directionality() {
Directionality::Bi => self.max_remote_bi,
Directionality::Uni => self.max_remote_uni,
};
if id.index() >= limit {
return Err(TransportError::STREAM_LIMIT_ERROR(""));
}
}
Ok(self.streams.get_mut(&id))
}

pub fn get_recv_mut(&mut self, id: StreamId) -> Option<&mut Recv> {
self.streams.get_mut(&id)?.recv_mut()
}

pub fn get_send_mut(&mut self, id: StreamId) -> Option<&mut Send> {
self.streams.get_mut(&id)?.send_mut()
}
}

#[derive(Debug)]
pub enum Stream {
Expand Down

0 comments on commit 92deb61

Please sign in to comment.