Skip to content

Commit

Permalink
[follower] Remove transaction from batch, and verify the digests rece…
Browse files Browse the repository at this point in the history
…ived. (MystenLabs#1431)

* Remove trasnaction from batch, and verify the digests received.

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
2 people authored and lanvidr committed Apr 18, 2022
1 parent bcb18e7 commit 84d4646
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 148 deletions.
7 changes: 5 additions & 2 deletions sui_core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl crate::authority::AuthorityState {
if !transactions.is_empty() {
// Make a new batch, to put the old transactions not in a batch in.
let last_signed_batch = SignedBatch::new(
AuthorityBatch::make_next(&last_batch, &transactions[..]),
// Unwrap safe due to check not empty
AuthorityBatch::make_next(&last_batch, &transactions[..])?,
&*self.secret,
self.name,
);
Expand Down Expand Up @@ -155,13 +156,15 @@ impl crate::authority::AuthorityState {

// Logic to make a batch
if make_batch {
// Test it is not empty.
if current_batch.is_empty() {
continue;
}

// Make and store a new batch.
let new_batch = SignedBatch::new(
AuthorityBatch::make_next(&prev_batch, &current_batch),
// Unwrap safe since we tested above it is not empty
AuthorityBatch::make_next(&prev_batch, &current_batch).unwrap(),
&*self.secret,
self.name,
);
Expand Down
111 changes: 67 additions & 44 deletions sui_core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::io;
use sui_types::crypto::PublicKeyBytes;
use sui_types::{base_types::*, committee::*, fp_ensure};

use sui_types::batch::{AuthorityBatch, SignedBatch, UpdateItem};
use sui_types::batch::{AuthorityBatch, SignedBatch, TxSequenceNumber, UpdateItem};
use sui_types::{
error::{SuiError, SuiResult},
messages::*,
Expand Down Expand Up @@ -174,6 +174,10 @@ impl<C> SafeClient<C> {
&self,
request: BatchInfoRequest,
signed_batch: &SignedBatch,
transactions_and_last_batch: &Option<(
Vec<(TxSequenceNumber, TransactionDigest)>,
AuthorityBatch,
)>,
) -> SuiResult {
// check the signature of the batch
signed_batch
Expand All @@ -190,21 +194,20 @@ impl<C> SafeClient<C> {
}
);

// reconstruct the batch and make sure the constructed digest matches the provided one
let provided_digest = signed_batch.batch.transactions_digest;
// If we have seen a previous batch, use it to make sure the next batch
// is constructed correctly:

let reconstructed_batch = AuthorityBatch::make_next_with_previous_digest(
Some(provided_digest),
&signed_batch.batch.transaction_batch.0,
);
let computed_digest = reconstructed_batch.transactions_digest;
if let Some((transactions, prev_batch)) = transactions_and_last_batch {
let reconstructed_batch = AuthorityBatch::make_next(prev_batch, transactions)?;

fp_ensure!(
reconstructed_batch == signed_batch.batch,
SuiError::ByzantineAuthoritySuspicion {
authority: self.address
}
);
}

fp_ensure!(
provided_digest == computed_digest,
SuiError::ByzantineAuthoritySuspicion {
authority: self.address
}
);
Ok(())
}

Expand Down Expand Up @@ -310,39 +313,59 @@ where
.handle_batch_stream(request.clone())
.await?;

let seq_requested = request.end - request.start;
let mut seq_to_be_returned = seq_requested as usize;
// check for overflow
if seq_requested > usize::MAX as u64 {
seq_to_be_returned = usize::MAX;
}

let client = self.clone();
let stream = Box::pin(
batch_info_items
.then(move |batch_info_item| {
let req_clone = request.clone();
let client = client.clone();
async move {
match &batch_info_item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
if let Err(err) =
client.check_update_item_batch_response(req_clone, signed_batch)
{
client.report_client_error(err.clone());
return Err(err);
}
batch_info_item
}
Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest)))) => {
batch_info_item
}
Err(e) => Err(e.clone()),
let address = self.address;
let stream = Box::pin(batch_info_items.scan(
(0u64, None),
move |(seq, txs_and_last_batch), batch_info_item| {
let req_clone = request.clone();
let client = client.clone();

// We check if we have exceeded the batch boundary for this request.
if !(*seq < request.end) {
// If we exceed it return None to end stream
return futures::future::ready(None);
}

let x = match &batch_info_item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
if let Err(err) = client.check_update_item_batch_response(
req_clone,
&signed_batch,
&txs_and_last_batch,
) {
client.report_client_error(err.clone());
Some(Err(err))
} else {
// Save the seqeunce number of this batch
*seq = signed_batch.batch.next_sequence_number;
// Insert a fresh vector for the new batch of transactions
let _ =
txs_and_last_batch.insert((Vec::new(), signed_batch.batch.clone()));
Some(batch_info_item)
}
}
})
.take(seq_to_be_returned),
);
Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest)))) => {
// A stream always starts with a batch, so the previous should have initialized it.
// And here we insert the tuple into the batch.
if txs_and_last_batch
.as_mut()
.map(|txs| txs.0.push((*seq, *digest)))
.is_none()
{
let err = SuiError::ByzantineAuthoritySuspicion { authority: address };
client.report_client_error(err.clone());
Some(Err(err))
} else {
Some(batch_info_item)
}
}
Err(e) => Some(Err(e.clone())),
};

