Skip to content

Commit

Permalink
fix: fix mux udp relaying
Browse files Browse the repository at this point in the history
  • Loading branch information
p4gefau1t committed Mar 4, 2021
1 parent a03c625 commit bdd4c28
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/protocol/direct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub struct DirectUdpStream {
#[async_trait]
impl UdpRead for DirectUdpStream {
async fn read_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, Address)> {
let (size, addr) = self.inner.recv_from(buf).await?;
Ok((size, Address::SocketAddress(addr)))
let (len, addr) = self.inner.recv_from(buf).await?;
Ok((len, Address::SocketAddress(addr)))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/protocol/dokodemo/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub struct DokodemoUdpStream {
#[async_trait]
impl UdpRead for DokodemoUdpStream {
async fn read_from(&mut self, buf: &mut [u8]) -> Result<(usize, Address)> {
let (size, _) = self.inner.recv_from(buf).await?;
Ok((size, self.addr.clone()))
let (len, _) = self.inner.recv_from(buf).await?;
Ok((len, self.addr.clone()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/protocol/mux/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ProxyAcceptor for MuxAcceptor {

async fn accept(&self) -> io::Result<AcceptResult<Self::TS, Self::US>> {
if let Some(result) = self.accept_stream_rx.lock().await.recv().await {
return Ok(result);
Ok(result)
} else {
Err(io::ErrorKind::ConnectionReset.into())
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/mux/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl<T: ProxyConnector> ProxyConnector for MuxConnector<T> {
async fn connect_udp(&self) -> io::Result<Self::US> {
let mut stream = self.spawn_mux_stream().await?;
RequestHeader::new(
Command::TcpConnect,
Command::UdpAssociate,
&Address::DomainNameAddress("UDP_CONN".to_string(), 0),
)
.write_to(&mut stream)
Expand Down
10 changes: 5 additions & 5 deletions src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,9 @@ impl UdpHeader {
R: AsyncRead + Unpin,
{
let address = Address::read_from_stream(stream).await?;
log::debug!("udp addr read: {}", address);
let mut len = [0u8; 2];
stream.read_exact(&mut len).await?;
let len = ((len[0] as u16) << 8) | (len[1] as u16);
let mut len_buf = [0u8; 2];
stream.read_exact(&mut len_buf).await?;
let len = ((len_buf[0] as u16) << 8) | (len_buf[1] as u16);
Ok(Self {
address,
payload_len: len,
Expand All @@ -156,7 +155,7 @@ impl UdpHeader {
W: AsyncWrite + Unpin,
{
self.address.write_to_stream(w).await?;
self.payload_len.to_be_bytes();

w.write(&self.payload_len.to_be_bytes()).await?;
Ok(())
}
Expand Down Expand Up @@ -445,6 +444,7 @@ impl UdpWrite for MuxStreamWriteHalf {
let len = min(buf.len(), MAX_DATA_LEN);
let udp_header = UdpHeader::new(addr, len);
udp_header.write_to(self).await?;
self.write(buf).await?;
Ok(())
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/protocol/trojan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ impl RequestHeader {
R: AsyncRead + Unpin,
{
let mut hash_buf = [0u8; 56];
let size = stream.read(&mut hash_buf).await?;
if size != 56 {
first_packet.extend_from_slice(&hash_buf[..size]);
let len = stream.read(&mut hash_buf).await?;
if len != 56 {
first_packet.extend_from_slice(&hash_buf[..len]);
return Err(new_error("first packet too short"));
}

Expand Down Expand Up @@ -200,7 +200,6 @@ impl TrojanUdpHeader {
W: AsyncWrite + Unpin,
{
self.address.write_to_stream(w).await?;
self.payload_len.to_be_bytes();
w.write(&self.payload_len.to_be_bytes()).await?;
let crlf = b"\r\n";
w.write(crlf).await?;
Expand Down
17 changes: 9 additions & 8 deletions src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ const RELAY_BUFFER_SIZE: usize = 0x4000;
async fn copy_udp<R: UdpRead, W: UdpWrite>(r: &mut R, w: &mut W) -> io::Result<()> {
let mut buf = [0u8; RELAY_BUFFER_SIZE];
loop {
let (size, addr) = r.read_from(&mut buf).await?;
if size == 0 {
let (len, addr) = r.read_from(&mut buf).await?;
log::debug!("udp packet from {} len {}", addr, len);
if len == 0 {
break;
}
w.write_to(&buf[..size], &addr).await?;
w.write_to(&buf[..len], &addr).await?;
}
Ok(())
}
Expand All @@ -56,11 +57,11 @@ async fn copy_tcp<R: AsyncRead + Unpin, W: AsyncWrite + Unpin>(
) -> io::Result<()> {
let mut buf = [0u8; RELAY_BUFFER_SIZE];
loop {
let size = r.read(&mut buf).await?;
if size == 0 {
let len = r.read(&mut buf).await?;
if len == 0 {
break;
}
w.write(&buf[..size]).await?;
w.write(&buf[..len]).await?;
}
Ok(())
}
Expand All @@ -77,8 +78,8 @@ pub async fn relay_udp<T: ProxyUdpStream, U: ProxyUdpStream>(a: T, b: U) {
if let Err(e) = e {
log::debug!("udp session ends: {}", e)
}
let _ = T::reunite(a_rx, a_tx).close();
let _ = U::reunite(b_rx, b_tx).close();
let _ = T::reunite(a_rx, a_tx).close().await;
let _ = U::reunite(b_rx, b_tx).close().await;
}

pub async fn relay_tcp<T: ProxyTcpStream, U: ProxyTcpStream>(a: T, b: U) {
Expand Down

0 comments on commit bdd4c28

Please sign in to comment.