Skip to content

Commit

Permalink
execution driver can sync missing parent certificates (MystenLabs#3450)
Browse files Browse the repository at this point in the history
* Remove exec driver functionality from main sync path (partial revert of 6e78098)

* Move common shuffle_by_stake code to committee

* Add execute_cert_to_true_effects

* Add test and fix bug

* The execution driver path in NodeSync syncs missing parent certificates.

* Move wait_for_tx, wait_for_all_txes to test-utils

* Tests for parent execution

* Don't log that the cert finished if it was an error

* Fix lint

* HashSet -> BTreeSet
  • Loading branch information
mystenmark authored Jul 26, 2022
1 parent a512b81 commit 742da76
Show file tree
Hide file tree
Showing 15 changed files with 781 additions and 275 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 6 additions & 9 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1401,23 +1401,20 @@ impl AuthorityState {
let notifier_ticket = self.batch_notifier.ticket()?;
let seq = notifier_ticket.seq();

let digest = certificate.digest();
let effects_digest = &signed_effects.digest();
let res = self
.database
self.database
.update_state(
temporary_store,
certificate,
seq,
signed_effects,
effects_digest,
)
.await;

let digest = certificate.digest();

debug!(?digest, ?effects_digest, "commit_certificate finished");

res
.await
.tap_ok(|_| {
debug!(?digest, ?effects_digest, ?self.name, "commit_certificate finished");
})

// implicitly we drop the ticket here and that notifies the batch manager
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ where
let next_checkpoint_sequence_number = checkpoint_db.lock().next_checkpoint();
let mut fragments_num = 0;

for authority in committee.shuffle_by_stake() {
for authority in committee.shuffle_by_stake(None, None).iter() {
// Have we run out of authorities?
if available_authorities.is_empty() {
// We have created as many fragments as possible, so exit.
Expand Down
10 changes: 9 additions & 1 deletion crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use futures::{stream, StreamExt};

use super::ActiveAuthority;

use tap::TapFallible;

#[cfg(test)]
pub(crate) mod tests;

Expand Down Expand Up @@ -87,7 +89,13 @@ where
// zip results back together with seq
.zip(stream::iter(pending_transactions.iter()))
// filter out errors
.filter_map(|(result, (seq, _))| async move { result.ok().map(|_| seq) })
.filter_map(|(result, (seq, digest))| async move {
result
.tap_err(|e| info!(?seq, ?digest, "certificate execution failed: {}", e))
.tap_ok(|_| debug!(?seq, ?digest, "certificate execution complete"))
.ok()
.map(|_| seq)
})
.collect()
.await;

Expand Down
94 changes: 93 additions & 1 deletion crates/sui-core/src/authority_active/execution_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,19 @@ use crate::{authority_active::ActiveAuthority, checkpoints::checkpoint_tests::Te
use crate::authority_active::checkpoint_driver::CheckpointMetrics;
use std::sync::Arc;
use std::time::Duration;
use sui_types::messages::ExecutionStatus;

use sui_adapter::genesis;
use sui_types::{crypto::get_key_pair, messages::ExecutionStatus, object::Object};

//use super::super::AuthorityState;
use crate::authority_aggregator::authority_aggregator_tests::{
crate_object_move_transaction, do_cert, do_transaction, extract_cert, get_latest_ref,
init_local_authorities, transfer_object_move_transaction,
};
use crate::checkpoints::checkpoint_tests::checkpoint_tests_setup;
use crate::test_utils::wait_for_tx;

use tracing::info;

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn pending_exec_storage_notify() {
Expand Down Expand Up @@ -181,3 +191,85 @@ async fn pending_exec_full() {
.expect("DB should be there");
assert_eq!(0, certs_back.len());
}

/// Checks that an authority can execute a certificate whose parent certificate it has never
/// seen.
///
/// Scenario:
/// 1. An object-creating transaction is certified and executed on authorities 0, 1 and 2.
/// 2. A transfer of the created object is signed by authorities 0, 1 and 2, but its certificate
///    is executed on authority 0 only.
/// 3. Authority 3, which took part in neither step, is handed the digest of the transfer as a
///    pending certificate. The test then waits for that transaction on authority 3 and checks
///    that signed effects are available for it.
#[tokio::test]
async fn test_parent_cert_exec() {
    telemetry_subscribers::init_for_testing();

    let (sender, sender_key) = get_key_pair();
    let gas_object1 = Object::with_owner_for_testing(sender);
    let gas_object2 = Object::with_owner_for_testing(sender);
    let genesis_objects = vec![gas_object1.clone(), gas_object2.clone()];
    let (aggregator, authorities) = init_local_authorities(4, genesis_objects).await;
    let clients: Vec<_> = authorities
        .iter()
        .map(|authority| &aggregator.authority_clients[&authority.name])
        .collect();

    let framework_ref = genesis::get_framework_object_ref();

    // Parent transaction: create an object, paying with the first gas object.
    let gas_ref = get_latest_ref(clients[0], gas_object1.id()).await;
    let create_tx =
        crate_object_move_transaction(sender, &sender_key, sender, 100, framework_ref, gas_ref);

    // Sign on the first three authorities, then execute the resulting cert on the same three.
    for &client in clients.iter().take(3) {
        do_transaction(client, &create_tx).await;
    }
    let create_cert = extract_cert(&clients, &aggregator.committee, create_tx.digest()).await;

    do_cert(clients[0], &create_cert).await;
    do_cert(clients[1], &create_cert).await;
    let create_effects = do_cert(clients[2], &create_cert).await;
    info!(digest = ?create_tx.digest(), "cert1 finished");

    // Child transaction: transfer the freshly created object to a new address. It is signed by
    // the first three authorities, but the certificate is executed on the first one only.
    let (recipient, _) = get_key_pair();
    let created_ref = create_effects.created[0].0;
    let gas_after_create = create_effects.gas_object.0;

    let transfer_tx = transfer_object_move_transaction(
        sender,
        &sender_key,
        recipient,
        created_ref,
        framework_ref,
        gas_after_create,
    );

    for &client in clients.iter().take(3) {
        do_transaction(client, &transfer_tx).await;
    }
    let transfer_cert = extract_cert(&clients, &aggregator.committee, transfer_tx.digest()).await;
    do_cert(clients[0], &transfer_cert).await;
    info!(digest = ?transfer_tx.digest(), "cert2 finished");

    // The fourth authority knows about neither transaction. Queue the child cert for execution
    // and expect it to fetch the parent and apply both.
    let lagging = &authorities[3];
    let active_state = Arc::new(
        ActiveAuthority::new_with_ephemeral_storage(lagging.clone(), aggregator.clone()).unwrap(),
    );

    // Run the batch service in the background before starting the execution process.
    let batch_state = lagging.clone();
    tokio::task::spawn(async move {
        batch_state
            .run_batch_service(1, Duration::from_secs(1))
            .await
    });
    active_state.clone().spawn_execute_process().await;

    let transfer_digest = *transfer_tx.digest();
    lagging
        .database
        .add_pending_certificates(vec![(transfer_digest, None)])
        .unwrap();

    wait_for_tx(transfer_digest, lagging.clone()).await;

    // The lagging authority must now be able to report signed effects for the child transaction.
    clients[3]
        .handle_transaction_info_request(transfer_digest.into())
        .await
        .unwrap()
        .signed_effects
        .unwrap();
}
Loading

0 comments on commit 742da76

Please sign in to comment.