Skip to content

Commit

Permalink
renames Packet Meta::{,set_}addr methods to {,set_}socket_addr (solan…
Browse files Browse the repository at this point in the history
…a-labs#25478)

In order to distinguish between Meta.addr field which is an IpAddr and
the methods which refer to a SocketAddr.
  • Loading branch information
behzadnouri authored May 23, 2022
1 parent bac05dc commit c248fb3
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 49 deletions.
4 changes: 2 additions & 2 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
packet_batch.packets.resize(10, Packet::default());
for w in packet_batch.packets.iter_mut() {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(addr);
w.meta.set_socket_addr(addr);
}
let packet_batch = Arc::new(packet_batch);
spawn(move || loop {
Expand All @@ -34,7 +34,7 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
}
let mut num = 0;
for p in &packet_batch.packets {
let a = p.meta.addr();
let a = p.meta.socket_addr();
assert!(p.meta.size <= PACKET_DATA_SIZE);
send.send_to(&p.data[..p.meta.size], &a).unwrap();
num += 1;
Expand Down
6 changes: 3 additions & 3 deletions core/src/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl AncestorHashesService {
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
blockstore: &Blockstore,
) -> Option<(Slot, DuplicateAncestorDecision)> {
let from_addr = packet.meta.addr();
let from_addr = packet.meta.socket_addr();
limited_deserialize(&packet.data[..packet.meta.size.saturating_sub(SIZE_OF_NONCE)])
.ok()
.and_then(|ancestor_hashes_response| {
Expand Down Expand Up @@ -1117,7 +1117,7 @@ mod test {
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet.packets[0];
packet.meta.set_addr(&responder_info.serve_repair);
packet.meta.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
Expand Down Expand Up @@ -1478,7 +1478,7 @@ mod test {
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
let packet = &mut response_packet.packets[0];
packet.meta.set_addr(&responder_info.serve_repair);
packet.meta.set_socket_addr(&responder_info.serve_repair);
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
Expand Down
6 changes: 4 additions & 2 deletions core/src/find_packet_sender_stake_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ impl FindPacketSenderStakeStage {
.into_par_iter()
.flat_map(|batch| batch.packets.par_iter_mut())
.for_each(|packet| {
packet.meta.sender_stake =
*ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
packet.meta.sender_stake = ip_to_stake
.get(&packet.meta.addr)
.copied()
.unwrap_or_default();
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn repair_response_packet_from_bytes(
if packet.meta.size > packet.data.len() {
return None;
}
packet.meta.set_addr(dest);
packet.meta.set_socket_addr(dest);
packet.data[..bytes.len()].copy_from_slice(&bytes);
let mut wr = io::Cursor::new(&mut packet.data[bytes.len()..]);
bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
Expand Down
2 changes: 1 addition & 1 deletion core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl ServeRepair {
) {
// iter over the packets
packet_batch.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
let from_addr = packet.meta.socket_addr();
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.for_each(|request| {
Expand Down
5 changes: 3 additions & 2 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ where
}
if packet.meta.repair() {
let repair_info = RepairMeta {
_from_addr: packet.meta.addr(),
_from_addr: packet.meta.socket_addr(),
// If can't parse the nonce, dump the packet.
nonce: repair_response::nonce(&packet.data)?,
};
Expand Down Expand Up @@ -408,7 +408,8 @@ where

stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>();
for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) {
*stats.addrs.entry(packet.meta.addr()).or_default() += 1;
let addr = packet.meta.socket_addr();
*stats.addrs.entry(addr).or_default() += 1;
}
stats.elapsed += now.elapsed();
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1901,7 +1901,7 @@ impl ClusterInfo {
}
check
};
// Because pull-responses are sent back to packet.meta.addr() of
// Because pull-responses are sent back to packet.meta.socket_addr() of
// incoming pull-requests, pings are also sent to request.from_addr (as
// opposed to caller.gossip address).
move |request| {
Expand Down Expand Up @@ -2470,7 +2470,7 @@ impl ClusterInfo {
let protocol: Protocol = limited_deserialize(data).ok()?;
protocol.sanitize().ok()?;
let protocol = protocol.par_verify(&self.stats)?;
Some((packet.meta.addr(), protocol))
Some((packet.meta.socket_addr(), protocol))
};
let packets: Vec<_> = {
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
Expand Down Expand Up @@ -3234,7 +3234,7 @@ mod tests {
remote_nodes.into_iter(),
pongs.into_iter()
) {
assert_eq!(packet.meta.addr(), socket);
assert_eq!(packet.meta.socket_addr(), socket);
let bytes = serialize(&pong).unwrap();
match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() {
Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes),
Expand Down
2 changes: 1 addition & 1 deletion perf/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl PacketBatch {

pub fn set_addr(&mut self, addr: &SocketAddr) {
for p in self.packets.iter_mut() {
p.meta.set_addr(addr);
p.meta.set_socket_addr(addr);
}
}

Expand Down
18 changes: 9 additions & 9 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8;
bitflags! {
#[repr(C)]
pub struct PacketFlags: u8 {
const DISCARD = 0b00000001;
const FORWARDED = 0b00000010;
const REPAIR = 0b00000100;
const SIMPLE_VOTE_TX = 0b00001000;
const TRACER_TX = 0b00010000;
const DISCARD = 0b0000_0001;
const FORWARDED = 0b0000_0010;
const REPAIR = 0b0000_0100;
const SIMPLE_VOTE_TX = 0b0000_1000;
const TRACER_TX = 0b0001_0000;
}
}

Expand Down Expand Up @@ -63,7 +63,7 @@ impl Packet {
let len = wr.position() as usize;
packet.meta.size = len;
if let Some(dest) = dest {
packet.meta.set_addr(dest);
packet.meta.set_socket_addr(dest);
}
Ok(())
}
Expand All @@ -75,7 +75,7 @@ impl fmt::Debug for Packet {
f,
"Packet {{ size: {:?}, addr: {:?} }}",
self.meta.size,
self.meta.addr()
self.meta.socket_addr()
)
}
}
Expand All @@ -99,11 +99,11 @@ impl PartialEq for Packet {
}

impl Meta {
pub fn addr(&self) -> SocketAddr {
pub fn socket_addr(&self) -> SocketAddr {
SocketAddr::new(self.addr, self.port)
}

pub fn set_addr(&mut self, socket_addr: &SocketAddr) {
pub fn set_socket_addr(&mut self, socket_addr: &SocketAddr) {
self.addr = socket_addr.ip();
self.port = socket_addr.port();
}
Expand Down
16 changes: 8 additions & 8 deletions streamer/src/nonblocking/recvmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn recv_mmsg(
}
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
p.meta.set_socket_addr(&from);
}
}
i += 1;
Expand Down Expand Up @@ -68,7 +68,7 @@ mod tests {
assert_eq!(sent, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}
}

Expand All @@ -94,7 +94,7 @@ mod tests {
assert_eq!(TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}

packets
Expand All @@ -104,7 +104,7 @@ mod tests {
assert_eq!(sent - TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}
}

Expand Down Expand Up @@ -136,7 +136,7 @@ mod tests {
assert_eq!(TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}

packets
Expand Down Expand Up @@ -175,11 +175,11 @@ mod tests {
assert_eq!(TEST_NUM_MSGS, recv);
for packet in packets.iter().take(sent1) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr1);
assert_eq!(packet.meta.socket_addr(), saddr1);
}
for packet in packets.iter().skip(sent1).take(recv - sent1) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr2);
assert_eq!(packet.meta.socket_addr(), saddr2);
}

packets
Expand All @@ -189,7 +189,7 @@ mod tests {
assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr2);
assert_eq!(packet.meta.socket_addr(), saddr2);
}
}
}
10 changes: 5 additions & 5 deletions streamer/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn send_to(
socket_addr_space: &SocketAddrSpace,
) -> Result<()> {
for p in &batch.packets {
let addr = p.meta.addr();
let addr = p.meta.socket_addr();
if socket_addr_space.check(&addr) {
socket.send_to(&p.data[..p.meta.size], &addr)?;
}
Expand All @@ -92,7 +92,7 @@ mod tests {
let packets = vec![Packet::default()];
let mut packet_batch = PacketBatch::new(packets);
packet_batch.set_addr(&send_addr);
assert_eq!(packet_batch.packets[0].meta.addr(), send_addr);
assert_eq!(packet_batch.packets[0].meta.socket_addr(), send_addr);
}

#[test]
Expand All @@ -107,7 +107,7 @@ mod tests {
batch.packets.resize(10, Packet::default());

for m in batch.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.set_socket_addr(&addr);
m.meta.size = PACKET_DATA_SIZE;
}
send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
Expand All @@ -122,7 +122,7 @@ mod tests {

for m in &batch.packets {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
assert_eq!(m.meta.socket_addr(), saddr);
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ mod tests {
let mut batch = PacketBatch::default();
batch.packets.resize(1, Packet::default());
for m in batch.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.set_socket_addr(&addr);
m.meta.size = 1;
}
send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ fn handle_chunk(
if maybe_batch.is_none() {
let mut batch = PacketBatch::with_capacity(1);
let mut packet = Packet::default();
packet.meta.set_addr(remote_addr);
packet.meta.set_socket_addr(remote_addr);
packet.meta.sender_stake = stake;
batch.packets.push(packet);
*maybe_batch = Some(batch);
Expand Down
18 changes: 9 additions & 9 deletions streamer/src/recvmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num
}
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
p.meta.set_socket_addr(&from);
if i == 0 {
socket.set_nonblocking(true)?;
}
Expand Down Expand Up @@ -107,7 +107,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result</*num p
for (addr, hdr, pkt) in izip!(addrs, hdrs, packets.iter_mut()).take(nrecv) {
pkt.meta.size = hdr.msg_len as usize;
if let Some(addr) = cast_socket_addr(&addr, &hdr) {
pkt.meta.set_addr(&addr.to_std());
pkt.meta.set_socket_addr(&addr.to_std());
}
}
Ok(nrecv)
Expand Down Expand Up @@ -148,7 +148,7 @@ mod tests {
assert_eq!(sent, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}
};

Expand All @@ -174,7 +174,7 @@ mod tests {
assert_eq!(TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}

packets
Expand All @@ -184,7 +184,7 @@ mod tests {
assert_eq!(sent - TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}
};

Expand Down Expand Up @@ -216,7 +216,7 @@ mod tests {
assert_eq!(TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr);
assert_eq!(packet.meta.socket_addr(), saddr);
}
reader.set_nonblocking(true).unwrap();

Expand Down Expand Up @@ -256,11 +256,11 @@ mod tests {
assert_eq!(TEST_NUM_MSGS, recv);
for packet in packets.iter().take(sent1) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr1);
assert_eq!(packet.meta.socket_addr(), saddr1);
}
for packet in packets.iter().skip(sent1).take(recv - sent1) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr2);
assert_eq!(packet.meta.socket_addr(), saddr2);
}

packets
Expand All @@ -270,7 +270,7 @@ mod tests {
assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv);
for packet in packets.iter().take(recv) {
assert_eq!(packet.meta.size, PACKET_DATA_SIZE);
assert_eq!(packet.meta.addr(), saddr2);
assert_eq!(packet.meta.socket_addr(), saddr2);
}
}
}
4 changes: 2 additions & 2 deletions streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ fn recv_send(
packet_batch.packets.iter().for_each(|p| stats.record(p));
}
let packets = packet_batch.packets.iter().filter_map(|pkt| {
let addr = pkt.meta.addr();
let addr = pkt.meta.socket_addr();
socket_addr_space
.check(&addr)
.then(|| (&pkt.data[..pkt.meta.size], addr))
Expand Down Expand Up @@ -480,7 +480,7 @@ mod test {
{
p.data[0] = i as u8;
p.meta.size = PACKET_DATA_SIZE;
p.meta.set_addr(&addr);
p.meta.set_socket_addr(&addr);
}
packet_batch.packets.push(p);
}
Expand Down

0 comments on commit c248fb3

Please sign in to comment.