Skip to content

Commit

Permalink
Refactor validation.rs (MystenLabs#9894)
Browse files Browse the repository at this point in the history
## Description 

Refactor validation.rs with a generic `cross_validate_entities`. This is
so it's easier to return items from `check_transactions` and
`check_objects`, in preparation for writing data to file. I think it can
be refactored once again into a generic `check_entities` but that can be
done later.

Also improved logging. As seen below, we now log:
```
Entity: **Objects mismatch at index 0**: expected: SuiObjectResponse ... **received Objects[1]: SuiObjectResponse**
```

```
2023-03-25T22:30:34.572924Z ERROR sui_rpc_loadgen::payload::validation: Entity: **Objects mismatch at index 0**: expected: SuiObjectResponse { data: Some(SuiObjectData { object_id: 0x0000000000000000000000000000000000000000000000000000000000000006, version: SequenceNumber(40544), digest: o#56gzxTeoivGwBNtXX4fE7JHg8SbmXCr2VLaLWTenxSs6, type_: Some(Struct(Other(StructTag { address: 0000000000000000000000000000000000000000000000000000000000000002, module: Identifier("clock"), name: Identifier("Clock"), type_params: [] }))), owner: Some(Shared { initial_shared_version: SequenceNumber(1) }), previous_transaction: Some(TransactionDigest(FgCTgs8i3mtSVWDXe45zGQimPZP7cSzEABsCXoGKRMsA)), storage_rebate: Some(0), display: None, content: Some(MoveObject(SuiParsedMoveObject { type_: StructTag { address: 0000000000000000000000000000000000000000000000000000000000000002, module: Identifier("clock"), name: Identifier("Clock"), type_params: [] }, has_public_transfer: false, fields: WithFields({"id": UID { id: 0x0000000000000000000000000000000000000000000000000000000000000006 }, "timestamp_ms": String("1679783431997")}) })), bcs: None }), error: None }, **received Objects[1]: SuiObjectResponse** { data: Some(SuiObjectData { object_id: 0x0000000000000000000000000000000000000000000000000000000000000006, version: SequenceNumber(40545), digest: o#2BFrLYq8RAehHeCz2eJZi4NmpK5TLgAt62UKZbjf8Kaa, type_: Some(Struct(Other(StructTag { address: 0000000000000000000000000000000000000000000000000000000000000002, module: Identifier("clock"), name: Identifier("Clock"), type_params: [] }))), owner: Some(Shared { initial_shared_version: SequenceNumber(1) }), previous_transaction: Some(TransactionDigest(HJK4BrnmcXnaS5g8ae4eUgqsYgYoHXrG72MQboZYf9cK)), storage_rebate: Some(0), display: None, content: Some(MoveObject(SuiParsedMoveObject { type_: StructTag { address: 0000000000000000000000000000000000000000000000000000000000000002, module: Identifier("clock"), name: Identifier("Clock"), type_params: [] }, has_public_transfer: false, fields: WithFields({"id": UID { id: 0x0000000000000000000000000000000000000000000000000000000000000006 }, "timestamp_ms": String("1679783432511")}) })), bcs: None }), error: None }
```
And similarly, 
`2023-03-25T23:01:07.798393Z ERROR sui_rpc_loadgen::payload::validation:
Entity: Transactions lengths do not match at index 1: first has length
478 vs 1 has length 477`
## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
wlmyng authored Mar 26, 2023
1 parent b42eb10 commit bf933bf
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 134 deletions.
2 changes: 1 addition & 1 deletion crates/sui-rpc-loadgen/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Run the following command to see available commands:
cargo run --bin sui-rpc-loadgen -- -h
```

To try this locally, refer to [sef](../sui-test-validator/README.md). Recommend setting `database-url` to an env variable.
To try this locally, refer to [sef](../sui-test-validator/README.md). Recommend setting `database-url` to an env variable. Note: run `RUST_LOG="consensus=off" cargo run sui-test-validator -- --with-indexer` to rebuild.

### Example 1: Get All Checkpoints

Expand Down
15 changes: 3 additions & 12 deletions crates/sui-rpc-loadgen/src/payload/query_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,10 @@ impl<'a> ProcessPayload<'a, &'a QueryTransactions> for RpcCommandProcessor {
.await;

// compare results
let digests = results[0]
.data
.iter()
.map(|transaction| transaction.digest)
.collect::<Vec<TransactionDigest>>();
let transactions: Vec<Vec<SuiTransactionResponse>> =
results.iter().map(|page| page.data.clone()).collect();

cross_validate_entities(
&digests,
&results[0].data,
&results[1].data,
"TransactionDigest",
"Transaction",
);
cross_validate_entities(&transactions, "Transactions");
}

Ok(())
Expand Down
238 changes: 117 additions & 121 deletions crates/sui-rpc-loadgen/src/payload/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,52 @@ use std::collections::HashSet;
use std::time::Instant;
use sui_json_rpc::api::QUERY_MAX_RESULT_LIMIT;
use sui_json_rpc_types::{
SuiObjectDataOptions, SuiTransactionEffectsAPI, SuiTransactionResponse,
SuiObjectDataOptions, SuiObjectResponse, SuiTransactionEffectsAPI, SuiTransactionResponse,
SuiTransactionResponseOptions,
};
use sui_sdk::SuiClient;
use sui_types::base_types::{ObjectID, TransactionDigest};
use tracing::log::warn;
use tracing::{debug, error};

pub(crate) fn cross_validate_entities<T, U>(
keys: &[T],
first: &[U],
second: &[U],
key_name: &str,
entity_name: &str,
) where
T: std::fmt::Debug,
pub(crate) fn cross_validate_entities<U>(entities: &Vec<Vec<U>>, entity_name: &str)
where
U: PartialEq + std::fmt::Debug,
{
if first.len() != second.len() {
if entities.len() < 2 {
error!("Unable to cross validate as {} less than 2", entity_name);
return;
}

let length = entities[0].len();
if let Some((vec_index, v)) = entities.iter().enumerate().find(|(_, v)| v.len() != length) {
error!(
"Entity: {} lengths do not match: {} vs {}",
entity_name,
first.len(),
second.len()
"Entity: {} lengths do not match at index {}: first vec has length {} vs vec {} has length {}",
entity_name, vec_index, length, vec_index, v.len()
);
return;
}

for (i, (a, b)) in first.iter().zip(second.iter()).enumerate() {
if a != b {
error!(
"Entity: {} mismatch with index {}: {}: {:?}, first: {:?}, second: {:?}",
entity_name, i, key_name, keys[i], a, b
);
// Iterate through all indices (from 0 to length - 1) of the inner vectors.
for i in 0..length {
// Create an iterator that produces references to elements at position i in each inner vector of entities.
let mut iter = entities.iter().map(|v| &v[i]);

// Compare first against rest of the iter (other inner vectors)
if let Some(first) = iter.next() {
for (j, other) in iter.enumerate() {
if first != other {
// Example error: Entity: ExampleEntity mismatch at index 2: expected: 3, received ExampleEntity[1]: 4
error!(
"Entity: {} mismatch at index {}: expected: {:?}, received {}: {:?}",
entity_name,
i,
first,
format!("{}[{}]", entity_name, j + 1),
other
);
}
}
}
}
}
Expand All @@ -50,58 +62,52 @@ pub(crate) async fn check_transactions(
cross_validate: bool,
verify_objects: bool,
) {
let transactions = join_all(clients.iter().enumerate().map(|(i, client)| async move {
let start_time = Instant::now();
let transactions = client
.read_api()
.multi_get_transactions_with_options(
digests.to_vec(),
SuiTransactionResponseOptions::full_content(), // todo(Will) support options for this
)
.await;
let elapsed_time = start_time.elapsed();
debug!(
"MultiGetTransactions Request latency {:.4} for rpc at url {i}",
elapsed_time.as_secs_f64()
);
transactions
}))
.await;

// TODO: support more than 2 transactions
if cross_validate && transactions.len() == 2 {
if let (Some(t1), Some(t2)) = (transactions.get(0), transactions.get(1)) {
let first = match t1 {
Ok(vec) => vec.as_slice(),
Err(err) => {
error!("Error unwrapping first vec of transactions: {:?}", err);
error!("Logging digests, {:?}", digests);
return;
}
};
let second = match t2 {
Ok(vec) => vec.as_slice(),
Err(err) => {
error!("Error unwrapping second vec of transactions: {:?}", err);
error!("Logging digests, {:?}", digests);
return;
}
};
let transactions: Vec<Vec<SuiTransactionResponse>> =
join_all(clients.iter().enumerate().map(|(i, client)| async move {
let start_time = Instant::now();
let transactions = client
.read_api()
.multi_get_transactions_with_options(
digests.to_vec(),
SuiTransactionResponseOptions::full_content(), // todo(Will) support options for this
)
.await;
let elapsed_time = start_time.elapsed();
debug!(
"MultiGetTransactions Request latency {:.4} for rpc at url {i}",
elapsed_time.as_secs_f64()
);
transactions
}))
.await
.into_iter()
.enumerate()
.filter_map(|(i, result)| match result {
Ok(transactions) => Some(transactions),
Err(err) => {
warn!(
"Failed to fetch transactions for vec {i}: {:?}. Logging digests, {:?}",
err, digests
);
None
}
})
.collect();

cross_validate_entities(digests, first, second, "TransactionDigest", "Transaction");
if cross_validate {
cross_validate_entities(&transactions, "Transactions");
}

if verify_objects {
let object_ids = first
.iter()
.chain(second.iter())
.flat_map(get_all_object_ids)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
if verify_objects {
let object_ids = transactions
.iter()
.flatten()
.flat_map(get_all_object_ids)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();

check_objects(clients, &object_ids, cross_validate).await;
}
}
check_objects(clients, &object_ids, cross_validate).await;
}
}

Expand Down Expand Up @@ -129,60 +135,50 @@ pub(crate) async fn check_objects(
object_ids: &[ObjectID],
cross_validate: bool,
) {
let objects = join_all(clients.iter().enumerate().map(|(i, client)| async move {
// TODO: support chunking so that we don't exceed query limit
let object_ids = if object_ids.len() > QUERY_MAX_RESULT_LIMIT {
warn!(
"The input size for multi_get_object_with_options has exceed the query limit\
let objects: Vec<Vec<SuiObjectResponse>> =
join_all(clients.iter().enumerate().map(|(i, client)| async move {
// TODO: support chunking so that we don't exceed query limit
let object_ids = if object_ids.len() > QUERY_MAX_RESULT_LIMIT {
warn!(
"The input size for multi_get_object_with_options has exceed the query limit\
{QUERY_MAX_RESULT_LIMIT}: {}, time to implement chunking",
object_ids.len()
);
&object_ids[0..QUERY_MAX_RESULT_LIMIT]
} else {
object_ids
};
let start_time = Instant::now();
let transactions = client
.read_api()
.multi_get_object_with_options(
object_ids.to_vec(),
SuiObjectDataOptions::full_content(), // todo(Will) support options for this
)
.await;
let elapsed_time = start_time.elapsed();
debug!(
"MultiGetObject Request latency {:.4} for rpc at url {i}",
elapsed_time.as_secs_f64()
);
transactions
}))
.await;

// TODO: support more than 2 transactions
if cross_validate && objects.len() == 2 {
if let (Some(t1), Some(t2)) = (objects.get(0), objects.get(1)) {
let first = match t1 {
Ok(vec) => vec.as_slice(),
Err(err) => {
error!(
"Error unwrapping first vec of objects: {:?} for objectIDs {:?}",
err, object_ids
);
return;
}
};
let second = match t2 {
Ok(vec) => vec.as_slice(),
Err(err) => {
error!(
"Error unwrapping second vec of objects: {:?} for objectIDs {:?}",
err, object_ids
);
return;
}
object_ids.len()
);
&object_ids[0..QUERY_MAX_RESULT_LIMIT]
} else {
object_ids
};
let start_time = Instant::now();
let objects = client
.read_api()
.multi_get_object_with_options(
object_ids.to_vec(),
SuiObjectDataOptions::full_content(), // todo(Will) support options for this
)
.await;
let elapsed_time = start_time.elapsed();
debug!(
"MultiGetObject Request latency {:.4} for rpc at url {i}",
elapsed_time.as_secs_f64()
);
objects
}))
.await
.into_iter()
.enumerate()
.filter_map(|(i, result)| match result {
Ok(obj_vec) => Some(obj_vec),
Err(err) => {
error!(
"Failed to fetch objects for vec {i}: {:?}. Logging objectIDs, {:?}",
err, object_ids
);
None
}
})
.collect();

cross_validate_entities(object_ids, first, second, "ObjectID", "Object");
}
if cross_validate {
cross_validate_entities(&objects, "Objects");
}
}

0 comments on commit bf933bf

Please sign in to comment.