Skip to content

Commit

Permalink
quic test timeout fix (solana-labs#29260)
Browse files Browse the repository at this point in the history
Allow longer chunk receive timeout without impacting testing the stream exit condition for unit tests. The exit is periodically checked, we will break only when the total allowed chunk receive timed out. The start time is reset when a new chunk is received.
  • Loading branch information
lijunwangs authored Dec 16, 2022
1 parent 8ef900b commit d1cf4ce
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,17 @@ async fn handle_connection(
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
// The min is to guard against a value too small which can wake up unnecessarily
// frequently and wasting CPU cycles. The max guard against waiting for too long
// which delay exit and cause some test failures when the timeout value is large.
// Within this value, the heuristic is to wake up 10 times to check for exit
// for the set timeout if there are no data.
let exit_check_interval =
(wait_for_chunk_timeout_ms / 10).clamp(10, 1000);
let mut start = Instant::now();
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(wait_for_chunk_timeout_ms),
Duration::from_millis(exit_check_interval),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
Expand All @@ -585,12 +593,16 @@ async fn handle_connection(
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
start = Instant::now();
} else {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
let elapse = Instant::now() - start;
if elapse.as_millis() as u64 > wait_for_chunk_timeout_ms {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
}
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1020,7 +1032,7 @@ pub mod test {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
1000,
2000,
)
.unwrap();
(t, exit, receiver, server_address, stats)
Expand Down

0 comments on commit d1cf4ce

Please sign in to comment.