Skip to content

Commit

Permalink
[state-sync-v2][executor] add unitests and some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
lightmark authored and bors-libra committed Nov 3, 2021
1 parent d1cac46 commit 7d281ce
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 71 deletions.
32 changes: 31 additions & 1 deletion execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use diem_types::{
proof::{accumulator::InMemoryAccumulator, AccumulatorExtensionProof},
transaction::{
default_protocol::{TransactionListWithProof, TransactionOutputListWithProof},
Transaction, TransactionInfo, TransactionStatus, TransactionToCommit, Version,
Transaction, TransactionInfo, TransactionOutput, TransactionStatus, TransactionToCommit,
Version,
},
write_set::WriteSet,
};
Expand Down Expand Up @@ -77,6 +78,18 @@ pub trait ChunkExecutor: Send + Sync {
events: Vec<ContractEvent>,
) -> anyhow::Result<Vec<ContractEvent>>;

fn execute_or_apply_chunk(
&self,
first_version: u64,
transactions: Vec<Transaction>,
transaction_outputs: Option<Vec<TransactionOutput>>,
transaction_infos: Vec<TransactionInfo>,
) -> Result<(
ProcessedVMOutput,
Vec<TransactionToCommit>,
Vec<ContractEvent>,
)>;

fn execute_and_commit_chunk(
&self,
txn_list_with_proof: TransactionListWithProof,
Expand All @@ -93,6 +106,23 @@ pub trait ChunkExecutor: Send + Sync {
events,
)
}

fn apply_and_commit_chunk(
&self,
txn_output_list_with_proof: TransactionOutputListWithProof,
verified_target_li: LedgerInfoWithSignatures,
epoch_change_li: Option<LedgerInfoWithSignatures>,
) -> Result<Vec<ContractEvent>> {
let (output, txns_to_commit, events) =
self.apply_chunk(txn_output_list_with_proof, verified_target_li.clone())?;
self.commit_chunk(
verified_target_li,
epoch_change_li,
output,
txns_to_commit,
events,
)
}
}

