diff --git a/.gitignore b/.gitignore index 0a321dc707..b6f6a48976 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ Cargo.lock # Test Files spike/ *.torrent + +# Visual Studio Code Files +.vscode diff --git a/bip_peer/src/manager/task.rs b/bip_peer/src/manager/task.rs index 3f696dfc38..164ac93f5d 100644 --- a/bip_peer/src/manager/task.rs +++ b/bip_peer/src/manager/task.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use std::io; use manager::builder::PeerManagerBuilder; diff --git a/bip_peer/src/message/bencode/mod.rs b/bip_peer/src/message/bencode/mod.rs index 32ec893a1b..8fd131ab48 100644 --- a/bip_peer/src/message/bencode/mod.rs +++ b/bip_peer/src/message/bencode/mod.rs @@ -31,14 +31,14 @@ pub const CLIENT_IPV4_ADDR_KEY: &'static [u8] = b"ipv4"; pub const CLIENT_MAX_REQUESTS_KEY: &'static [u8] = b"reqq"; pub const METADATA_SIZE_KEY: &'static [u8] = b"metadata_size"; -pub fn parse_id_map<'a, B>(root: &BDictAccess<'a, B>) -> HashMap +pub fn parse_id_map<'a, B>(root: &BDictAccess<'a, B>) -> HashMap where B: BRefAccess<'a> { let mut id_map = HashMap::new(); if let Ok(ben_id_map) = CONVERT.lookup_and_convert_dict(root, ID_MAP_KEY) { for (id, ben_value) in ben_id_map.to_list() { match (str::from_utf8(id), CONVERT.convert_int(ben_value, id)) { - (Ok(str_id), Ok(value)) => { id_map.insert(ExtendedType::from_id(str_id), value); }, + (Ok(str_id), Ok(value)) => { id_map.insert(ExtendedType::from_id(str_id), value as u8); }, _ => () } } @@ -129,4 +129,25 @@ fn parse_ipv6_addr(ipv6_bytes: &[u8]) -> Ipv6Addr { ipv6_bytes[4], ipv6_bytes[5], ipv6_bytes[6], ipv6_bytes[7], ipv6_bytes[8], ipv6_bytes[9], ipv6_bytes[10], ipv6_bytes[11], ipv6_bytes[12], ipv6_bytes[13], ipv6_bytes[14], ipv6_bytes[15]]) +} + +// ----------------------------------------------------------------------------// + +pub const MESSAGE_TYPE_KEY: &'static [u8] = b"msg_type"; +pub const PIECE_INDEX_KEY: &'static [u8] = b"piece"; +pub const TOTAL_SIZE_KEY: &'static [u8] = b"total_size"; + +pub fn parse_message_type<'a, B>(root: &BDictAccess<'a, B>) -> io::Result + where B: BRefAccess<'a> { + CONVERT.lookup_and_convert_int(root, MESSAGE_TYPE_KEY).map(|msg_type| msg_type as u8).into() +} + +pub fn parse_piece_index<'a, B>(root: &BDictAccess<'a, B>) -> io::Result + where B: BRefAccess<'a> { + CONVERT.lookup_and_convert_int(root, PIECE_INDEX_KEY).into() +} + +pub fn parse_total_size<'a, B>(root: &BDictAccess<'a, B>) -> io::Result + where B: BRefAccess<'a> { + CONVERT.lookup_and_convert_int(root, TOTAL_SIZE_KEY).into() } \ No newline at end of file diff --git a/bip_peer/src/message/bits_extension.rs b/bip_peer/src/message/bits_extension.rs index c87a8652f7..d12b77b726 100644 --- a/bip_peer/src/message/bits_extension.rs +++ b/bip_peer/src/message/bits_extension.rs @@ -14,8 +14,8 @@ use message::bencode; const PORT_MESSAGE_LEN: u32 = 3; const BASE_EXTENDED_MESSAGE_LEN: u32 = 6; -const PORT_MESSAGE_ID: u8 = 9; -const EXTENDED_MESSAGE_ID: u8 = 20; +const PORT_MESSAGE_ID: u8 = 9; +pub const EXTENDED_MESSAGE_ID: u8 = 20; const EXTENDED_MESSAGE_HANDSHAKE_ID: u8 = 0; @@ -152,7 +152,7 @@ impl ExtendedType { /// See `http://www.bittorrent.org/beps/bep_0010.html`. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ExtendedMessage { - id_map: HashMap, + id_map: HashMap, client_id: Option, client_tcp_port: Option, our_ip: Option, @@ -164,7 +164,7 @@ pub struct ExtendedMessage { } impl ExtendedMessage { - fn with_raw(id_map: HashMap, client_id: Option, client_tcp_port: Option, + fn with_raw(id_map: HashMap, client_id: Option, client_tcp_port: Option, our_ip: Option, client_ipv6_addr: Option, client_ipv4_addr: Option, client_max_requests: Option, metadata_size: Option, raw_bencode: Bytes) -> ExtendedMessage { ExtendedMessage{ id_map: id_map, client_id: client_id, client_tcp_port: client_tcp_port, @@ -172,7 +172,7 @@ impl ExtendedMessage { client_max_requests: client_max_requests, metadata_size: metadata_size, raw_bencode: raw_bencode } } - pub fn new(id_map: HashMap, client_id: Option, client_tcp_port: Option, + pub fn new(id_map: HashMap, client_id: Option, client_tcp_port: Option, our_ip: Option, client_ipv6_addr: Option, client_ipv4_addr: Option, client_max_requests: Option, metadata_size: Option) -> ExtendedMessage { let mut message = ExtendedMessage{ id_map: id_map, client_id: client_id, client_tcp_port: client_tcp_port, @@ -236,7 +236,7 @@ impl ExtendedMessage { self.raw_bencode.len() } - pub fn query_id(&self, ext_type: &ExtendedType) -> Option { + pub fn query_id(&self, ext_type: &ExtendedType) -> Option { self.id_map.get(ext_type).map(|id| *id) } @@ -291,7 +291,7 @@ fn bencode_from_extended_params(extended: &ExtendedMessage) -> Vec { { let ben_id_map_access = ben_id_map.dict_mut().unwrap(); for (ext_id, &value) in extended.id_map.iter() { - ben_id_map_access.insert(ext_id.id().as_bytes(), ben_int!(value)); + ben_id_map_access.insert(ext_id.id().as_bytes(), ben_int!(value as i64)); } } diff --git a/bip_peer/src/message/mod.rs b/bip_peer/src/message/mod.rs index c6a242c85e..bfdeaf5adf 100644 --- a/bip_peer/src/message/mod.rs +++ b/bip_peer/src/message/mod.rs @@ -42,12 +42,14 @@ const HEADER_LEN: usize = MESSAGE_LENGTH_LEN_BYTES + MESSAGE_ID_LE mod bencode; mod bits_extension; +mod prot_extension; mod standard; mod null; pub use message::bits_extension::{BitsExtensionMessage, PortMessage, ExtendedMessage, ExtendedType}; pub use message::standard::{HaveMessage, BitFieldMessage, BitFieldIter, RequestMessage, PieceMessage, CancelMessage}; pub use message::null::NullProtocolMessage; +pub use message::prot_extension::{PeerExtensionProtocolMessage, LtMetadataMessage, LtMetadataRequestMessage, LtMetadataDataMessage, LtMetadataRejectMessage}; /// Enumeration of messages for `PeerWireProtocol`. pub enum PeerWireProtocolMessage

