From 248ec17f0316b3771d1d7a2f0f5be241b71c2810 Mon Sep 17 00:00:00 2001 From: stammw Date: Tue, 1 Oct 2019 22:53:24 +0200 Subject: [PATCH] H3: incoming UniStream header parsing and polling --- quinn-h3/src/lib.rs | 3 ++ quinn-h3/src/proto/mod.rs | 11 ++++-- quinn-h3/src/streams.rs | 82 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 quinn-h3/src/streams.rs diff --git a/quinn-h3/src/lib.rs b/quinn-h3/src/lib.rs index 33652fe09..d30acfee8 100644 --- a/quinn-h3/src/lib.rs +++ b/quinn-h3/src/lib.rs @@ -20,6 +20,7 @@ pub mod qpack; pub mod server; mod frame; +mod streams; use err_derive::Error; use quinn::VarInt; @@ -55,6 +56,8 @@ pub enum Error { Internal(&'static str), #[error(display = "Incorrect peer behavior: {}", _0)] Peer(String), + #[error(display = "unknown stream type {}", _0)] + UnknownStream(u64), #[error(display = "IO error: {}", _0)] Io(std::io::Error), #[error(display = "Overflow max data size")] diff --git a/quinn-h3/src/proto/mod.rs b/quinn-h3/src/proto/mod.rs index b22536fe2..e03c0064a 100644 --- a/quinn-h3/src/proto/mod.rs +++ b/quinn-h3/src/proto/mod.rs @@ -1,11 +1,12 @@ -use bytes::BufMut; -use quinn_proto::coding::BufMutExt; +use bytes::{Buf, BufMut}; +use quinn_proto::coding::{BufExt, BufMutExt, UnexpectedEnd}; pub mod connection; pub mod frame; pub mod headers; -pub struct StreamType(u64); +#[derive(Debug, PartialEq, Eq)] +pub struct StreamType(pub u64); macro_rules! stream_types { {$($name:ident = $val:expr,)*} => { @@ -26,4 +27,8 @@ impl StreamType { pub fn encode(&self, buf: &mut W) { buf.write_var(self.0); } + + pub fn decode(buf: &mut B) -> Result { + Ok(StreamType(buf.get_var()?)) + } } diff --git a/quinn-h3/src/streams.rs b/quinn-h3/src/streams.rs new file mode 100644 index 000000000..f56c81b9e --- /dev/null +++ b/quinn-h3/src/streams.rs @@ -0,0 +1,82 @@ +use std::{convert::TryFrom, io, mem, pin::Pin, task::Context}; + +use bytes::BytesMut; +use futures::{io::AsyncRead, ready, Future, Poll}; +use quinn::RecvStream; +use quinn_proto::VarInt; + +use crate::{ + frame::{FrameDecoder, FrameStream}, + proto::StreamType, + Error, +}; + +pub enum NewUni { + Control(ControlStream), + Push(PushStream), + Encoder(RecvStream), + Decoder(RecvStream), +} + +impl TryFrom<(StreamType, RecvStream)> for NewUni { + type Error = Error; + fn try_from(value: (StreamType, RecvStream)) -> Result { + let (ty, recv) = value; + Ok(match ty { + StreamType::CONTROL => NewUni::Control(ControlStream(FrameDecoder::stream(recv))), + StreamType::PUSH => NewUni::Push(PushStream(FrameDecoder::stream(recv))), + StreamType::ENCODER => NewUni::Encoder(recv), + StreamType::DECODER => NewUni::Decoder(recv), + _ => return Err(Error::UnknownStream(ty.0)), + }) + } +} + +pub struct RecvUni { + inner: Option<(RecvStream, Vec, usize)>, +} + +impl RecvUni { + pub fn new(recv: RecvStream) -> Self { + Self { + inner: Some((recv, vec![0u8; VarInt::MAX.size()], 0)), + } + } +} + +impl Future for RecvUni { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + loop { + match self.inner { + None => panic!("polled after resolved"), + Some((ref mut recv, ref mut buf, ref mut len)) => { + match ready!(Pin::new(recv).poll_read(cx, &mut buf[..*len + 1]))? { + 0 => { + return Poll::Ready(Err(Error::Peer(format!( + "Uni stream closed before type received", + )))) + } + _ => { + *len += 1; + let mut cur = io::Cursor::new(&buf); + if let Ok(ty) = StreamType::decode(&mut cur) { + match mem::replace(&mut self.inner, None) { + Some((recv, _, _)) => { + return Poll::Ready(NewUni::try_from((ty, recv))) + } + _ => unreachable!(), + }; + }; + } + } + } + } + } + } +} + +pub struct ControlStream(FrameStream); + +pub struct PushStream(FrameStream);