Skip to content

Commit

Permalink
Merge pull request ProvableHQ#1570 from niklaslong/deferred_transacti…
Browse files Browse the repository at this point in the history
…ons_v2

Feat: deferred de/serialisation for transactions
  • Loading branch information
howardwu authored Jan 20, 2022
2 parents 506fcbf + 73cec1f commit e0d6639
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
6 changes: 3 additions & 3 deletions src/network/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub enum Message<N: Network, E: Environment> {
/// UnconfirmedBlock := (block_height, block_hash, block)
UnconfirmedBlock(u32, N::BlockHash, Data<Block<N>>),
/// UnconfirmedTransaction := (transaction)
UnconfirmedTransaction(Transaction<N>),
UnconfirmedTransaction(Data<Transaction<N>>),
/// PoolRegister := (address)
PoolRegister(Address<N>),
/// PoolRequest := (share_difficulty, block_template)
Expand Down Expand Up @@ -198,7 +198,7 @@ impl<N: Network, E: Environment> Message<N, E> {
writer.write_all(&block_hash.to_bytes_le()?)?;
block.serialize_blocking_into(writer)
}
Self::UnconfirmedTransaction(transaction) => Ok(bincode::serialize_into(writer, transaction)?),
Self::UnconfirmedTransaction(transaction) => Ok(transaction.serialize_blocking_into(writer)?),
Self::PoolRegister(address) => Ok(bincode::serialize_into(writer, address)?),
Self::PoolRequest(share_difficulty, block_template) => {
bincode::serialize_into(&mut *writer, share_difficulty)?;
Expand Down Expand Up @@ -271,7 +271,7 @@ impl<N: Network, E: Environment> Message<N, E> {
bincode::deserialize(&data[4..36])?,
Data::Buffer(data[36..].to_vec().into()),
),
10 => Self::UnconfirmedTransaction(bincode::deserialize(data)?),
10 => Self::UnconfirmedTransaction(Data::Buffer(data.to_vec().into())),
11 => Self::PoolRegister(bincode::deserialize(data)?),
12 => Self::PoolRequest(bincode::deserialize(&data[0..8])?, Data::Buffer(data[8..].to_vec().into())),
13 => Self::PoolResponse(
Expand Down
57 changes: 38 additions & 19 deletions src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,13 @@ impl<N: Network, E: Environment> Peer<N, E> {

is_ready_to_send
}
Message::UnconfirmedTransaction(ref transaction) => {
Message::UnconfirmedTransaction(ref mut data) => {
let transaction = if let Data::Object(transaction) = data {
transaction
} else {
panic!("Logic error: the transaction shouldn't have been serialized yet.");
};

// Retrieve the last seen timestamp of this transaction for this peer.
let last_seen = peer
.seen_outbound_transactions
Expand All @@ -403,12 +409,16 @@ impl<N: Network, E: Environment> Peer<N, E> {
// Report the unconfirmed block height.
if is_ready_to_send {
trace!(
"Preparing to send '{} {}' to {}",
message.name(),
"Preparing to send 'UnconfirmedTransaction {}' to {}",
transaction.transaction_id(),
peer_ip
);
}

// Perform non-blocking serialization of the transaction.
let serialized_transaction = Data::serialize(data.clone()).await.expect("Transaction serialization is bugged");
let _ = std::mem::replace(data, Data::Buffer(serialized_transaction));

is_ready_to_send
}
_ => true,
Expand Down Expand Up @@ -668,24 +678,33 @@ impl<N: Network, E: Environment> Peer<N, E> {
break;
}

// Retrieve the last seen timestamp of the received transaction.
let last_seen = peer.seen_inbound_transactions.entry(transaction.transaction_id()).or_insert(SystemTime::UNIX_EPOCH);
let is_router_ready = last_seen.elapsed().unwrap().as_secs() > E::RADIO_SILENCE_IN_SECS;

// Update the timestamp for the received transaction.
peer.seen_inbound_transactions.insert(transaction.transaction_id(), SystemTime::now());

// Ensure the node is not peering.
let is_node_ready = !E::status().is_peering();
// Perform the deferred non-blocking deserialisation of the
// transaction.
match transaction.deserialize().await {
Ok(transaction) => {
// Retrieve the last seen timestamp of the received transaction.
let last_seen = peer.seen_inbound_transactions.entry(transaction.transaction_id()).or_insert(SystemTime::UNIX_EPOCH);
let is_router_ready = last_seen.elapsed().unwrap().as_secs() > E::RADIO_SILENCE_IN_SECS;

// Update the timestamp for the received transaction.
peer.seen_inbound_transactions.insert(transaction.transaction_id(), SystemTime::now());

// Ensure the node is not peering.
let is_node_ready = !E::status().is_peering();

// If this node is a beacon or sync node, skip this message, after updating the timestamp.
if E::NODE_TYPE == NodeType::Beacon || E::NODE_TYPE == NodeType::Sync || !is_router_ready || !is_node_ready {
trace!("Skipping 'UnconfirmedTransaction {}' from {}", transaction.transaction_id(), peer_ip);
} else {
// Route the `UnconfirmedTransaction` to the prover.
if let Err(error) = prover_router.send(ProverRequest::UnconfirmedTransaction(peer_ip, transaction)).await {
warn!("[UnconfirmedTransaction] {}", error);

}
}

// If this node is a beacon or sync node, skip this message, after updating the timestamp.
if E::NODE_TYPE == NodeType::Beacon || E::NODE_TYPE == NodeType::Sync || !is_router_ready || !is_node_ready {
trace!("Skipping 'UnconfirmedTransaction {}' from {}", transaction.transaction_id(), peer_ip);
} else {
// Route the `UnconfirmedTransaction` to the prover.
if let Err(error) = prover_router.send(ProverRequest::UnconfirmedTransaction(peer_ip, transaction)).await {
warn!("[UnconfirmedTransaction] {}", error);
}
Err(error) => warn!("[UnconfirmedTransaction] {}", error)
}
}
Message::PoolRegister(address) => {
Expand Down
2 changes: 1 addition & 1 deletion src/network/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl<N: Network, E: Environment> Prover<N, E> {
match self.memory_pool.write().await.add_transaction(&transaction) {
Ok(()) => {
// Upon success, propagate the unconfirmed transaction to the connected peers.
let request = PeersRequest::MessagePropagate(peer_ip, Message::UnconfirmedTransaction(transaction));
let request = PeersRequest::MessagePropagate(peer_ip, Message::UnconfirmedTransaction(Data::Object(transaction)));
if let Err(error) = self.peers_router.send(request).await {
warn!("[UnconfirmedTransaction] {}", error);
}
Expand Down

0 comments on commit e0d6639

Please sign in to comment.