Skip to content

Commit

Permalink
H3: incoming UniStream header parsing and polling
Browse files Browse the repository at this point in the history
  • Loading branch information
stammw authored and djc committed Oct 4, 2019
1 parent e5a7e90 commit 248ec17
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
3 changes: 3 additions & 0 deletions quinn-h3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod qpack;
pub mod server;

mod frame;
mod streams;

use err_derive::Error;
use quinn::VarInt;
Expand Down Expand Up @@ -55,6 +56,8 @@ pub enum Error {
Internal(&'static str),
#[error(display = "Incorrect peer behavior: {}", _0)]
Peer(String),
#[error(display = "unknown stream type {}", _0)]
UnknownStream(u64),
#[error(display = "IO error: {}", _0)]
Io(std::io::Error),
#[error(display = "Overflow max data size")]
Expand Down
11 changes: 8 additions & 3 deletions quinn-h3/src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use bytes::BufMut;
use quinn_proto::coding::BufMutExt;
use bytes::{Buf, BufMut};
use quinn_proto::coding::{BufExt, BufMutExt, UnexpectedEnd};

pub mod connection;
pub mod frame;
pub mod headers;

pub struct StreamType(u64);
#[derive(Debug, PartialEq, Eq)]
pub struct StreamType(pub u64);

macro_rules! stream_types {
{$($name:ident = $val:expr,)*} => {
Expand All @@ -26,4 +27,8 @@ impl StreamType {
pub fn encode<W: BufMut>(&self, buf: &mut W) {
buf.write_var(self.0);
}

pub fn decode<B: Buf>(buf: &mut B) -> Result<Self, UnexpectedEnd> {
Ok(StreamType(buf.get_var()?))
}
}
82 changes: 82 additions & 0 deletions quinn-h3/src/streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::{convert::TryFrom, io, mem, pin::Pin, task::Context};

use bytes::BytesMut;
use futures::{io::AsyncRead, ready, Future, Poll};
use quinn::RecvStream;
use quinn_proto::VarInt;

use crate::{
frame::{FrameDecoder, FrameStream},
proto::StreamType,
Error,
};

pub enum NewUni {
Control(ControlStream),
Push(PushStream),
Encoder(RecvStream),
Decoder(RecvStream),
}

impl TryFrom<(StreamType, RecvStream)> for NewUni {
type Error = Error;
fn try_from(value: (StreamType, RecvStream)) -> Result<Self, Self::Error> {
let (ty, recv) = value;
Ok(match ty {
StreamType::CONTROL => NewUni::Control(ControlStream(FrameDecoder::stream(recv))),
StreamType::PUSH => NewUni::Push(PushStream(FrameDecoder::stream(recv))),
StreamType::ENCODER => NewUni::Encoder(recv),
StreamType::DECODER => NewUni::Decoder(recv),
_ => return Err(Error::UnknownStream(ty.0)),
})
}
}

pub struct RecvUni {
inner: Option<(RecvStream, Vec<u8>, usize)>,
}

impl RecvUni {
pub fn new(recv: RecvStream) -> Self {
Self {
inner: Some((recv, vec![0u8; VarInt::MAX.size()], 0)),
}
}
}

impl Future for RecvUni {
type Output = Result<NewUni, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.inner {
None => panic!("polled after resolved"),
Some((ref mut recv, ref mut buf, ref mut len)) => {
match ready!(Pin::new(recv).poll_read(cx, &mut buf[..*len + 1]))? {
0 => {
return Poll::Ready(Err(Error::Peer(format!(
"Uni stream closed before type received",
))))
}
_ => {
*len += 1;
let mut cur = io::Cursor::new(&buf);
if let Ok(ty) = StreamType::decode(&mut cur) {
match mem::replace(&mut self.inner, None) {
Some((recv, _, _)) => {
return Poll::Ready(NewUni::try_from((ty, recv)))
}
_ => unreachable!(),
};
};
}
}
}
}
}
}
}

pub struct ControlStream(FrameStream);

pub struct PushStream(FrameStream);

0 comments on commit 248ec17

Please sign in to comment.