futures::future::ready(x)
},
));
Ok(Box::pin(stream))
}
}
158 changes: 78 additions & 80 deletions sui_core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ async fn test_batch_store_retrieval() {
.batches_and_transactions(94, 120)
.expect("Retrieval failed!");

println!("{:?}", batches);
assert_eq!(3, batches.len());
assert_eq!(90, batches.first().unwrap().batch.next_sequence_number);
assert_eq!(115, batches.last().unwrap().batch.next_sequence_number);
Expand Down Expand Up @@ -471,30 +470,39 @@ impl AuthorityAPI for TrustworthyAuthorityClient {
let name = self.0.lock().await.name;
let batch_size = 3;

let stream = stream::unfold(
(request.start, AuthorityBatch::initial()),
move |(seq, last_batch)| {
let auth_secret = secret.clone();
async move {
if seq <= request.end {
let mut transactions = Vec::new();
for i in 0..batch_size {
transactions.push((seq + i, TransactionDigest::random()));
}
let next = AuthorityBatch::make_next_with_previous_digest(
Some(last_batch.digest()),
&transactions,
);

let item = SignedBatch::new(next.clone(), &*auth_secret, name);
let response = BatchInfoResponseItem(UpdateItem::Batch(item));
Some((Ok(response), (seq + batch_size, next)))
} else {
None
}
}
},
);
let mut items = Vec::new();
let mut last_batch = AuthorityBatch::initial();
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
let mut seq = 0;
while last_batch.next_sequence_number < request.end {
let mut transactions = Vec::new();
for _i in 0..batch_size {
let rnd = TransactionDigest::random();
transactions.push((seq, rnd));
items.push(BatchInfoResponseItem(UpdateItem::Transaction((seq, rnd))));
seq += 1;
}

let new_batch = AuthorityBatch::make_next(&last_batch, &transactions).unwrap();
last_batch = new_batch;
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
}

items.reverse();

let stream = stream::unfold(items, |mut items| async move {
if let Some(item) = items.pop() {
Some((Ok(item), items))
} else {
None
}
});
Ok(Box::pin(stream))
}
}
Expand Down Expand Up @@ -575,57 +583,45 @@ impl AuthorityAPI for ByzantineAuthorityClient {
let name = self.0.lock().await.name;
let batch_size = 3;

let stream = stream::unfold(
(request.start, AuthorityBatch::initial()),
move |(seq, last_batch)| {
let auth_secret = secret.clone();
async move {
if request.end % 2 == 0 {
if seq <= request.end {
let mut transactions = Vec::new();
for i in 0..batch_size {
transactions.push((seq + i, TransactionDigest::random()));
}
let next = AuthorityBatch::make_next_with_previous_digest(
Some(last_batch.digest()),
&transactions,
);

let mut item = SignedBatch::new(next.clone(), &*auth_secret, name);
// Remove a transaction after creating the batch
item.batch.transaction_batch.0.pop();
// And then add in a fake transaction
item.batch
.transaction_batch
.0
.push((seq + batch_size, TransactionDigest::random()));
let response = BatchInfoResponseItem(UpdateItem::Batch(item));
Some((Ok(response), (seq + batch_size, next)))
} else {
None
}
} else {
// Byzantine authority sends you too much, not what you asked for.
if seq <= request.end * 100 {
let mut transactions = Vec::new();
for i in 0..batch_size {
transactions.push((seq + i, TransactionDigest::random()));
}
let next = AuthorityBatch::make_next_with_previous_digest(
Some(last_batch.digest()),
&transactions,
);

let item = SignedBatch::new(next.clone(), &*auth_secret, name);
let response = BatchInfoResponseItem(UpdateItem::Batch(item));
Some((Ok(response), (seq + batch_size, next)))
} else {
None
}
}
}
},
);
let mut items = Vec::new();
let mut last_batch = AuthorityBatch::initial();
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
let mut seq = 0;
while last_batch.next_sequence_number < request.end {
let mut transactions = Vec::new();
for _i in 0..batch_size {
let rnd = TransactionDigest::random();
transactions.push((seq, rnd));
items.push(BatchInfoResponseItem(UpdateItem::Transaction((seq, rnd))));
seq += 1;
}

// Introduce byzantine behaviour:
// Pop last transaction
let (seq, _) = transactions.pop().unwrap();
// Insert a different one
transactions.push((seq, TransactionDigest::random()));

let new_batch = AuthorityBatch::make_next(&last_batch, &transactions).unwrap();
last_batch = new_batch;
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
}

items.reverse();

let stream = stream::unfold(items, |mut items| async move {
if let Some(item) = items.pop() {
Some((Ok(item), items))
} else {
None
}
});
Ok(Box::pin(stream))
}
}
Expand Down Expand Up @@ -678,8 +674,9 @@ async fn test_safe_batch_stream() {
.collect::<Vec<Result<BatchInfoResponseItem, SuiError>>>()
.await;

// Length should be within sequenced range
assert!(items.len() <= (request.end - request.start) as usize && !items.is_empty());
// Check length
assert!(!items.is_empty());
assert_eq!(items.len(), 15 + 6); // 15 items, and 6 batches (enclosing them)

let mut error_found = false;
for item in items {
Expand Down Expand Up @@ -722,8 +719,9 @@ async fn test_safe_batch_stream() {
.collect::<Vec<Result<BatchInfoResponseItem, SuiError>>>()
.await;

// Length should be within sequenced range, despite authority that never stop sending
assert!(items.len() <= (request.end - request.start) as usize && !items.is_empty());
// Check length
assert!(!items.is_empty());
assert_eq!(items.len(), 15 + 6); // 15 items, and 6 batches (enclosing them)

let request_b = BatchInfoRequest { start: 0, end: 10 };
batch_stream = safe_client_from_byzantine
Expand Down
8 changes: 0 additions & 8 deletions sui_core/tests/staged/sui.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ AuthorityBatch:
TUPLEARRAY:
CONTENT: U8
SIZE: 32
- transaction_batch:
TYPENAME: TransactionBatch
AuthoritySignInfo:
STRUCT:
- authority:
Expand Down Expand Up @@ -762,12 +760,6 @@ SuiError:
- error: STR
93:
OnlyOneConsensusClientPermitted: UNIT
TransactionBatch:
NEWTYPESTRUCT:
SEQ:
TUPLE:
- U64
- TYPENAME: TransactionDigest
TransactionData:
STRUCT:
- kind:
Expand Down
Loading

0 comments on commit 84d4646

Please sign in to comment.