Skip to content

Commit

Permalink
Do not reserve QUIC stream capacity for unstaked client on forward po…
Browse files Browse the repository at this point in the history
  • Loading branch information
pgarg66 authored Jan 17, 2024
1 parent 3fa44e6 commit 9edf65b
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 115 deletions.
35 changes: 13 additions & 22 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
nonblocking::stream_throttle::{
self, ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
},
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
Expand Down Expand Up @@ -90,7 +90,7 @@ pub enum ConnectionPeerType {
}

impl ConnectionPeerType {
fn is_staked(&self) -> bool {
pub(crate) fn is_staked(&self) -> bool {
matches!(self, ConnectionPeerType::Staked(_))
}
}
Expand Down Expand Up @@ -156,7 +156,10 @@ async fn run_server(
let mut last_datapoint = Instant::now();
let unstaked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone()));
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
max_unstaked_connections > 0,
stats.clone(),
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
let (sender, receiver) = async_unbounded();
Expand Down Expand Up @@ -718,25 +721,17 @@ async fn handle_connection(
);
let stable_id = connection.stable_id();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let mut max_streams_per_throttling_interval =
stream_throttle::max_streams_for_connection_in_throttling_duration(
params.peer_type,
params.total_stake,
stream_load_ema.clone(),
);
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(stream) =
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
{
match stream {
Ok(mut stream) => {
if let ConnectionPeerType::Staked(peer_stake) = params.peer_type {
max_streams_per_throttling_interval = stream_load_ema
.available_load_capacity_in_throttling_duration(
peer_stake,
params.total_stake,
);
}
let max_streams_per_throttling_interval = stream_load_ema
.available_load_capacity_in_throttling_duration(
params.peer_type,
params.total_stake,
);

stream_counter.reset_throttling_params_if_needed();
if stream_counter.stream_count.load(Ordering::Relaxed)
Expand All @@ -746,9 +741,7 @@ async fn handle_connection(
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
continue;
}
if params.peer_type.is_staked() {
stream_load_ema.increment_load();
}
stream_load_ema.increment_load(params.peer_type);
stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -797,9 +790,7 @@ async fn handle_connection(
}
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
if params.peer_type.is_staked() {
stream_load_ema.update_ema_if_needed();
}
stream_load_ema.update_ema_if_needed();
});
}
Err(e) => {
Expand Down
Loading

0 comments on commit 9edf65b

Please sign in to comment.