Skip to content

Commit

Permalink
sui: unconditionally process certs even if a client connection drops
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Aug 2, 2022
1 parent 929d3a0 commit 2a2e482
Showing 1 changed file with 60 additions and 46 deletions.
106 changes: 60 additions & 46 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl AuthorityServer {
.server_builder()
.add_service(ValidatorServer::new(ValidatorService {
state: self.state,
consensus_adapter: self.consensus_adapter,
consensus_adapter: Arc::new(self.consensus_adapter),
_checkpoint_consensus_handle: None,
}))
.bind(&address)
Expand All @@ -156,7 +156,7 @@ impl AuthorityServer {

pub struct ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: ConsensusAdapter,
consensus_adapter: Arc<ConsensusAdapter>,
_checkpoint_consensus_handle: Option<JoinHandle<()>>,
}

Expand Down Expand Up @@ -238,61 +238,27 @@ impl ValidatorService {

Ok(Self {
state,
consensus_adapter,
consensus_adapter: Arc::new(consensus_adapter),
_checkpoint_consensus_handle: checkpoint_consensus_handle,
})
}
}

#[async_trait]
impl Validator for ValidatorService {
async fn transaction(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let mut transaction = request.into_inner();

transaction
.verify()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
//TODO This is really really bad, we should have different types for signature-verified transactions
transaction.is_verified = true;

let tx_digest = transaction.digest();

// Enable Trace Propagation across spans/processes using tx_digest
let span = tracing::debug_span!(
"process_tx",
?tx_digest,
tx_kind = transaction.data.kind_as_str()
);

let info = self
.state
.handle_transaction(transaction)
.instrument(span)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(info))
}

async fn handle_certificate(
&self,
state: Arc<AuthorityState>,
consensus_adapter: Arc<ConsensusAdapter>,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let mut certificate = request.into_inner();
// 1) Verify certificate
certificate
.verify(&self.state.committee.load())
.verify(&state.committee.load())
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
//TODO This is really really bad, we should have different types for signature verified transactions
certificate.is_verified = true;

// 2) Check idempotency
let digest = certificate.digest();
if let Some(response) = self
.state
if let Some(response) = state
.check_tx_already_executed(digest)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
Expand All @@ -303,13 +269,12 @@ impl Validator for ValidatorService {
// 3) If it's a shared object transaction and requires consensus, we need to do so.
// This will wait until either timeout or we have heard back from consensus.
if certificate.contains_shared_object()
&& !self
.state
&& !state
.transaction_shared_locks_exist(&certificate)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
{
self.consensus_adapter
consensus_adapter
.submit(&ConsensusTransaction::UserTransaction(Box::new(
certificate.clone(),
)))
Expand All @@ -325,15 +290,64 @@ impl Validator for ValidatorService {
tx_kind = certificate.data.kind_as_str()
);

let response = self
.state
let response = state
.handle_certificate(certificate)
.instrument(span)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(response))
}
}

#[async_trait]
impl Validator for ValidatorService {
async fn transaction(
&self,
request: tonic::Request<Transaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let mut transaction = request.into_inner();

transaction
.verify()
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
//TODO This is really really bad, we should have different types for signature-verified transactions
transaction.is_verified = true;

let tx_digest = transaction.digest();

// Enable Trace Propagation across spans/processes using tx_digest
let span = tracing::debug_span!(
"process_tx",
?tx_digest,
tx_kind = transaction.data.kind_as_str()
);

let info = self
.state
.handle_transaction(transaction)
.instrument(span)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Ok(tonic::Response::new(info))
}

async fn handle_certificate(
&self,
request: tonic::Request<CertifiedTransaction>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let state = self.state.clone();
let consensus_adapter = self.consensus_adapter.clone();

// Spawns a task which handles the certificate. The task will unconditionally continue
// processing in the event that the client connection is dropped.
tokio::spawn(
async move { Self::handle_certificate(state, consensus_adapter, request).await },
)
.await
.unwrap()
}

async fn account_info(
&self,
Expand Down

0 comments on commit 2a2e482

Please sign in to comment.