where P: PeerProtocol { @@ -100,7 +102,7 @@ impl

ManagedMessage for PeerWireProtocolMessage

impl

PeerWireProtocolMessage

where P: PeerProtocol { - pub fn bytes_needed(bytes: &[u8], _ext_protocol: &mut P) -> io::Result> { + pub fn bytes_needed(bytes: &[u8]) -> io::Result> { match be_u32(bytes) { // We need 4 bytes for the length, plus whatever the length is... IResult::Done(_, length) => Ok(Some(MESSAGE_LENGTH_LEN_BYTES + u32_to_usize(length))), diff --git a/bip_peer/src/message/prot_extension.rs b/bip_peer/src/message/prot_extension.rs index f5de772497..840dc3070b 100644 --- a/bip_peer/src/message/prot_extension.rs +++ b/bip_peer/src/message/prot_extension.rs @@ -1,37 +1,260 @@ +use std::io::{self, Write}; + +use bip_bencode::{BDecodeOpt, BencodeRef, BConvert}; +use bytes::Bytes; +use nom::{IResult, be_u32, be_u8, ErrorKind}; +use byteorder::{WriteBytesExt, BigEndian}; + +use message::{self, ExtendedMessage, PeerWireProtocolMessage, ExtendedType}; +use message::bencode; +use message::bits_extension; +use protocol::{PeerProtocol}; + +const EXTENSION_HEADER_LEN: usize = message::HEADER_LEN + 1; + +const REQUEST_MESSAGE_TYPE_ID: u8 = 0; +const DATA_MESSAGE_TYPE_ID: u8 = 1; +const REJECT_MESSAGE_TYPE_ID: u8 = 2; + +const ROOT_ERROR_KEY: &'static str = "PeerExtensionProtocolMessage"; + pub enum PeerExtensionProtocolMessage

where P: PeerProtocol { - LtMetadata(LtMetadataMesage), - UtPex(UtPexMessage), - Custom(P::PeerMessage) + LtMetadata(LtMetadataMessage), + //UtPex(UtPexMessage), + Custom(P::ProtocolMessage) +} + +impl

PeerExtensionProtocolMessage

where P: PeerProtocol { + pub fn bytes_needed(bytes: &[u8]) -> io::Result> { + // Follows same length prefix logic as our normal wire protocol... + PeerWireProtocolMessage::

::bytes_needed(bytes) + } + + pub fn parse_bytes(bytes: Bytes, extended: &ExtendedMessage, custom_prot: &mut P) -> io::Result> { + match parse_extensions(bytes, extended, custom_prot) { + IResult::Done(_, result) => result, + _ => Err(io::Error::new(io::ErrorKind::Other, "Failed To Parse PeerExtensionProtocolMessage")) + } + } + + pub fn write_bytes(&self, mut writer: W, extended: &ExtendedMessage, custom_prot: &mut P) -> io::Result<()> + where W: Write + { + match self { + &PeerExtensionProtocolMessage::LtMetadata(ref msg) => { + let ext_id = if let Some(ext_id) = extended.query_id(&ExtendedType::LtMetadata) { + ext_id + } else { return Err(io::Error::new(io::ErrorKind::Other, "Can't Send LtMetadataMessage As We Have No Id Mapping")) }; + + let total_len = (2 + msg.message_size()) as u32; + + try!(message::write_length_id_pair(&mut writer, total_len, Some(bits_extension::EXTENDED_MESSAGE_ID))); + try!(writer.write_u8(ext_id)); + + msg.write_bytes(writer) + }, + &PeerExtensionProtocolMessage::Custom(ref msg) => custom_prot.write_bytes(msg, writer) + } + } + + pub fn message_size(&self, custom_prot: &mut P) -> usize { + match self { + &PeerExtensionProtocolMessage::LtMetadata(ref msg) => msg.message_size(), + &PeerExtensionProtocolMessage::Custom(ref msg) => custom_prot.message_size(&msg) + } + } +} + +fn parse_extensions

(mut bytes: Bytes, extended: &ExtendedMessage, custom_prot: &mut P) -> IResult<(), io::Result>> + where P: PeerProtocol { + let header_bytes = bytes.clone(); + + // Attempt to parse a built in message type, otherwise, see if it is an extension type. + alt!((), + ignore_input!( + switch!(header_bytes.as_ref(), throwaway_input!(tuple!(be_u32, be_u8, be_u8)), + (message_len, bits_extension::EXTENDED_MESSAGE_ID, message_id) => + call!(parse_extensions_with_id, bytes.split_off(EXTENSION_HEADER_LEN).split_to(message_len as usize - 2), extended, message_id) + ) + ) | map!(value!(custom_prot.parse_bytes(bytes)), + |res_cust_ext| res_cust_ext.map(|cust_ext| PeerExtensionProtocolMessage::Custom(cust_ext))) + ) +} + +fn parse_extensions_with_id

(_input: (), bytes: Bytes, extended: &ExtendedMessage, id: u8) -> IResult<(), io::Result>> + where P: PeerProtocol { + let lt_metadata_id = extended.query_id(&ExtendedType::LtMetadata); + //let ut_pex_id = extended.query_id(&ExtendedType::UtPex); + + let result = if lt_metadata_id == Some(id) { + LtMetadataMessage::parse_bytes(bytes) + .map(|lt_metadata_msg| PeerExtensionProtocolMessage::LtMetadata(lt_metadata_msg)) + } else { + Err(io::Error::new(io::ErrorKind::Other, format!("Unknown Id For PeerExtensionProtocolMessage: {}", id))) + }; + + IResult::Done((), result) } // ----------------------------------------------------------------------------// -pub struct LtMetadataMessage { +pub enum LtMetadataMessage { Request(LtMetadataRequestMessage), Data(LtMetadataDataMessage), Reject(LtMetadataRejectMessage) } +impl LtMetadataMessage { + pub fn parse_bytes(mut bytes: Bytes) -> io::Result { + // Our bencode is pretty flat, and we dont want to enforce a full decode, as data + // messages have the raw data appended outside of the bencode structure... + let decode_opts = BDecodeOpt::new(1, false, false); + + match BencodeRef::decode(bytes.clone().as_ref(), decode_opts) { + Ok(bencode) => { + let bencode_dict = try!(bencode::CONVERT.convert_dict(&bencode, ROOT_ERROR_KEY)); + let msg_type = try!(bencode::parse_message_type(bencode_dict)); + let piece = try!(bencode::parse_piece_index(bencode_dict)); + + let bencode_bytes = bytes.split_to(bencode.buffer().len()); + let extra_bytes = bytes; + + match msg_type { + REQUEST_MESSAGE_TYPE_ID => Ok(LtMetadataMessage::Request(LtMetadataRequestMessage::with_bytes(piece, bencode_bytes))), + REJECT_MESSAGE_TYPE_ID => Ok(LtMetadataMessage::Reject(LtMetadataRejectMessage::with_bytes(piece, bencode_bytes))), + DATA_MESSAGE_TYPE_ID => { + let total_size = try!(bencode::parse_total_size(bencode_dict)); + + Ok(LtMetadataMessage::Data(LtMetadataDataMessage::with_bytes(piece, total_size, extra_bytes, bencode_bytes))) + }, + other => { return Err(io::Error::new(io::ErrorKind::Other, format!("Failed To Recognize Message Type For LtMetadataMessage: {}", msg_type))) } + } + }, + Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Failed To Parse LtMetadataMessage As Bencode")) + } + } + + pub fn write_bytes(&self, writer: W) -> io::Result<()> + where W: Write + { + match self { + &LtMetadataMessage::Request(ref request) => request.write_bytes(writer), + &LtMetadataMessage::Data(ref data) => data.write_bytes(writer), + &LtMetadataMessage::Reject(ref reject) => reject.write_bytes(writer), + } + } + + pub fn message_size(&self) -> usize { + match self { + &LtMetadataMessage::Request(ref request) => request.message_size(), + &LtMetadataMessage::Data(ref data) => data.message_size(), + &LtMetadataMessage::Reject(ref reject) => reject.message_size(), + } + } +} + +// ----------------------------------------------------------------------------// pub struct LtMetadataRequestMessage { - piece: i64 + piece: i64, + bytes: Bytes } impl LtMetadataRequestMessage { pub fn new(piece: i64) -> LtMetadataRequestMessage { - LtMetadataRequestMessage{ piece: piece } + let encoded_bytes = (ben_map!{ + bencode::MESSAGE_TYPE_KEY => ben_int!(REQUEST_MESSAGE_TYPE_ID as i64), + bencode::PIECE_INDEX_KEY => ben_int!(piece) + }).encode(); + + let mut bytes = Bytes::with_capacity(encoded_bytes.len()); + bytes.extend_from_slice(&encoded_bytes[..]); + + LtMetadataRequestMessage::with_bytes(piece, bytes) + } + + pub fn with_bytes(piece: i64, bytes: Bytes) -> LtMetadataRequestMessage { + LtMetadataRequestMessage{ piece: piece, bytes: bytes } + } + + pub fn write_bytes(&self, mut writer: W) -> io::Result<()> + where W: Write + { + writer.write_all(self.bytes.as_ref()) + } + + pub fn message_size(&self) -> usize { + self.bytes.len() } } pub struct LtMetadataDataMessage { piece: i64, - total_size: i64 + total_size: i64, + data: Bytes, + bytes: Bytes } +impl LtMetadataDataMessage { + pub fn new(piece: i64, total_size: i64, data: Bytes) -> LtMetadataDataMessage { + let encoded_bytes = (ben_map!{ + bencode::MESSAGE_TYPE_KEY => ben_int!(DATA_MESSAGE_TYPE_ID as i64), + bencode::PIECE_INDEX_KEY => ben_int!(piece), + bencode::TOTAL_SIZE_KEY => ben_int!(total_size) + }).encode(); + let mut bytes = Bytes::with_capacity(encoded_bytes.len()); + bytes.extend_from_slice(&encoded_bytes[..]); + + LtMetadataDataMessage::with_bytes(piece, total_size, data, bytes) + } + + pub fn with_bytes(piece: i64, total_size: i64, data: Bytes, bytes: Bytes) -> LtMetadataDataMessage { + LtMetadataDataMessage{ piece: piece, total_size: total_size, data: data, bytes: bytes } + } + + pub fn write_bytes(&self, mut writer: W) -> io::Result<()> + where W: Write + { + try!(writer.write_all(self.bytes.as_ref())); + + writer.write_all(self.data.as_ref()) + } + + pub fn message_size(&self) -> usize { + self.bytes.len() + self.data.len() + } +} pub struct LtMetadataRejectMessage { - piece: i64 + piece: i64, + bytes: Bytes } -// ----------------------------------------------------------------------------// \ No newline at end of file +impl LtMetadataRejectMessage { + pub fn new(piece: i64) -> LtMetadataRejectMessage { + let encoded_bytes = (ben_map!{ + bencode::MESSAGE_TYPE_KEY => ben_int!(REJECT_MESSAGE_TYPE_ID as i64), + bencode::PIECE_INDEX_KEY => ben_int!(piece) + }).encode(); + + let mut bytes = Bytes::with_capacity(encoded_bytes.len()); + bytes.extend_from_slice(&encoded_bytes[..]); + + LtMetadataRejectMessage::with_bytes(piece, bytes) + } + + pub fn with_bytes(piece: i64, bytes: Bytes) -> LtMetadataRejectMessage { + LtMetadataRejectMessage{ piece: piece, bytes: bytes } + } + + pub fn write_bytes(&self, mut writer: W) -> io::Result<()> + where W: Write + { + writer.write_all(self.bytes.as_ref()) + } + + pub fn message_size(&self) -> usize { + self.bytes.len() + } +} \ No newline at end of file diff --git a/bip_peer/src/protocol/extension.rs b/bip_peer/src/protocol/extension.rs index 5c8389f4a3..2af41257fc 100644 --- a/bip_peer/src/protocol/extension.rs +++ b/bip_peer/src/protocol/extension.rs @@ -1,8 +1,15 @@ +use std::io::{self, Write}; + use bytes::Bytes; +use message::{ExtendedMessage, PeerExtensionProtocolMessage}; +use protocol::{PeerProtocol, NestedPeerProtocol}; + /// Protocol message for peer wire messages. pub struct PeerExtensionProtocol

{ - custom_protocol: P + our_extended_msg: Option, + their_extended_msg: Option, + custom_protocol: P } impl

PeerExtensionProtocol

{ @@ -10,7 +17,7 @@ impl

PeerExtensionProtocol

{ /// /// Notes for `PeerWireProtocol` apply to this custom extension protocol, so refer to that. pub fn new(custom_protocol: P) -> PeerExtensionProtocol

{ - PeerExtensionProtocol{ custom_protocol: custom_protocol } + PeerExtensionProtocol{ our_extended_msg: None, their_extended_msg: None, custom_protocol: custom_protocol } } } @@ -18,19 +25,39 @@ impl

PeerProtocol for PeerExtensionProtocol

where P: PeerProtocol { type ProtocolMessage = PeerExtensionProtocolMessage

; fn bytes_needed(&mut self, bytes: &[u8]) -> io::Result> { - PeerExtensionProtocolMessage::bytes_needed(bytes, &mut self.custom_protocol) + PeerExtensionProtocolMessage::

::bytes_needed(bytes) } fn parse_bytes(&mut self, bytes: Bytes) -> io::Result { - PeerExtensionProtocolMessage::parse_bytes(bytes, &mut self.custom_protocol) + match self.their_extended_msg { + Some(ref extended_msg) => PeerExtensionProtocolMessage::parse_bytes(bytes, extended_msg, &mut self.custom_protocol), + None => Err(io::Error::new(io::ErrorKind::Other, "Extension Message Received From Peer Before Extended Message...")) + } } fn write_bytes(&mut self, message: &Self::ProtocolMessage, writer: W) -> io::Result<()> where W: Write { - message.write_bytes(writer, &mut self.custom_protocol) + match self.our_extended_msg { + Some(ref extended_msg) => PeerExtensionProtocolMessage::write_bytes(message, writer, extended_msg, &mut self.custom_protocol), + None => Err(io::Error::new(io::ErrorKind::Other, "Extension Message Sent From Us Before Extended Message...")) + } } fn message_size(&mut self, message: &Self::ProtocolMessage) -> usize { message.message_size(&mut self.custom_protocol) } +} + +impl

NestedPeerProtocol for PeerExtensionProtocol

where P: NestedPeerProtocol { + fn received_message(&mut self, message: &ExtendedMessage) { + self.custom_protocol.received_message(message); + + self.their_extended_msg = Some(message.clone()); + } + + fn sent_message(&mut self, message: &ExtendedMessage) { + self.custom_protocol.sent_message(message); + + self.our_extended_msg = Some(message.clone()); + } } \ No newline at end of file diff --git a/bip_peer/src/protocol/mod.rs b/bip_peer/src/protocol/mod.rs index 9040de011b..2502006bd4 100644 --- a/bip_peer/src/protocol/mod.rs +++ b/bip_peer/src/protocol/mod.rs @@ -4,6 +4,7 @@ use std::io::{self, Write}; use bytes::Bytes; +pub mod extension; pub mod unit; pub mod null; pub mod wire; diff --git a/bip_peer/src/protocol/wire.rs b/bip_peer/src/protocol/wire.rs index 82622bcd60..0a92c8895f 100644 --- a/bip_peer/src/protocol/wire.rs +++ b/bip_peer/src/protocol/wire.rs @@ -25,7 +25,7 @@ impl

PeerProtocol for PeerWireProtocol

where P: PeerProtocol + NestedPeerP type ProtocolMessage = PeerWireProtocolMessage

; fn bytes_needed(&mut self, bytes: &[u8]) -> io::Result> { - PeerWireProtocolMessage::bytes_needed(bytes, &mut self.ext_protocol) + PeerWireProtocolMessage::

::bytes_needed(bytes) } fn parse_bytes(&mut self, bytes: Bytes) -> io::Result {