Skip to content

Commit

Permalink
ref: don't start listener for crawler
Browse files Browse the repository at this point in the history
  • Loading branch information
niklaslong committed Sep 2, 2021
1 parent 9d9a20e commit 91f05d4
Show file tree
Hide file tree
Showing 16 changed files with 49 additions and 72 deletions.
4 changes: 0 additions & 4 deletions bin/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ async fn start_server(config: Config) -> anyhow::Result<()> {
// Initialize metrics framework.
node.initialize_metrics().await?;

// Start listening for incoming connections.
// TODO: remove this; a crawler doesn't need to have a listener.
node.listen().await?;

// Start RPC thread, if the RPC configuration is enabled.
if config.rpc.json_rpc {
let rpc_address = format!("{}:{}", config.rpc.ip, config.rpc.port)
Expand Down
1 change: 0 additions & 1 deletion network/src/inbound/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ impl Node {
let listener = TcpListener::bind(&self.config.desired_address).await?;
let own_listener_address = listener.local_addr()?;

self.set_local_address(own_listener_address);
info!("Initializing listener for node ({:x})", self.id);

let node_clone = self.clone();
Expand Down
22 changes: 5 additions & 17 deletions network/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct InnerNode {
/// The current state of the node.
state: StateCode,
/// The local address of this node.
pub local_address: OnceCell<SocketAddr>,
pub local_address: SocketAddr,
/// The pre-configured parameters of this node.
pub config: Config,
/// The cache of node's inbound messages.
Expand Down Expand Up @@ -122,7 +122,7 @@ impl Node {
let node = Self(Arc::new(InnerNode {
id: thread_rng().gen(),
state: Default::default(),
local_address: Default::default(),
local_address: config.desired_address,
config,
storage,
inbound_cache: Default::default(),
Expand Down Expand Up @@ -318,23 +318,15 @@ impl Node {
}

#[inline]
pub fn local_address(&self) -> Option<SocketAddr> {
self.local_address.get().copied()
pub fn local_address(&self) -> SocketAddr {
self.local_address
}

#[inline]
pub fn is_shutting_down(&self) -> bool {
self.shutting_down.load(Ordering::Relaxed)
}

/// Sets the local address of the node to the given value.
#[inline]
pub fn set_local_address(&self, addr: SocketAddr) {
self.local_address
.set(addr)
.expect("local address was set more than once!");
}

pub async fn initialize_metrics(&self) -> Result<()> {
debug!("Initializing metrics");
if let Some(metrics_task) = snarkos_metrics::initialize() {
Expand All @@ -353,11 +345,7 @@ impl Node {
}

pub fn version(&self) -> Version {
Version::new(
crate::PROTOCOL_VERSION,
self.local_address().map(|x| x.port()).unwrap_or_default(),
self.id,
)
Version::new(crate::PROTOCOL_VERSION, self.local_address().port(), self.id)
}

pub async fn run_sync(&self) -> Result<()> {
Expand Down
8 changes: 4 additions & 4 deletions network/src/peers/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Node {
debug!("Connecting to {}...", remote_address);

// Local address must be known by now.
let own_address = self.local_address().unwrap();
let own_address = self.local_address();

// Don't connect if maximum number of connections has been reached.
if !self.can_connect() {
Expand Down Expand Up @@ -178,7 +178,7 @@ impl Node {
///
pub async fn connect_to_addresses(&self, addrs: &[SocketAddr]) {
// Local address must be known by now.
let own_address = self.local_address().unwrap();
let own_address = self.local_address();

for node_addr in addrs
.iter()
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Node {
///
async fn connect_to_disconnected_peers(&self, count: usize) {
// Local address must be known by now.
let own_address = self.local_address().unwrap();
let own_address = self.local_address();

// If this node is not a bootnode, attempt to satisfy the minimum number of peer connections.
let random_peers = {
Expand Down Expand Up @@ -366,7 +366,7 @@ impl Node {
/// Add all new/updated addresses to our disconnected.
/// The connection handler will be responsible for sending out handshake requests to them.
pub(crate) async fn process_inbound_peers(&self, source: SocketAddr, peers: Vec<SocketAddr>) {
let local_address = self.local_address().unwrap(); // the address must be known by now
let local_address = self.local_address(); // the address must be known by now

for peer_address in peers.iter().filter(|&peer_addr| *peer_addr != local_address) {
// Inform the peer book that we found a peer.
Expand Down
2 changes: 1 addition & 1 deletion network/src/sync/memory_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Node {
) {
debug!("Propagating a memory pool transaction to connected peers");

let local_address = self.local_address().unwrap();
let local_address = self.local_address();

for remote_address in self.connected_peers() {
if remote_address != transaction_sender && remote_address != local_address {
Expand Down
2 changes: 1 addition & 1 deletion network/src/sync/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl MinerInstance {
/// Calling this function multiple times will spawn additional listeners on separate threads.
pub fn spawn(self) -> task::JoinHandle<()> {
task::spawn(async move {
let local_address = self.node.local_address().unwrap();
let local_address = self.node.local_address();

info!("Initializing Aleo miner - Your miner address is {}", self.miner_address);

Expand Down
6 changes: 3 additions & 3 deletions network/tests/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn check_connection_task_cleanup() {
// Breach the usual ulimit barriers.
for _ in 0..10_000 {
// Connect a peer.
let peer = handshaken_peer(node.local_address().unwrap()).await;
let peer = handshaken_peer(node.local_address()).await;
wait_until!(5, node.peer_book.get_active_peer_count() == 1);

// Drop the peer stream.
Expand All @@ -59,7 +59,7 @@ async fn check_inactive_conn_cleanup() {
let node = test_node(setup).await;

// A connection with a peer that will remain inactive.
let _peer = handshaken_peer(node.local_address().unwrap()).await;
let _peer = handshaken_peer(node.local_address()).await;

// Wait until the connection is complete.
wait_until!(1, node.peer_book.get_active_peer_count() == 1);
Expand Down Expand Up @@ -106,7 +106,7 @@ async fn check_node_connections_cleanup() {

for i in 0..NUM_CONNS {
// Connect a peer.
let peer = handshaken_peer(node.local_address().unwrap()).await;
let peer = handshaken_peer(node.local_address()).await;
let addr = peer.addr;
wait_until!(5, node.peer_book.get_active_peer_count() == 1);

Expand Down
16 changes: 8 additions & 8 deletions network/tests/fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn fuzzing_zeroes_pre_handshake() {
..Default::default()
};
let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

let mut stream = TcpStream::connect(node_addr).await.unwrap();
wait_until!(1, node.peer_book.get_active_peer_count() == 1);
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn fuzzing_valid_header_pre_handshake() {
..Default::default()
};
let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

for _ in 0..ITERATIONS {
let random_len: usize = thread_rng().gen_range(1..(64 * 1024));
Expand Down Expand Up @@ -144,7 +144,7 @@ async fn fuzzing_pre_handshake() {
..Default::default()
};
let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

for _ in 0..ITERATIONS {
let random_len: usize = thread_rng().gen_range(1..(64 * 1024));
Expand Down Expand Up @@ -195,7 +195,7 @@ async fn fuzzing_corrupted_version_pre_handshake() {
};

let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

for i in 0..ITERATIONS {
let mut stream = TcpStream::connect(node_addr).await.unwrap();
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn fuzzing_corrupted_empty_payloads_pre_handshake() {
};

let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

for payload in &[Payload::GetMemoryPool, Payload::GetPeers, Payload::Pong] {
let serialized = Payload::serialize(payload).unwrap();
Expand Down Expand Up @@ -329,7 +329,7 @@ async fn fuzzing_corrupted_payloads_with_bodies_pre_handshake() {
};

let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

let mut rng = thread_rng();
let random_len: usize = rng.gen_range(1..(64 * 1024));
Expand Down Expand Up @@ -448,7 +448,7 @@ async fn fuzzing_corrupted_payloads_with_hashes_pre_handshake() {
};

let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

let hashes: Vec<BlockHeaderHash> = (0u8..10).map(|i| BlockHeaderHash::new(vec![i; 32])).collect();

Expand Down Expand Up @@ -527,7 +527,7 @@ async fn connection_request_spam() {
};

let node = test_node(node_setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

let sockets = Arc::new(Mutex::new(Vec::with_capacity(NUM_ATTEMPTS)));

Expand Down
24 changes: 12 additions & 12 deletions network/tests/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn handshake_responder_side() {
..Default::default()
};
let node = test_node(setup).await;
let node_listener = node.local_address().unwrap();
let node_listener = node.local_address();

// set up a fake node (peer), which is just a socket
let mut peer_stream = TcpStream::connect(&node_listener).await.unwrap();
Expand Down Expand Up @@ -169,7 +169,7 @@ async fn reject_non_version_messages_before_handshake() {

// start the fake node (peer) which is just a socket
// note: the connection needs to be re-established as it is reset
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();

// send a GetPeers message without a prior handshake established
write_message_to_stream(Payload::GetPeers, &mut peer_stream).await;
Expand All @@ -179,56 +179,56 @@ async fn reject_non_version_messages_before_handshake() {
assert_node_rejected_message(&node, &mut peer_stream).await;

// GetMemoryPool
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
write_message_to_stream(Payload::GetMemoryPool, &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// GetBlock
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let block_hash = BlockHeaderHash::new([0u8; 32].to_vec());
write_message_to_stream(Payload::GetBlocks(vec![block_hash]), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// GetSync
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let block_hash = BlockHeaderHash::new([0u8; 32].to_vec());
write_message_to_stream(Payload::GetSync(vec![block_hash]), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// Peers
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let peers = vec!["127.0.0.1:0".parse().unwrap()];
write_message_to_stream(Payload::Peers(peers), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// MemoryPool
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let memory_pool = vec![vec![0u8, 10]];
write_message_to_stream(Payload::MemoryPool(memory_pool), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// Block
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let block = vec![0u8, 10];
let height = None;
write_message_to_stream(Payload::Block(block, height), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// SyncBlock
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let sync_block = vec![0u8, 10];
let height = Some(1);
write_message_to_stream(Payload::SyncBlock(sync_block, height), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// Sync
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let block_hash = BlockHeaderHash::new(vec![0u8; 32]);
write_message_to_stream(Payload::Sync(vec![block_hash]), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;

// Transaction
let mut peer_stream = TcpStream::connect(node.local_address().unwrap()).await.unwrap();
let mut peer_stream = TcpStream::connect(node.local_address()).await.unwrap();
let transaction = vec![0u8, 10];
write_message_to_stream(Payload::Transaction(transaction), &mut peer_stream).await;
assert_node_rejected_message(&node, &mut peer_stream).await;
Expand Down Expand Up @@ -276,7 +276,7 @@ async fn handshake_timeout_responder_side() {
..Default::default()
};
let node = test_node(setup).await;
let node_addr = node.local_address().unwrap();
let node_addr = node.local_address();

// set up a "peer" that won't perform a valid handshake
let _fake_peer = TcpStream::connect(node_addr).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion network/tests/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn bootnode_peer_propagation() {

// Spin up and connect nodes A and B.
let node_alice = test_node(setup(true, vec![])).await;
let addr_alice = node_alice.local_address().unwrap();
let addr_alice = node_alice.local_address();

// Connect B to A.
let node_bob = test_node(setup(false, vec![addr_alice.to_string()])).await;
Expand Down
4 changes: 2 additions & 2 deletions network/tests/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ async fn binary_star_contact() {
let bootnode_a = test_node(bootnode_setup.clone()).await;
let bootnode_b = test_node(bootnode_setup).await;

let ba = bootnode_a.local_address().unwrap().to_string();
let bb = bootnode_b.local_address().unwrap().to_string();
let ba = bootnode_a.local_address().to_string();
let bb = bootnode_b.local_address().to_string();

// Create the nodes to be used as the leafs in the stars.
let setup = TestSetup {
Expand Down
10 changes: 4 additions & 6 deletions rpc/tests/protected_rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,16 +465,14 @@ mod protected_rpc_tests {
};
let some_node = test_node(setup).await;

some_node
.connect_to_addresses(&[rpc_node.local_address().unwrap()])
.await;
some_node.connect_to_addresses(&[rpc_node.local_address()]).await;

wait_until!(3, rpc_node.peer_book.get_connected_peer_count() == 1);

let meta = authentication();
let request = format!(
"{{ \"jsonrpc\":\"2.0\", \"id\": 1, \"method\": \"disconnect\", \"params\": [\"{}\"] }}",
some_node.local_address().unwrap()
some_node.local_address()
);
let _response = rpc.handle_request(&request, meta).await.unwrap();

Expand All @@ -497,8 +495,8 @@ mod protected_rpc_tests {
let meta = authentication();
let request = format!(
"{{ \"jsonrpc\":\"2.0\", \"id\": 1, \"method\": \"connect\", \"params\": [\"{}\", \"{}\"] }}",
some_node1.local_address().unwrap(),
some_node2.local_address().unwrap()
some_node1.local_address(),
some_node2.local_address()
);
let _response = rpc.handle_request(&request, meta).await.unwrap();

Expand Down
8 changes: 2 additions & 6 deletions rpc/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,12 +377,8 @@ mod rpc_tests {
let some_node1 = test_node(setup.clone()).await;
let some_node2 = test_node(setup).await;

rpc_node
.connect_to_addresses(&[some_node1.local_address().unwrap()])
.await;
some_node1
.connect_to_addresses(&[some_node2.local_address().unwrap()])
.await;
rpc_node.connect_to_addresses(&[some_node1.local_address()]).await;
some_node1.connect_to_addresses(&[some_node2.local_address()]).await;

wait_until!(3, rpc_node.peer_book.get_connected_peer_count() == 1);
wait_until!(3, some_node1.peer_book.get_connected_peer_count() == 2);
Expand Down
2 changes: 1 addition & 1 deletion testing/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ pub async fn handshaken_peer(node_listener: SocketAddr) -> FakeNode {
pub async fn handshaken_node_and_peer(node_setup: TestSetup) -> (Node, FakeNode) {
// start a test node and listen for incoming connections
let node = test_node(node_setup).await;
let node_listener = node.local_address().unwrap();
let node_listener = node.local_address();
let fake_node = handshaken_peer(node_listener).await;

(node, fake_node)
Expand Down
Loading

0 comments on commit 91f05d4

Please sign in to comment.