Skip to content

Commit

Permalink
Align the FastNFT model with objects rather than accounts, and abstra…
Browse files Browse the repository at this point in the history
…ct fastpay specific details (MystenLabs#3)

This PR moves the fastpay logic towards the fastX logic as discussed in the spec doc (https://www.overleaf.com/7111272316rvsqhrmycmwn):

* Rename account to object
* Remove balance
* Repair or silence tests
* Remove cross shard logic
* Remove more of the received infrastructure
* Added object IDs
* Encapsulate the accounts structure in authority
* Added object id and removed amount from transfers. Fixed tests.
* Nix transfer orders, not applicable to fastNFT
* Use object ID to index object state. Tests fixed, but two failing.
* Implemented signature checks and ownership transfer
* Use a address_to_object_id_hack function (to refactor)

Co-authored-by: George Danezis <[email protected]>
Co-authored-by: François Garillot <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2021
1 parent e170b6f commit 9e91225
Show file tree
Hide file tree
Showing 20 changed files with 280 additions and 593 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Rust build directory
/target
.pre-commit*

# Move build directory
build
Expand Down
23 changes: 10 additions & 13 deletions fastpay/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ struct ClientServerBenchmark {
/// Maximum size of datagrams received and sent (bytes)
#[structopt(long, default_value = transport::DEFAULT_MAX_DATAGRAM_SIZE)]
buffer_size: usize,
/// Number of cross shards messages allowed before blocking the main server loop
#[structopt(long, default_value = "1")]
cross_shard_queue_size: usize,
}

fn main() {
Expand Down Expand Up @@ -125,17 +122,18 @@ impl ClientServerBenchmark {
let mut account_keys = Vec::new();
for _ in 0..self.num_accounts {
let keypair = get_key_pair();
let i = AuthorityState::get_shard(self.num_shards, &keypair.0) as usize;
assert!(states[i].in_shard(&keypair.0));
let client = AccountOffchainState {
balance: Balance::from(Amount::from(100)),
let object_id: ObjectID = address_to_object_id_hack(keypair.0);
let i = AuthorityState::get_shard(self.num_shards, &object_id) as usize;
assert!(states[i].in_shard(&object_id));
let client = ObjectState {
id: object_id,
contents: Vec::new(),
owner: keypair.0,
next_sequence_number: SequenceNumber::from(0),
pending_confirmation: None,
confirmed_log: Vec::new(),
synchronization_log: Vec::new(),
received_log: Vec::new(),
};
states[i].accounts.insert(keypair.0, client);
states[i].insert_object(client);
account_keys.push(keypair);
}

Expand All @@ -145,15 +143,15 @@ impl ClientServerBenchmark {
let mut next_recipient = get_key_pair().0;
for (pubx, secx) in account_keys.iter() {
let transfer = Transfer {
object_id: address_to_object_id_hack(*pubx),
sender: *pubx,
recipient: Address::FastPay(next_recipient),
amount: Amount::from(50),
sequence_number: SequenceNumber::from(0),
user_data: UserData::default(),
};
next_recipient = *pubx;
let order = TransferOrder::new(transfer.clone(), secx);
let shard = AuthorityState::get_shard(self.num_shards, pubx);
let shard = AuthorityState::get_shard(self.num_shards, &order.transfer.object_id);

// Serialize order
let bufx = serialize_transfer_order(&order);
Expand Down Expand Up @@ -187,7 +185,6 @@ impl ClientServerBenchmark {
self.port,
state,
self.buffer_size,
self.cross_shard_queue_size,
);
server.spawn().await.unwrap()
}
Expand Down
8 changes: 5 additions & 3 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ fn make_benchmark_transfer_orders(
let mut next_recipient = get_key_pair().0;
for account in accounts_config.accounts_mut() {
let transfer = Transfer {
object_id: address_to_object_id_hack(account.address),
sender: account.address,
recipient: Address::FastPay(next_recipient),
amount: Amount::from(1),
sequence_number: account.next_sequence_number,
user_data: UserData::default(),
};
Expand Down Expand Up @@ -220,7 +220,9 @@ async fn mass_broadcast_orders(
// Re-index orders by shard for this particular authority client.
let mut sharded_requests = HashMap::new();
for (address, buf) in &orders {
let shard = AuthorityState::get_shard(num_shards, address);
// TODO: fix this
let id: ObjectID = address_to_object_id_hack(*address);
let shard = AuthorityState::get_shard(num_shards, &id);
sharded_requests
.entry(shard)
.or_insert_with(Vec::new)
Expand Down Expand Up @@ -502,7 +504,7 @@ fn main() {
.iter()
.fold(0, |acc, buf| match deserialize_response(&buf[..]) {
Some(info) => {
confirmed.insert(info.sender);
confirmed.insert(info.object_id);
acc + 1
}
None => acc,
Expand Down
1 change: 0 additions & 1 deletion fastpay/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ impl AccountsConfig {
.received_certificates
.binary_search_by_key(&certificate.key(), CertifiedTransferOrder::key)
{
config.balance = config.balance.try_add(transfer.amount.into()).unwrap();
config.received_certificates.insert(position, certificate)
}
}
Expand Down
90 changes: 6 additions & 84 deletions fastpay/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::transport::*;
use fastpay_core::{authority::*, base_types::*, client::*, error::*, messages::*, serialize::*};

use bytes::Bytes;
use futures::{channel::mpsc, future::FutureExt, sink::SinkExt, stream::StreamExt};
use futures::future::FutureExt;
use log::*;
use std::io;
use tokio::time;
Expand All @@ -16,7 +16,6 @@ pub struct Server {
base_port: u32,
state: AuthorityState,
buffer_size: usize,
cross_shard_queue_size: usize,
// Stats
packets_processed: u64,
user_errors: u64,
Expand All @@ -29,15 +28,13 @@ impl Server {
base_port: u32,
state: AuthorityState,
buffer_size: usize,
cross_shard_queue_size: usize,
) -> Self {
Self {
network_protocol,
base_address,
base_port,
state,
buffer_size,
cross_shard_queue_size,
packets_processed: 0,
user_errors: 0,
}
Expand All @@ -51,41 +48,6 @@ impl Server {
self.user_errors
}

async fn forward_cross_shard_queries(
network_protocol: NetworkProtocol,
base_address: String,
base_port: u32,
this_shard: ShardId,
mut receiver: mpsc::Receiver<(Vec<u8>, ShardId)>,
) {
let mut pool = network_protocol
.make_outgoing_connection_pool()
.await
.expect("Initialization should not fail");

let mut queries_sent = 0u64;
while let Some((buf, shard)) = receiver.next().await {
// Send cross-shard query.
let remote_address = format!("{}:{}", base_address, base_port + shard);
let status = pool.send_data_to(&buf, &remote_address).await;
if let Err(error) = status {
error!("Failed to send cross-shard query: {}", error);
} else {
debug!("Sent cross shard query: {} -> {}", this_shard, shard);
queries_sent += 1;
if queries_sent % 2000 == 0 {
info!(
"{}:{} (shard {}) has sent {} cross-shard queries",
base_address,
base_port + this_shard,
this_shard,
queries_sent
);
}
}
}
}

pub async fn spawn(self) -> Result<SpawnedServer, io::Error> {
info!(
"Listening to {} traffic on {}:{}",
Expand All @@ -99,29 +61,16 @@ impl Server {
self.base_port + self.state.shard_id
);

let (cross_shard_sender, cross_shard_receiver) = mpsc::channel(self.cross_shard_queue_size);
tokio::spawn(Self::forward_cross_shard_queries(
self.network_protocol,
self.base_address.clone(),
self.base_port,
self.state.shard_id,
cross_shard_receiver,
));

let buffer_size = self.buffer_size;
let protocol = self.network_protocol;
let state = RunningServerState {
server: self,
cross_shard_sender,
};
let state = RunningServerState { server: self };
// Launch server for the appropriate protocol.
protocol.spawn_server(&address, state, buffer_size).await
}
}

struct RunningServerState {
server: Server,
cross_shard_sender: mpsc::Sender<(Vec<u8>, ShardId)>,
}

impl MessageHandler for RunningServerState {
Expand Down Expand Up @@ -149,21 +98,7 @@ impl MessageHandler for RunningServerState {
.state
.handle_confirmation_order(confirmation_order)
{
Ok((info, send_shard)) => {
// Send a message to other shard
if let Some(cross_shard_update) = send_shard {
let shard = cross_shard_update.shard_id;
let tmp_out = serialize_cross_shard(&message);
debug!(
"Scheduling cross shard query: {} -> {}",
self.server.state.shard_id, shard
);
self.cross_shard_sender
.send((tmp_out, shard))
.await
.expect("internal channel should not fail");
};

Ok(info) => {
// Response
Ok(Some(serialize_info_response(&info)))
}
Expand All @@ -175,19 +110,6 @@ impl MessageHandler for RunningServerState {
.state
.handle_account_info_request(*message)
.map(|info| Some(serialize_info_response(&info))),
SerializedMessage::CrossShard(message) => {
match self
.server
.state
.handle_cross_shard_recipient_commit(*message)
{
Ok(_) => Ok(None), // Nothing to reply
Err(error) => {
error!("Failed to handle cross-shard query: {}", error);
Ok(None) // Nothing to reply
}
}
}
_ => Err(FastPayError::UnexpectedMessage),
}
}
Expand Down Expand Up @@ -293,7 +215,7 @@ impl AuthorityClient for Client {
order: TransferOrder,
) -> AsyncResult<'_, AccountInfoResponse, FastPayError> {
Box::pin(async move {
let shard = AuthorityState::get_shard(self.num_shards, &order.transfer.sender);
let shard = AuthorityState::get_shard(self.num_shards, &order.transfer.object_id);
self.send_recv_bytes(shard, serialize_transfer_order(&order))
.await
})
Expand All @@ -307,7 +229,7 @@ impl AuthorityClient for Client {
Box::pin(async move {
let shard = AuthorityState::get_shard(
self.num_shards,
&order.transfer_certificate.value.transfer.sender,
&order.transfer_certificate.value.transfer.object_id,
);
self.send_recv_bytes(shard, serialize_cert(&order.transfer_certificate))
.await
Expand All @@ -320,7 +242,7 @@ impl AuthorityClient for Client {
request: AccountInfoRequest,
) -> AsyncResult<'_, AccountInfoResponse, FastPayError> {
Box::pin(async move {
let shard = AuthorityState::get_shard(self.num_shards, &request.sender);
let shard = AuthorityState::get_shard(self.num_shards, &request.object_id);
self.send_recv_bytes(shard, serialize_info_request(&request))
.await
})
Expand Down
28 changes: 13 additions & 15 deletions fastpay/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ fn make_shard_server(
committee_config_path: &str,
initial_accounts_config_path: &str,
buffer_size: usize,
cross_shard_queue_size: usize,
shard: u32,
) -> network::Server {
let server_config =
Expand All @@ -40,19 +39,23 @@ fn make_shard_server(
);

// Load initial states
for (address, balance) in &initial_accounts_config.accounts {
if AuthorityState::get_shard(num_shards, address) != shard {
for (address, _balance) in &initial_accounts_config.accounts {
// TODO: fix this total hack
let id: ObjectID = address_to_object_id_hack(*address);

if AuthorityState::get_shard(num_shards, &id) != shard {
continue;
}
let client = AccountOffchainState {
balance: *balance,

let client = ObjectState {
id,
contents: Vec::new(),
owner: *address,
next_sequence_number: SequenceNumber::from(0),
pending_confirmation: None,
confirmed_log: Vec::new(),
synchronization_log: Vec::new(),
received_log: Vec::new(),
};
state.accounts.insert(*address, client);
state.insert_object(client);
}

network::Server::new(
Expand All @@ -61,7 +64,6 @@ fn make_shard_server(
server_config.authority.base_port,
state,
buffer_size,
cross_shard_queue_size,
)
}

Expand All @@ -71,7 +73,6 @@ fn make_servers(
committee_config_path: &str,
initial_accounts_config_path: &str,
buffer_size: usize,
cross_shard_queue_size: usize,
) -> Vec<network::Server> {
let server_config =
AuthorityServerConfig::read(server_config_path).expect("Fail to read server config");
Expand All @@ -85,7 +86,6 @@ fn make_servers(
committee_config_path,
initial_accounts_config_path,
buffer_size,
cross_shard_queue_size,
shard,
))
}
Expand Down Expand Up @@ -118,7 +118,7 @@ enum ServerCommands {

/// Number of cross shards messages allowed before blocking the main server loop
#[structopt(long, default_value = "1000")]
cross_shard_queue_size: usize,
_cross_shard_queue_size: usize, // TODO: remove this once client is re-factored.

/// Path to the file containing the public description of all authorities in this FastPay committee
#[structopt(long)]
Expand Down Expand Up @@ -163,7 +163,7 @@ fn main() {
match options.cmd {
ServerCommands::Run {
buffer_size,
cross_shard_queue_size,
_cross_shard_queue_size, // TODO: remove this once client is re-factored.
committee,
initial_accounts,
shard,
Expand All @@ -178,7 +178,6 @@ fn main() {
&committee,
&initial_accounts,
buffer_size,
cross_shard_queue_size,
shard,
);
vec![server]
Expand All @@ -191,7 +190,6 @@ fn main() {
&committee,
&initial_accounts,
buffer_size,
cross_shard_queue_size,
)
}
};
Expand Down
Loading

0 comments on commit 9e91225

Please sign in to comment.