Skip to content

Commit

Permalink
network: convert worker-to-primary interface to quic/anemo (MystenLab…
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored Sep 13, 2022
1 parent acb6c86 commit daf89be
Show file tree
Hide file tree
Showing 27 changed files with 397 additions and 558 deletions.
22 changes: 6 additions & 16 deletions narwhal/benchmark/benchmark/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ class Committee:
"authorities: {
"name": {
"stake": 1,
"primary: {
"primary_to_primary": x.x.x.x:x,
"worker_to_primary": x.x.x.x:x,
},
"primary_address": x.x.x.x:x,
"network_key: NETWORK_KEY==
},
...
Expand Down Expand Up @@ -183,15 +180,12 @@ def __init__(self, addresses, base_port):
self.json = {'authorities': OrderedDict(), 'epoch': 0}
for name, (network_name, hosts) in addresses.items():
host = hosts.pop(0)
primary_addr = {
'primary_to_primary': f'/ip4/{host}/tcp/{port}/http',
'worker_to_primary': f'/ip4/{host}/tcp/{port + 1}/http'
}
port += 2
primary_addr = f'/ip4/{host}/tcp/{port}/http'
port += 1

self.json['authorities'][name] = {
'stake': 1,
'primary': primary_addr,
'primary_address': primary_addr,
'network_key': network_name
}

Expand All @@ -201,8 +195,7 @@ def primary_addresses(self, faults=0):
addresses = []
good_nodes = self.size() - faults
for authority in list(self.json['authorities'].values())[:good_nodes]:
addresses += [multiaddr_to_url_data(
authority['primary']['primary_to_primary'])]
addresses += [multiaddr_to_url_data(authority['primary_address'])]
return addresses

def ips(self, name=None):
Expand All @@ -214,10 +207,7 @@ def ips(self, name=None):

ips = set()
for name in names:
addresses = self.json['authorities'][name]['primary']
ips.add(self.ip(addresses['primary_to_primary']))
ips.add(self.ip(addresses['worker_to_primary']))

ips.add(self.ip(self.json['authorities'][name]['primary_address']))
return ips

def remove_nodes(self, nodes):
Expand Down
37 changes: 13 additions & 24 deletions narwhal/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,27 +468,19 @@ impl WorkerCache {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PrimaryAddresses {
/// Address to receive messages from other primaries (WAN).
pub primary_to_primary: Multiaddr,
/// Address to receive messages from our workers (LAN).
pub worker_to_primary: Multiaddr,
}

#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct Authority {
/// The voting power of this authority.
pub stake: Stake,
/// The network addresses of the primary.
pub primary: PrimaryAddresses,
/// The network address of the primary.
pub primary_address: Multiaddr,
/// Network key of the primary.
pub network_key: NetworkPublicKey,
}

pub type SharedCommittee = Arc<ArcSwap<Committee>>;

#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct Committee {
/// The authorities of epoch.
pub authorities: BTreeMap<PublicKey, Authority>,
Expand Down Expand Up @@ -570,11 +562,11 @@ impl Committee {
.clone()
}