pub trait BlockExecutor: Send + Sync {
Expand Down
85 changes: 68 additions & 17 deletions execution/executor/src/chunk_executor_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@
#![forbid(unsafe_code)]

use crate::logging::{LogEntry, LogSchema};
use anyhow::{ensure, format_err, Result};
use diem_logger::prelude::*;
use diem_types::{
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
protocol_spec::DpnProto,
transaction::default_protocol::{TransactionListWithProof, TransactionOutputListWithProof},
transaction::{
default_protocol::{TransactionListWithProof, TransactionOutputListWithProof},
Transaction, TransactionInfo, TransactionOutput, Version,
},
};
use diem_vm::VMExecutor;
use executor_types::{ChunkExecutor, ProcessedVMOutput};
use fail::fail_point;

use crate::{
metrics::{DIEM_EXECUTOR_APPLY_CHUNK_SECONDS, DIEM_EXECUTOR_EXECUTE_AND_COMMIT_CHUNK_SECONDS},
metrics::{
DIEM_EXECUTOR_APPLY_CHUNK_SECONDS, DIEM_EXECUTOR_COMMIT_CHUNK_SECONDS,
DIEM_EXECUTOR_EXECUTE_CHUNK_SECONDS,
},
Executor,
};
use diem_types::transaction::TransactionToCommit;
Expand All @@ -27,19 +34,18 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
txn_list_with_proof: TransactionListWithProof,
// Target LI that has been verified independently: the proofs are relative to this version.
verified_target_li: LedgerInfoWithSignatures,
) -> anyhow::Result<(
) -> Result<(
ProcessedVMOutput,
Vec<TransactionToCommit>,
Vec<ContractEvent>,
)> {
let _timer = DIEM_EXECUTOR_EXECUTE_AND_COMMIT_CHUNK_SECONDS.start_timer();
// 1. Update the cache in executor to be consistent with latest synced state.
self.reset_cache()?;
let read_lock = self.cache.read();
let _timer = DIEM_EXECUTOR_EXECUTE_CHUNK_SECONDS.start_timer();

let num_txn = txn_list_with_proof.transactions.len();
let first_version_in_request = txn_list_with_proof.first_transaction_version;

// 1. Update the cache in executor to be consistent with latest synced state.
self.reset_cache()?;
// 2. Verify input transaction list.
txn_list_with_proof.verify(
verified_target_li.ledger_info(),
Expand All @@ -54,9 +60,13 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
)?;

// 3. Execute transactions.
let first_version = read_lock.synced_trees().txn_accumulator().num_leaves();
drop(read_lock);
let (output, txns_to_commit, events) = self.execute_or_apply_chunk(
let first_version = self
.cache
.read()
.synced_trees()
.txn_accumulator()
.num_leaves();
let res = self.execute_or_apply_chunk(
first_version,
transactions,
txn_outputs,
Expand All @@ -70,7 +80,8 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
.num_txns_in_request(num_txn),
"sync_request_executed",
);
Ok((output, txns_to_commit, events))

Ok(res)
}

fn apply_chunk(
Expand All @@ -86,7 +97,6 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
let _timer = DIEM_EXECUTOR_APPLY_CHUNK_SECONDS.start_timer();
// 1. Update the cache in executor to be consistent with latest synced state.
self.reset_cache()?;
let read_lock = self.cache.read();

let num_txn = txn_output_list_with_proof.transactions_and_outputs.len();
let first_version_in_request = txn_output_list_with_proof.first_transaction_output_version;
Expand All @@ -111,9 +121,14 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
)?;

// 3. Execute transactions.
let first_version = read_lock.synced_trees().txn_accumulator().num_leaves();
drop(read_lock);
let (output, txns_to_commit, events) = self.execute_or_apply_chunk(
let first_version = self
.cache
.read()
.synced_trees()
.txn_accumulator()
.num_leaves();

let res = self.execute_or_apply_chunk(
first_version,
transactions,
txn_outputs,
Expand All @@ -127,7 +142,8 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
.num_txns_in_request(num_txn),
"sync_request_executed",
);
Ok((output, txns_to_commit, events))

Ok(res)
}

fn commit_chunk(
Expand All @@ -137,7 +153,9 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {
output: ProcessedVMOutput,
txns_to_commit: Vec<TransactionToCommit>,
events: Vec<ContractEvent>,
) -> anyhow::Result<Vec<ContractEvent>> {
) -> Result<Vec<ContractEvent>> {
let _timer = DIEM_EXECUTOR_COMMIT_CHUNK_SECONDS.start_timer();

// 4. Commit to DB.
let first_version = self
.cache
Expand Down Expand Up @@ -183,4 +201,37 @@ impl<V: VMExecutor> ChunkExecutor for Executor<DpnProto, V> {

Ok(events)
}

fn execute_or_apply_chunk(
&self,
first_version: Version,
transactions: Vec<Transaction>,
transaction_outputs: Option<Vec<TransactionOutput>>,
transaction_infos: Vec<TransactionInfo>,
) -> Result<(
ProcessedVMOutput,
Vec<TransactionToCommit>,
Vec<ContractEvent>,
)> {
let num_txns = transactions.len();
let (processed_vm_output, txns_to_commit, events, txns_to_retry, _txn_infos_to_retry) =
self.replay_transactions_impl(
first_version,
transactions,
transaction_outputs,
transaction_infos,
)?;

ensure!(
txns_to_retry.is_empty(),
"The transaction at version {} got the status of 'Retry'",
num_txns
.checked_sub(txns_to_retry.len())
.ok_or_else(|| format_err!("integer overflow occurred"))?
.checked_add(first_version as usize)
.ok_or_else(|| format_err!("integer overflow occurred"))?,
);

Ok((processed_vm_output, txns_to_commit, events))
}
}
103 changes: 90 additions & 13 deletions execution/executor/src/executor_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use diem_types::{
account_address::AccountAddress,
block_info::BlockInfo,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
transaction::{default_protocol::TransactionListWithProof, Transaction, Version},
transaction::{
default_protocol::{TransactionListWithProof, TransactionOutputListWithProof},
Transaction, TransactionStatus, Version,
},
};
use diemdb::DiemDB;
use executor_types::{BlockExecutor, ChunkExecutor, TransactionReplayer};
Expand Down Expand Up @@ -301,51 +304,125 @@ fn create_transaction_chunks(
(batches, ledger_info)
}

fn execute_and_commit_chunk(
chunks: Vec<TransactionListWithProof>,
ledger_info: LedgerInfoWithSignatures,
db: &DbReaderWriter,
executor: &Executor<DpnProto, MockVM>,
) {
// Execute the first chunk. After that we should still get the genesis ledger info from DB.
executor
.execute_and_commit_chunk(chunks[0].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute the second chunk. After that we should still get the genesis ledger info from DB.
executor
.execute_and_commit_chunk(chunks[1].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute an empty chunk. After that we should still get the genesis ledger info from DB.
executor
.execute_and_commit_chunk(
TransactionListWithProof::new_empty(),
ledger_info.clone(),
None,
)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute the second chunk again. After that we should still get the same thing.
executor
.execute_and_commit_chunk(chunks[1].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute the third chunk. After that we should get the new ledger info.
executor
.execute_and_commit_chunk(chunks[2].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li, ledger_info);
}

#[test]
fn test_executor_execute_and_commit_chunk() {
fn test_executor_execute_or_apply_and_commit_chunk() {
let first_batch_size = 30;
let second_batch_size = 40;
let third_batch_size = 20;
let overlapping_size = 5;

let first_batch_start = 1;
let second_batch_start = first_batch_start + first_batch_size;
let third_batch_start = second_batch_start + second_batch_size - overlapping_size;

let (chunks, ledger_info) = {
let first_batch_start = 1;
let second_batch_start = first_batch_start + first_batch_size;
let third_batch_start = second_batch_start + second_batch_size - overlapping_size;
create_transaction_chunks(vec![
first_batch_start..first_batch_start + first_batch_size,
second_batch_start..second_batch_start + second_batch_size,
third_batch_start..third_batch_start + third_batch_size,
])
};
// First test with transactions only and reset chunks to be `Vec<TransactionOutputListWithProof>`.
let chunks = {
let TestExecutor {
_path,
db,
executor,
} = TestExecutor::new();
execute_and_commit_chunk(chunks, ledger_info.clone(), &db, &executor);

let ledger_version = db.reader.get_latest_version().unwrap();
let output1 = db
.reader
.get_transaction_outputs(first_batch_start, first_batch_size, ledger_version)
.unwrap();
let output2 = db
.reader
.get_transaction_outputs(second_batch_start, second_batch_size, ledger_version)
.unwrap();
let output3 = db
.reader
.get_transaction_outputs(third_batch_start, third_batch_size, ledger_version)
.unwrap();
vec![output1, output2, output3]
};

// Now we execute these two chunks of transactions.
// Test with transaction outputs.
let TestExecutor {
_path,
db,
executor,
} = TestExecutor::new();

// Execute the first chunk. After that we should still get the genesis ledger info from DB.
executor
.execute_and_commit_chunk(chunks[0].clone(), ledger_info.clone(), None)
.apply_and_commit_chunk(chunks[0].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute the second chunk. After that we should still get the genesis ledger info from DB.
executor
.execute_and_commit_chunk(chunks[1].clone(), ledger_info.clone(), None)
.apply_and_commit_chunk(chunks[1].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute an empty chunk. After that we should still get the genesis ledger info from DB.
executor
.execute_and_commit_chunk(
TransactionListWithProof::new_empty(),
.apply_and_commit_chunk(
TransactionOutputListWithProof::new_empty(),
ledger_info.clone(),
None,
)
Expand All @@ -356,15 +433,15 @@ fn test_executor_execute_and_commit_chunk() {

// Execute the second chunk again. After that we should still get the same thing.
executor
.execute_and_commit_chunk(chunks[1].clone(), ledger_info.clone(), None)
.apply_and_commit_chunk(chunks[1].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li.ledger_info().version(), 0);
assert_eq!(li.ledger_info().consensus_block_id(), HashValue::zero());

// Execute the third chunk. After that we should get the new ledger info.
executor
.execute_and_commit_chunk(chunks[2].clone(), ledger_info.clone(), None)
.apply_and_commit_chunk(chunks[2].clone(), ledger_info.clone(), None)
.unwrap();
let li = db.reader.get_latest_ledger_info().unwrap();
assert_eq!(li, ledger_info);
Expand Down
Loading

0 comments on commit 7d281ce

Please sign in to comment.