Skip to content

Commit

Permalink
Don't iterate over peers in generic_proto::behaviour::poll (paritytec…
Browse files Browse the repository at this point in the history
…h#6142)

* Don't iterate over peers in generic_proto::behaviour::poll

* Improve comment

* Rework to use DelayIds
  • Loading branch information
tomaka authored May 26, 2020
1 parent bda3b40 commit b3adec2
Showing 1 changed file with 62 additions and 21 deletions.
83 changes: 62 additions & 21 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ pub struct GenericProto {
/// List of peers in our state.
peers: FnvHashMap<PeerId, PeerState>,

/// The elements in `peers` occasionally contain `Delay` objects that we would normally have
/// to be polled one by one. In order to avoid doing so, as an optimization, every `Delay` is
/// instead put inside of `delays` and reference by a [`DelayId`]. This stream
/// yields `PeerId`s whose `DelayId` is potentially ready.
///
/// By design, we never remove elements from this list. Elements are removed only when the
/// `Delay` triggers. As such, this stream may produce obsolete elements.
delays: stream::FuturesUnordered<Pin<Box<dyn Future<Output = (DelayId, PeerId)> + Send>>>,

/// [`DelayId`] to assign to the next delay.
next_delay_id: DelayId,

/// List of incoming messages we have sent to the peer set manager and that are waiting for an
/// answer.
incoming: SmallVec<[IncomingPeer; 6]>,
Expand All @@ -141,6 +153,10 @@ pub struct GenericProto {
queue_size_report: Option<HistogramVec>,
}

/// Identifier for a delay firing.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);

/// State of a peer we're connected to.
#[derive(Debug)]
enum PeerState {
Expand All @@ -158,8 +174,8 @@ enum PeerState {

/// The peerset requested that we connect to this peer. We are currently not connected.
PendingRequest {
/// When to actually start dialing.
timer: futures_timer::Delay,
/// When to actually start dialing. References an entry in `delays`.
timer: DelayId,
/// When the `timer` will trigger.
timer_deadline: Instant,
},
Expand All @@ -183,8 +199,8 @@ enum PeerState {
DisabledPendingEnable {
/// The connections that are currently open for custom protocol traffic.
open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>,
/// When to enable this remote.
timer: futures_timer::Delay,
/// When to enable this remote. References an entry in `delays`.
timer: DelayId,
/// When the `timer` will trigger.
timer_deadline: Instant,
},
Expand Down Expand Up @@ -338,6 +354,8 @@ impl GenericProto {
notif_protocols: Vec::new(),
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
next_delay_id: DelayId(0),
incoming: SmallVec::new(),
next_incoming_index: sc_peerset::IncomingIndex(0),
events: VecDeque::new(),
Expand Down Expand Up @@ -627,10 +645,20 @@ impl GenericProto {

match mem::replace(occ_entry.get_mut(), PeerState::Poisoned) {
PeerState::Banned { ref until } if *until > now => {
let peer_id = occ_entry.key().clone();
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \
until {:?}", occ_entry.key(), until);
until {:?}", peer_id, until);

let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(*until - now);
self.delays.push(async move {
delay.await;
(delay_id, peer_id)
}.boxed());

*occ_entry.into_mut() = PeerState::PendingRequest {
timer: futures_timer::Delay::new(*until - now),
timer: delay_id,
timer_deadline: *until,
};
},
Expand All @@ -649,11 +677,21 @@ impl GenericProto {
open,
banned_until: Some(ref banned)
} if *banned > now => {
let peer_id = occ_entry.key().clone();
debug!(target: "sub-libp2p", "PSM => Connect({:?}): But peer is banned until {:?}",
occ_entry.key(), banned);
peer_id, banned);

let delay_id = self.next_delay_id;
self.next_delay_id.0 += 1;
let delay = futures_timer::Delay::new(*banned - now);
self.delays.push(async move {
delay.await;
(delay_id, peer_id)
}.boxed());

*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
open,
timer: futures_timer::Delay::new(*banned - now),
timer: delay_id,
timer_deadline: *banned,
};
},
Expand Down Expand Up @@ -1363,34 +1401,37 @@ impl NetworkBehaviour for GenericProto {
}
}

for (peer_id, peer_state) in self.peers.iter_mut() {
match peer_state {
PeerState::PendingRequest { timer, .. } => {
if let Poll::Pending = Pin::new(timer).poll(cx) {
continue;
}
while let Poll::Ready(Some((delay_id, peer_id))) =
Pin::new(&mut self.delays).poll_next(cx) {
let peer_state = match self.peers.get_mut(&peer_id) {
Some(s) => s,
// We intentionally never remove elements from `delays`, and it may
// thus contain peers which are now gone. This is a normal situation.
None => continue,
};

match peer_state {
PeerState::PendingRequest { timer, .. } if *timer == delay_id => {
debug!(target: "sub-libp2p", "Libp2p <= Dial {:?} now that ban has expired", peer_id);
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
peer_id,
condition: DialPeerCondition::Disconnected
});
*peer_state = PeerState::Requested;
}

PeerState::DisabledPendingEnable { timer, open, .. } => {
if let Poll::Pending = Pin::new(timer).poll(cx) {
continue;
}

PeerState::DisabledPendingEnable { timer, open, .. } if *timer == delay_id => {
debug!(target: "sub-libp2p", "Handler({:?}) <= Enable (ban expired)", peer_id);
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
peer_id,
handler: NotifyHandler::All,
event: NotifsHandlerIn::Enable,
});
*peer_state = PeerState::Enabled { open: mem::replace(open, Default::default()) };
}

// We intentionally never remove elements from `delays`, and it may
// thus contain obsolete entries. This is a normal situation.
_ => {},
}
}
Expand Down

0 comments on commit b3adec2

Please sign in to comment.