Skip to content

Commit

Permalink
[event] trigger events processing (MystenLabs#2326)
Browse files Browse the repository at this point in the history
* rebase & comments

* format
  • Loading branch information
longbowlu authored Jun 3, 2022
1 parent 9ee971a commit c809552
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = "2021"
rocksdb = "0.18.0"
anyhow = { version = "1.0.57", features = ["backtrace"] }
bcs = "0.1.3"
chrono = "0.4.0"
futures = "0.3.21"
rand = "0.7.3"
bytes = "1.1.0"
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
authority_batch::{BroadcastReceiver, BroadcastSender},
checkpoints::CheckpointStore,
epoch::EpochInfoLocals,
event_handler::EventHandler,
execution_engine,
gateway_types::TransactionEffectsResponse,
transaction_input_checker,
Expand Down Expand Up @@ -244,6 +245,8 @@ pub struct AuthorityState {

module_cache: SyncModuleCache<AuthorityStoreWrapper>, // TODO: use strategies (e.g. LRU?) to constraint memory usage

event_handler: Option<Arc<EventHandler>>,

/// The checkpoint store
pub(crate) checkpoints: Option<Arc<Mutex<CheckpointStore>>>,

Expand Down Expand Up @@ -526,6 +529,11 @@ impl AuthorityState {
self.update_state(temporary_store, &certificate, &signed_effects)
.await?;

// Each certificate only reaches here once
if let Some(event_handler) = &self.event_handler {
event_handler.process_events(&signed_effects.effects).await;
}

Ok(TransactionInfoResponse {
signed_transaction: self.database.get_transaction(&transaction_digest)?,
certified_transaction: Some(certificate),
Expand Down Expand Up @@ -822,6 +830,9 @@ impl AuthorityState {
database: store.clone(),
indexes,
module_cache: SyncModuleCache::new(AuthorityStoreWrapper(store.clone())),
// `event_handler` uses a separate in-mem cache from `module_cache`
// this is because they largely deal with different types of MoveStructs
event_handler: Some(Arc::new(EventHandler::new(store.clone()))),
checkpoints,
batch_channels: tx,
batch_notifier: Arc::new(
Expand Down
76 changes: 76 additions & 0 deletions crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::{AuthorityStore, AuthorityStoreWrapper};
use crate::streamer::Streamer;
use chrono::prelude::*;
use move_bytecode_utils::module_cache::SyncModuleCache;
use std::convert::TryFrom;
use std::sync::Arc;
use sui_types::{
error::{SuiError, SuiResult},
event::{Event, EventEnvelope},
messages::TransactionEffects,
};
use tokio::sync::mpsc::{self, Sender};
use tracing::{debug, error};

const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;

pub fn get_unixtime_ms() -> u64 {
let ts_ms = Utc::now().timestamp_millis();
u64::try_from(ts_ms).expect("Travelling in time machine")
}

pub struct EventHandler {
module_cache: SyncModuleCache<AuthorityStoreWrapper>,
streamer_queue: Sender<EventEnvelope>,
}

impl EventHandler {
pub fn new(validator_store: Arc<AuthorityStore>) -> Self {
let (tx, rx) = mpsc::channel::<EventEnvelope>(EVENT_DISPATCH_BUFFER_SIZE);
Streamer::spawn(rx);
Self {
module_cache: SyncModuleCache::new(AuthorityStoreWrapper(validator_store)),
streamer_queue: tx,
}
}

pub async fn process_events(&self, effects: &TransactionEffects) {
// serializely dispatch event processing to honor events' orders.
for event in &effects.events {
if let Err(e) = self.process_event(event).await {
error!(error =? e, "Failed to send EventEnvolope to dispatch");
}
}
}

pub async fn process_event(&self, event: &Event) -> SuiResult {
let envolope = match event {
Event::MoveEvent { .. } => {
debug!(event =? event, "Process MoveEvent.");
match event.extract_move_struct(&self.module_cache) {
Ok(Some(move_struct)) => EventEnvelope::new(
get_unixtime_ms(),
None,
event.clone(),
Some(move_struct),
),
Ok(None) => unreachable!("Expect a MoveStruct from a MoveEvent."),
Err(e) => return Err(e),
}
}
_ => EventEnvelope::new(get_unixtime_ms(), None, event.clone(), None),
};

// TODO store events here

self.streamer_queue
.send(envolope)
.await
.map_err(|e| SuiError::EventFailedToDispatch {
error: e.to_string(),
})
}
}
2 changes: 2 additions & 0 deletions crates/sui-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ pub mod authority_server;
pub mod checkpoints;
pub mod consensus_adapter;
pub mod epoch;
pub mod event_handler;
pub mod execution_engine;
pub mod gateway_state;
pub mod gateway_types;
pub mod safe_client;
pub mod streamer;
pub mod transaction_input_checker;
21 changes: 21 additions & 0 deletions crates/sui-core/src/streamer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use sui_types::event::EventEnvelope;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use tracing::debug;

pub struct Streamer {
event_queue: Receiver<EventEnvelope>,
}

impl Streamer {
pub fn spawn(rx: Receiver<EventEnvelope>) -> JoinHandle<()> {
tokio::spawn(async move { Self { event_queue: rx }.stream().await })
}

pub async fn stream(&mut self) {
while let Some(event_envelope) = self.event_queue.recv().await {
debug!(event_envelope =? event_envelope, "Get event");
}
}
}
4 changes: 2 additions & 2 deletions crates/sui-core/tests/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#[test]
fn test_format() {
// If this test breaks and you intended a format change, you need to run to get the fresh format:
// # cargo -q run --example generate-format -- print > sui_core/tests/staged/sui.yaml
// # cargo -q run --example generate-format -- print > crates/sui-core/tests/staged/sui.yaml

let status = std::process::Command::new("cargo")
.current_dir("..")
Expand All @@ -17,7 +17,7 @@ fn test_format() {
status.success(),
"\n\
If this test breaks and you intended a format change, you need to run to get the fresh format:\n\
cargo -q run --example generate-format -- print > sui_core/tests/staged/sui.yaml\n\
cargo -q run --example generate-format -- print > crates/sui-core/tests/staged/sui.yaml\n\
"
);
}
44 changes: 24 additions & 20 deletions crates/sui-core/tests/staged/sui.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -573,72 +573,76 @@ SuiError:
STRUCT:
- error: STR
92:
EventFailedToDispatch:
STRUCT:
- error: STR
93:
QuorumNotReached:
STRUCT:
- errors:
SEQ:
TYPENAME: SuiError
93:
94:
ObjectSerializationError:
STRUCT:
- error: STR
94:
ConcurrentTransactionError: UNIT
95:
IncorrectRecipientError: UNIT
ConcurrentTransactionError: UNIT
96:
IncorrectRecipientError: UNIT
97:
TooManyIncorrectAuthorities:
STRUCT:
- errors:
SEQ:
TUPLE:
- TYPENAME: PublicKeyBytes
- TYPENAME: SuiError
97:
98:
InconsistentGatewayResult:
STRUCT:
- error: STR
98:
99:
GatewayInvalidTxRangeQuery:
STRUCT:
- error: STR
99:
OnlyOneConsensusClientPermitted: UNIT
100:
OnlyOneConsensusClientPermitted: UNIT
101:
ConsensusConnectionBroken:
NEWTYPE: STR
101:
102:
FailedToHearBackFromConsensus:
NEWTYPE: STR
102:
103:
SharedObjectLockingFailure:
NEWTYPE: STR
103:
ListenerCapacityExceeded: UNIT
104:
ListenerCapacityExceeded: UNIT
105:
ConsensusSuiSerializationError:
NEWTYPE: STR
105:
NotASharedObjectTransaction: UNIT
106:
NotASharedObjectTransaction: UNIT
107:
SignatureSeedInvalidLength:
NEWTYPE: U64
107:
108:
HkdfError:
NEWTYPE: STR
108:
109:
SignatureKeyGenError:
NEWTYPE: STR
109:
ValidatorHaltedAtEpochEnd: UNIT
110:
ValidatorHaltedAtEpochEnd: UNIT
111:
InconsistentEpochState:
STRUCT:
- error: STR
111:
112:
RpcError:
NEWTYPE: STR
112:
113:
UnsupportedFeatureError:
STRUCT:
- error: STR
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ pub enum SuiError {
#[error("Authority Error: {error:?}")]
GenericAuthorityError { error: String },

#[error("Failed to dispatch event: {error:?}")]
EventFailedToDispatch { error: String },

#[error(
"Failed to achieve quorum between authorities, cause by : {:#?}",
errors.iter().map(| e | ToString::to_string(&e)).collect::<Vec<String>>()
Expand Down
12 changes: 10 additions & 2 deletions crates/sui-types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@ pub struct EventEnvelope {
/// Transaction digest of associated transaction, if any
tx_digest: Option<TransactionDigest>,
/// Specific event type
event: Event,
pub event: Event,
/// MoveStruct (for MoveEvent only)
pub move_struct: Option<MoveStruct>,
}

impl EventEnvelope {
pub fn new(timestamp: u64, tx_digest: Option<TransactionDigest>, event: Event) -> Self {
pub fn new(
timestamp: u64,
tx_digest: Option<TransactionDigest>,
event: Event,
move_struct: Option<MoveStruct>,
) -> Self {
Self {
timestamp,
tx_digest,
event,
move_struct,
}
}

Expand Down

0 comments on commit c809552

Please sign in to comment.