Skip to content

Commit

Permalink
configurable batch action client
Browse files Browse the repository at this point in the history
solve todo of being able to gracefully exit gossip loop

updated happy path test
  • Loading branch information
lanvidr committed May 25, 2022
1 parent 22912fa commit dacc007
Show file tree
Hide file tree
Showing 9 changed files with 505 additions and 220 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ impl AuthorityState {
) -> Result<
(
VecDeque<UpdateItem>,
// Should subscribe, computer start, computed end
// Should subscribe, computed start, computed end
(bool, TxSequenceNumber, TxSequenceNumber),
),
SuiError,
Expand Down
26 changes: 13 additions & 13 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
(1) Share transactions received with other authorities, to complete their execution
in case clients fail before sharing a transaction with sufficient authorities.
(2) Share certificates with other authorities in case clients fail before a
certificate has its executon finalized.
certificate has its execution finalized.
(3) Gossip executed certificates digests with other authorities through following
each other and using push / pull to execute certificates.
(4) Perform the active operations necessary to progress the periodic checkpointing
Expand Down Expand Up @@ -36,6 +36,7 @@ use std::{
};
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
Expand Down Expand Up @@ -84,8 +85,9 @@ impl AuthorityHealth {
self.no_contact_before = Instant::now();
}

pub fn can_contact_now(&self) -> bool {
self.no_contact_before < Instant::now()
pub fn can_initiate_contact_now(&self) -> bool {
let now = Instant::now();
self.no_contact_before <= now
}
}

Expand Down Expand Up @@ -147,34 +149,32 @@ impl<A> ActiveAuthority<A> {
entry.set_no_contact_for(Duration::from_millis(delay));
}

// Resets retries to zero and sets no contact to zero delay.
/// Resets retries to zero and sets no contact to zero delay.
pub async fn set_success_backoff(&self, name: AuthorityName) {
let mut lock = self.health.lock().await;
let mut entry = lock.entry(name).or_default();
entry.retries = 0;
entry.reset_no_contact();
}

// Checks given the current time if we should contact this authority, ie
// if we are past any `no contact` delay.
/// Checks given the current time if we should contact this authority, ie
/// if we are past any `no contact` delay.
pub async fn can_contact(&self, name: AuthorityName) -> bool {
let mut lock = self.health.lock().await;
let entry = lock.entry(name).or_default();
entry.can_contact_now()
entry.can_initiate_contact_now()
}
}

impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// TODO: Active tasks go here + logic to spawn them all
pub async fn spawn_all_active_processes(self) -> Option<()> {
/// Spawn all active tasks.
pub async fn spawn_all_active_processes(self) -> JoinHandle<()> {
// Spawn a task to take care of gossip
let _gossip_join = tokio::task::spawn(async move {
tokio::task::spawn(async move {
gossip_process(&self, 4).await;
});

Some(())
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::AuthorityState;
use crate::authority::AuthorityStore;
use crate::authority_aggregator::authority_aggregator_tests::*;
use crate::authority_client::{AuthorityAPI, BatchInfoResponseItemStream};
use async_trait::async_trait;
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{env, fs};
use sui_adapter::genesis;
use sui_types::base_types::*;
use sui_types::batch::{AuthorityBatch, SignedBatch, UpdateItem};
use sui_types::committee::Committee;
use sui_types::crypto::{get_key_pair, KeyPair, PublicKeyBytes};
use sui_types::error::SuiError;
use sui_types::messages::{
AccountInfoRequest, AccountInfoResponse, BatchInfoRequest, BatchInfoResponseItem,
ConfirmationTransaction, ConsensusTransaction, ObjectInfoRequest, ObjectInfoResponse,
Transaction, TransactionInfoRequest, TransactionInfoResponse,
};
use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse};
use sui_types::object::Object;

#[derive(Clone)]
pub struct TestBatch {
pub digests: Vec<TransactionDigest>,
}

#[derive(Clone)]
pub enum BatchAction {
EmitError(),
EmitUpdateItem(),
}

#[derive(Clone)]
pub enum BatchActionInternal {
EmitError(),
EmitUpdateItem(TestBatch),
}

#[derive(Clone)]
pub struct ConfigurableBatchActionClient {
state: Arc<AuthorityState>,
pub action_sequence_internal: Vec<BatchActionInternal>,
}

impl ConfigurableBatchActionClient {
#[cfg(test)]
pub async fn new(committee: Committee, address: PublicKeyBytes, secret: KeyPair) -> Self {
// Random directory
let dir = env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

let store = Arc::new(AuthorityStore::open(path, None));
let state = AuthorityState::new(
committee.clone(),
address,
Arc::pin(secret),
store,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
)
.await;

ConfigurableBatchActionClient {
state: Arc::new(state),
action_sequence_internal: Vec::new(),
}
}

#[cfg(test)]
pub fn register_action_sequence(&mut self, actions: Vec<BatchActionInternal>) {
self.action_sequence_internal = actions;
}
}

#[async_trait]
impl AuthorityAPI for ConfigurableBatchActionClient {
async fn handle_transaction(
&self,
transaction: Transaction,
) -> Result<TransactionInfoResponse, SuiError> {
let state = self.state.clone();
state.handle_transaction(transaction).await
}

async fn handle_confirmation_transaction(
&self,
transaction: ConfirmationTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
let state = self.state.clone();
let result = state.handle_confirmation_transaction(transaction).await;
result
}

async fn handle_consensus_transaction(
&self,
_transaction: ConsensusTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
Ok(TransactionInfoResponse {
signed_transaction: None,
certified_transaction: None,
signed_effects: None,
})
}

async fn handle_account_info_request(
&self,
_request: AccountInfoRequest,
) -> Result<AccountInfoResponse, SuiError> {
Ok(AccountInfoResponse {
object_ids: vec![],
owner: Default::default(),
})
}

async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, SuiError> {
let state = self.state.clone();
let x = state.handle_object_info_request(request).await;
x
}

/// Handle Object information requests for this account.
async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError> {
let result = self.state.handle_transaction_info_request(request).await;
result
}

/// Handle Batch information requests for this authority.
async fn handle_batch_stream(
&self,
_request: BatchInfoRequest,
) -> Result<BatchInfoResponseItemStream, SuiError> {
let mut last_batch = AuthorityBatch::initial();
let actions = &self.action_sequence_internal;
let secret = self.state.secret.clone();
let name = self.state.name;
let mut items: Vec<Result<BatchInfoResponseItem, SuiError>> = Vec::new();
let mut seq = 0;
let _ = actions.into_iter().for_each(|action| {
match action {
BatchActionInternal::EmitUpdateItem(test_batch) => {
let mut temp_items: Vec<Result<BatchInfoResponseItem, SuiError>> = Vec::new();
let mut transactions = Vec::new();
for digest in test_batch.digests.clone() {
transactions.push((seq, digest));
// Safe client requires batches arrive first
temp_items.push(Ok(BatchInfoResponseItem(UpdateItem::Transaction((
seq, digest,
)))));
seq += 1;
}
let new_batch = AuthorityBatch::make_next(&last_batch, &transactions).unwrap();
last_batch = new_batch;
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
Ok(BatchInfoResponseItem(UpdateItem::Batch(item)))
});
for temp_item in temp_items {
items.push(temp_item)
}
}
BatchActionInternal::EmitError() => {
items.push(Err(SuiError::GenericAuthorityError {
error: "Synthetic authority error".to_string(),
}))
}
};
});

Ok(Box::pin(tokio_stream::iter(items)))
}

async fn handle_checkpoint(
&self,
_request: CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
todo!();
}
}

#[cfg(test)]
pub async fn init_configurable_authorities(
authority_action: Vec<BatchAction>,
) -> (
BTreeMap<AuthorityName, ConfigurableBatchActionClient>,
Vec<Arc<AuthorityState>>,
Vec<TransactionDigest>,
) {
let authority_count = 4;
let (addr1, key1) = get_key_pair();
let mut gas_objects = Vec::new();
for _i in 0..authority_action.len() {
gas_objects.push(Object::with_owner_for_testing(addr1));
}
let genesis_objects = authority_genesis_objects(authority_count, gas_objects.clone());

// Create committee.
let mut key_pairs = Vec::new();
let mut voting_rights = BTreeMap::new();
for _ in 0..authority_count {
let (_, key_pair) = get_key_pair();
let authority_name = *key_pair.public_key_bytes();
voting_rights.insert(authority_name, 1);
key_pairs.push((authority_name, key_pair));
}
let committee = Committee::new(0, voting_rights);

// Create Authority Clients and States.
let mut clients = Vec::new();
let mut names = Vec::new();
let mut states = Vec::new();
for ((authority_name, secret), objects) in key_pairs.into_iter().zip(genesis_objects) {
let client =
ConfigurableBatchActionClient::new(committee.clone(), authority_name, secret).await;
for object in objects {
client.state.insert_genesis_object(object).await;
}
states.push(client.state.clone());
names.push(authority_name);
clients.push(client);
}

// Execute transactions for every EmitUpdateItem Action, use the digest of the transaction to
// create a batch action internal sequence.
let mut executed_digests = Vec::new();
let mut batch_action_internal = Vec::new();
let framework_obj_ref = genesis::get_framework_object_ref();

for (action, gas_object) in authority_action.iter().zip(gas_objects) {
if let BatchAction::EmitUpdateItem() = action {
let temp_client = clients[0].borrow();
let gas_ref = get_latest_ref(temp_client, gas_object.id()).await;
let transaction =
crate_object_move_transaction(addr1, &key1, addr1, 100, framework_obj_ref, gas_ref);

for tx_client in clients.iter_mut().take(committee.quorum_threshold()) {
// Do transactions.
do_transaction(tx_client, &transaction).await;
}
// Add the digest and number to the internal actions.
let t_b = TestBatch {
digests: vec![*transaction.digest()],
};
batch_action_internal.push(BatchActionInternal::EmitUpdateItem(t_b));
executed_digests.push(transaction.digest().clone());
}
if let BatchAction::EmitError() = action {
batch_action_internal.push(BatchActionInternal::EmitError());
}
}

// Create BtreeMap of names to clients.
let mut authority_clients = BTreeMap::new();
for (name, client) in names.into_iter().zip(clients) {
authority_clients.insert(name, client);
}

// Execute certificate for each digest, and register the action sequence on the authorities who executed the certificates.
for digest in executed_digests.clone() {
// Get a cert
let authority_clients_ref: Vec<_> = authority_clients.values().collect();
let authority_clients_slice = authority_clients_ref.as_slice();
let cert1 = extract_cert(authority_clients_slice, &committee, &digest).await;

// Submit the cert to 2f+1 authorities.
for (_, cert_client) in authority_clients
.iter_mut()
.take(committee.quorum_threshold())
{
_ = do_cert(cert_client, &cert1).await;

// Register the internal actions to client
cert_client.register_action_sequence(batch_action_internal.clone());
}
}

(authority_clients, states, executed_digests)
}
Loading

0 comments on commit dacc007

Please sign in to comment.