/// Returns the primary addresses of the target primary.
pub fn primary(&self, to: &PublicKey) -> Result<PrimaryAddresses, ConfigError> {
/// Returns the primary address of the target primary.
pub fn primary(&self, to: &PublicKey) -> Result<Multiaddr, ConfigError> {
self.authorities
.get(&to.clone())
.map(|x| x.primary.clone())
.map(|x| x.primary_address.clone())
.ok_or_else(|| ConfigError::NotInCommittee((*to).encode_base64()))
}

Expand All @@ -589,14 +581,14 @@ impl Committee {
pub fn others_primaries(
&self,
myself: &PublicKey,
) -> Vec<(PublicKey, PrimaryAddresses, NetworkPublicKey)> {
) -> Vec<(PublicKey, Multiaddr, NetworkPublicKey)> {
self.authorities
.iter()
.filter(|(name, _)| *name != myself)
.map(|(name, authority)| {
(
name.clone(),
authority.primary.clone(),
authority.primary_address.clone(),
authority.network_key.clone(),
)
})
Expand All @@ -606,10 +598,7 @@ impl Committee {
fn get_all_network_addresses(&self) -> HashSet<&Multiaddr> {
self.authorities
.values()
.flat_map(|authority| {
std::iter::once(&authority.primary.primary_to_primary)
.chain(std::iter::once(&authority.primary.worker_to_primary))
})
.map(|authority| &authority.primary_address)
.collect()
}

Expand All @@ -627,7 +616,7 @@ impl Committee {
/// will generate no update and return a vector of errors.
pub fn update_primary_network_info(
&mut self,
mut new_info: BTreeMap<PublicKey, (Stake, PrimaryAddresses)>,
mut new_info: BTreeMap<PublicKey, (Stake, Multiaddr)>,
) -> Result<(), Vec<CommitteeUpdateError>> {
let mut errors = None;

Expand All @@ -645,13 +634,13 @@ impl Committee {
let res = table
.iter()
.fold(Ok(BTreeMap::new()), |acc, (pk, authority)| {
if let Some((stake, addresses)) = new_info.remove(pk) {
if let Some((stake, address)) = new_info.remove(pk) {
if stake == authority.stake {
match acc {
// No error met yet, update the accumulator
Ok(mut bmap) => {
let mut res = authority.clone();
res.primary = addresses;
res.primary_address = address;
bmap.insert(pk.clone(), res);
Ok(bmap)
}
Expand Down
20 changes: 9 additions & 11 deletions narwhal/config/tests/config_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
// 1. Run `cargo insta test --review` under `./config`.
// 2. Review, accept or reject changes.

use config::{
ConsensusAPIGrpcParameters, Import, Parameters, PrimaryAddresses, PrometheusMetricsParameters,
Stake,
};
use config::{ConsensusAPIGrpcParameters, Import, Parameters, PrometheusMetricsParameters, Stake};
use crypto::PublicKey;
use insta::assert_json_snapshot;
use multiaddr::Multiaddr;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use std::collections::{BTreeMap, HashMap};
use std::{fs::File, io::Write};
Expand Down Expand Up @@ -71,8 +69,8 @@ fn update_primary_network_info_test() {
let invalid_new_info = committee2
.authorities
.iter()
.map(|(pk, a)| (pk.clone(), (a.stake, a.primary.clone())))
.collect::<BTreeMap<_, (Stake, PrimaryAddresses)>>();
.map(|(pk, a)| (pk.clone(), (a.stake, a.primary_address.clone())))
.collect::<BTreeMap<_, (Stake, Multiaddr)>>();
let res2 = committee
.clone()
.update_primary_network_info(invalid_new_info)
Expand All @@ -90,8 +88,8 @@ fn update_primary_network_info_test() {
.authorities
.iter()
// change the stake
.map(|(pk, a)| (pk.clone(), (a.stake + 1, a.primary.clone())))
.collect::<BTreeMap<_, (Stake, PrimaryAddresses)>>();
.map(|(pk, a)| (pk.clone(), (a.stake + 1, a.primary_address.clone())))
.collect::<BTreeMap<_, (Stake, Multiaddr)>>();
let res2 = committee
.clone()
.update_primary_network_info(invalid_new_info)
Expand All @@ -109,7 +107,7 @@ fn update_primary_network_info_test() {

committee4.authorities.iter().for_each(|(pk, a)| {
pk_n_stake.push((pk.clone(), a.stake));
addresses.push(a.primary.clone())
addresses.push(a.primary_address.clone())
});

let mut rng = rand::thread_rng();
Expand All @@ -119,13 +117,13 @@ fn update_primary_network_info_test() {
.into_iter()
.zip(addresses)
.map(|((pk, stk), addr)| (pk, (stk, addr)))
.collect::<BTreeMap<PublicKey, (Stake, PrimaryAddresses)>>();
.collect::<BTreeMap<PublicKey, (Stake, Multiaddr)>>();

let mut comm = committee;
let res = comm.update_primary_network_info(new_info.clone());
assert!(res.is_ok());
for (pk, a) in comm.authorities.iter() {
assert_eq!(a.primary, new_info.get(pk).unwrap().1);
assert_eq!(a.primary_address, new_info.get(pk).unwrap().1);
}
}

Expand Down
20 changes: 4 additions & 16 deletions narwhal/config/tests/snapshots/config_tests__committee.snap
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,22 @@ expression: committee
"authorities": {
"i9DpjC/kZbDd57csSg7qWFMF3uTECO2jBD3M5BOXU8T7AebTrMO+9D5IbnUI8N7lFhmjEiKFhwUOlFGbx1jkdP212PR/XvnayHxDaJuMkytxACBnshi9TrbZiZP10tqL": {
"stake": 1,
"primary": {
"primary_to_primary": "/ip4/127.0.0.1/tcp/0/http",
"worker_to_primary": "/ip4/127.0.0.1/tcp/0/http"
},
"primary_address": "/ip4/127.0.0.1/tcp/0/http",
"network_key": "+oKq7VFT0pAQvjlvEti2YUw5fHXEYT/bs21P9wTlAKs="
},
"jGbcLB6p5T8JhcF7TnrxmRK208QODFkgpaElCbTrNhn14H7Fbqd/CzBim6HMctdbE5RgeCpfDi+J+0xCtLil+uPSYBAiIOY9B1Tn4YRt7v05iOreTtN/E4VDfRneGhYY": {
"stake": 1,
"primary": {
"primary_to_primary": "/ip4/127.0.0.1/tcp/0/http",
"worker_to_primary": "/ip4/127.0.0.1/tcp/0/http"
},
"primary_address": "/ip4/127.0.0.1/tcp/0/http",
"network_key": "rvP0pLjsod/DQzYb+OQ2vULeklnAS4MU644gVN1ugqs="
},
"ld89LijKV9Ra6U/hkj6PnWKgcDbW4IgHAy1sHOLjHNvxLDoMQYJhDA5yb3+rOIfRBm91BOK0aSlGEtfdAZOB5af1kfxciSrnzDAHDKw1eCMo92nMEHWj5bpJbSlnySf7": {
"stake": 1,
"primary": {
"primary_to_primary": "/ip4/127.0.0.1/tcp/0/http",
"worker_to_primary": "/ip4/127.0.0.1/tcp/0/http"
},
"primary_address": "/ip4/127.0.0.1/tcp/0/http",
"network_key": "r7SQXpVQz0YqggB4JKBV2XWuk0RwxHfZ3bttKyZMNvo="
},
"teRXk6Iq1Pod0UgOcrvBrvVvs8ZpM0bUgbZNL7YpH4n06TvGuiaId9SfpuYe+JOOF4rXiNzQv7CW6npbn3SVZMv9NzwYJcz5RL6F3gAMpNx+o9YM5+gqNtn/OGArUTCC": {
"stake": 1,
"primary": {
"primary_to_primary": "/ip4/127.0.0.1/tcp/0/http",
"worker_to_primary": "/ip4/127.0.0.1/tcp/0/http"
},
"primary_address": "/ip4/127.0.0.1/tcp/0/http",
"network_key": "Kt9mRluFkBunwfq2VREQbBXuSYGOsFo95bA/PIrvVhc="
}
},
Expand Down
Loading

0 comments on commit daf89be

Please sign in to comment.