Skip to content

Commit

Permalink
add subjective timestamps on sui-nodes (MystenLabs#2537)
Browse files Browse the repository at this point in the history
* add timestamps

* move to index store
  • Loading branch information
longbowlu authored Jun 14, 2022
1 parent e83495a commit c99677a
Show file tree
Hide file tree
Showing 19 changed files with 762 additions and 689 deletions.
2 changes: 2 additions & 0 deletions crates/generate-json-rpc-spec/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ async fn create_transfer_response(
TransactionEffectsResponse {
certificate,
effects,
timestamp_ms: None,
},
))
} else {
Expand Down Expand Up @@ -375,6 +376,7 @@ async fn get_nft_response(
let tx = TransactionResponse::EffectResponse(TransactionEffectsResponse {
certificate,
effects,
timestamp_ms: None,
});
Ok((tx, object))
} else {
Expand Down
25 changes: 23 additions & 2 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use chrono::prelude::*;
use move_binary_format::CompiledModule;
use move_bytecode_utils::module_cache::SyncModuleCache;
use move_core_types::{
Expand Down Expand Up @@ -536,13 +537,15 @@ impl AuthorityState {
digest: &TransactionDigest,
cert: &CertifiedTransaction,
effects: &SignedTransactionEffects,
timestamp_ms: u64,
) -> SuiResult {
indexes.index_tx(
cert.sender_address(),
cert.data.input_objects()?.iter().map(|o| o.object_id()),
effects.effects.mutated_and_created(),
seq,
digest,
timestamp_ms,
)
}

Expand All @@ -562,16 +565,22 @@ impl AuthorityState {
}
};

let timestamp_ms = Self::unixtime_now_ms();

// Index tx
if let Some(indexes) = &self.indexes {
if let Err(e) = self.index_tx(indexes.as_ref(), seq, digest, &cert, &effects) {
if let Err(e) =
self.index_tx(indexes.as_ref(), seq, digest, &cert, &effects, timestamp_ms)
{
warn!(?digest, "Couldn't index tx: {}", e);
}
}

// Emit events
if let Some(event_handler) = &self.event_handler {
event_handler.process_events(&effects.effects).await;
event_handler
.process_events(&effects.effects, timestamp_ms)
.await;
}

Ok(())
Expand Down Expand Up @@ -620,6 +629,11 @@ impl AuthorityState {
Ok(())
}

pub fn unixtime_now_ms() -> u64 {
let ts_ms = Utc::now().timestamp_millis();
u64::try_from(ts_ms).expect("Travelling in time machine")
}

/// Check if we need to submit this transaction to consensus. We usually do, unless (i) we already
/// processed the transaction and we can immediately return the effects, or (ii) we already locked
/// all shared-objects of the transaction and can (re-)attempt execution.
Expand Down Expand Up @@ -1100,6 +1114,13 @@ impl AuthorityState {
}
}

pub async fn get_timestamp_ms(
&self,
digest: &TransactionDigest,
) -> Result<Option<u64>, anyhow::Error> {
Ok(self.get_indexes()?.get_timestamp_ms(digest)?)
}

pub async fn get_transactions_by_input_object(
&self,
object: ObjectID,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
{
/// Open an authority store by directory path
pub fn open<P: AsRef<Path>>(path: P, db_options: Option<Options>) -> Self {
let (options, point_lookup) = default_db_options(db_options);
let (options, point_lookup) = default_db_options(db_options, None);

let db = {
let path = &path;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl CheckpointStore {
name: AuthorityName,
secret: StableSyncAuthoritySigner,
) -> Result<CheckpointStore, SuiError> {
let (options, point_lookup) = default_db_options(db_options);
let (options, point_lookup) = default_db_options(db_options, None);

let db = open_cf_opts(
&path,
Expand Down
17 changes: 5 additions & 12 deletions crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@

use crate::authority::{AuthorityStore, AuthorityStoreWrapper};
use crate::streamer::Streamer;
use chrono::prelude::*;
use move_bytecode_utils::module_cache::SyncModuleCache;
use std::convert::TryFrom;
use std::sync::Arc;
use sui_types::{
error::{SuiError, SuiResult},
Expand All @@ -17,11 +15,6 @@ use tracing::{debug, error};

const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;

pub fn get_unixtime_ms() -> u64 {
let ts_ms = Utc::now().timestamp_millis();
u64::try_from(ts_ms).expect("Travelling in time machine")
}

pub struct EventHandler {
module_cache: SyncModuleCache<AuthorityStoreWrapper>,
streamer_queue: Sender<EventEnvelope>,
Expand All @@ -37,16 +30,16 @@ impl EventHandler {
}
}

pub async fn process_events(&self, effects: &TransactionEffects) {
pub async fn process_events(&self, effects: &TransactionEffects, timestamp_ms: u64) {
// serializely dispatch event processing to honor events' orders.
for event in &effects.events {
if let Err(e) = self.process_event(event).await {
if let Err(e) = self.process_event(event, timestamp_ms).await {
error!(error =? e, "Failed to send EventEnvolope to dispatch");
}
}
}

pub async fn process_event(&self, event: &Event) -> SuiResult {
pub async fn process_event(&self, event: &Event, timestamp_ms: u64) -> SuiResult {
let envolope = match event {
Event::MoveEvent { .. } => {
debug!(event =? event, "Process MoveEvent.");
Expand All @@ -57,13 +50,13 @@ impl EventHandler {
error: e.to_string(),
}
})?;
EventEnvelope::new(get_unixtime_ms(), None, event.clone(), Some(json_value))
EventEnvelope::new(timestamp_ms, None, event.clone(), Some(json_value))
}
Ok(None) => unreachable!("Expect a MoveStruct from a MoveEvent."),
Err(e) => return Err(e),
}
}
_ => EventEnvelope::new(get_unixtime_ms(), None, event.clone(), None),
_ => EventEnvelope::new(timestamp_ms, None, event.clone(), None),
};

// TODO store events here
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/gateway_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ where
TransactionEffectsResponse {
certificate: certificate.try_into()?,
effects: effects.into(),
timestamp_ms: None,
},
));
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/gateway_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mod gateway_types_tests;
pub struct TransactionEffectsResponse {
pub certificate: SuiCertifiedTransaction,
pub effects: SuiTransactionEffects,
pub timestamp_ms: Option<u64>,
}

#[derive(Serialize, Deserialize, Debug, JsonSchema)]
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/query_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
Some(certificate) => Ok(TransactionEffectsResponse {
certificate: certificate.try_into()?,
effects: database.get_effects(&digest)?.into(),
timestamp_ms: None,
}),
None => Err(anyhow!(SuiError::TransactionNotFound { digest })),
}
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-gateway/src/read_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ impl RpcReadApiServer for ReadApi {
&self,
digest: TransactionDigest,
) -> RpcResult<TransactionEffectsResponse> {
Ok(self.state.get_transaction(digest).await?)
Ok(TransactionEffectsResponse {
timestamp_ms: self.state.get_timestamp_ms(&digest).await?,
..self.state.get_transaction(digest).await?
})
}
}

Expand Down
70 changes: 41 additions & 29 deletions crates/sui-open-rpc/samples/objects.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@
"fields": {
"description": "An NFT created by the wallet Command Line Tool",
"id": {
"id": "0x335e6f185f5b30b1a0813f16334cc617f7a07486",
"id": "0xd77a71ae5148faecb61b22b65432fd84a97e2972",
"version": 1
},
"name": "Example NFT",
"url": "ipfs://bafkreibngqhl3gaa7daob4i2vccziay2jjlp435cf66vhono7nrvww53ty"
"url": {
"type": "0x1::ascii::String",
"fields": {
"bytes": "aXBmczovL2JhZmtyZWlibmdxaGwzZ2FhN2Rhb2I0aTJ2Y2N6aWF5MmpqbHA0MzVjZjY2dmhvbm83bnJ2d3c1M3R5"
}
}
}
},
"owner": {
"AddressOwner": "0x9bdf05607f01739b73c082af418327be9dcf6015"
"AddressOwner": "0x51372a6dcf5799e4c39f7cd6bc68e27bdc92728c"
},
"previousTransaction": "3OR/i3o6y2eB2P1pG8HcazfcmRSl2n6KOemT8Z25S5c=",
"previousTransaction": "5xyblLSuf6ObBcGzwcp6NFy+1oSi5HsjzyleIlF5K+A=",
"storageRebate": 25,
"reference": {
"objectId": "0x335e6f185f5b30b1a0813f16334cc617f7a07486",
"objectId": "0xd77a71ae5148faecb61b22b65432fd84a97e2972",
"version": 1,
"digest": "NPSrjJNi5MTCVYkOJDVNhMuw7XKKCXk2qn+EJMHib2Q="
"digest": "CWR+Q2LYPSXfQIbn7oTaCG0z9guVfqUTlK5Ohpmvbjc="
}
}
},
Expand All @@ -36,20 +41,20 @@
"fields": {
"balance": 100000,
"id": {
"id": "0x06b066a852eca1458799bc6f735b92ed660863c5",
"id": "0x02f7f9837d57a46a77f59b3fe387cad5dc426ab4",
"version": 0
}
}
},
"owner": {
"AddressOwner": "0x9bdf05607f01739b73c082af418327be9dcf6015"
"AddressOwner": "0x51372a6dcf5799e4c39f7cd6bc68e27bdc92728c"
},
"previousTransaction": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=",
"storageRebate": 0,
"reference": {
"objectId": "0x06b066a852eca1458799bc6f735b92ed660863c5",
"objectId": "0x02f7f9837d57a46a77f59b3fe387cad5dc426ab4",
"version": 0,
"digest": "3tmpN6lal1Gpn5Ox7GDm1bmnTx9KxxnDSkmLXm+ArgA="
"digest": "sHsUYMENjaREp4QFeewwjOjy20eYYcnsusfUSWLaXas="
}
}
},
Expand All @@ -59,16 +64,16 @@
"data": {
"dataType": "package",
"disassembled": {
"M1": "// Move bytecode v5\nmodule 803bf11ade99118bd1cdc5f8acc8583431eb90cb.M1 {\nstruct Forge has store, key {\n\tid: VersionedID,\n\tswords_created: u64\n}\nstruct Sword has store, key {\n\tid: VersionedID,\n\tmagic: u64,\n\tstrength: u64\n}\n\ninit(Arg0: &mut TxContext) {\nB0:\n\t0: CopyLoc[0](Arg0: &mut TxContext)\n\t1: Call[6](new_id(&mut TxContext): VersionedID)\n\t2: LdU64(0)\n\t3: Pack[0](Forge)\n\t4: StLoc[1](loc0: Forge)\n\t5: MoveLoc[1](loc0: Forge)\n\t6: MoveLoc[0](Arg0: &mut TxContext)\n\t7: FreezeRef\n\t8: Call[7](sender(&TxContext): address)\n\t9: Call[0](transfer<Forge>(Forge, address))\n\t10: Ret\n}\npublic magic(Arg0: &Sword): u64 {\nB0:\n\t0: MoveLoc[0](Arg0: &Sword)\n\t1: ImmBorrowField[0](Sword.magic: u64)\n\t2: ReadRef\n\t3: Ret\n}\npublic strength(Arg0: &Sword): u64 {\nB0:\n\t0: MoveLoc[0](Arg0: &Sword)\n\t1: ImmBorrowField[1](Sword.strength: u64)\n\t2: ReadRef\n\t3: Ret\n}\npublic(script) sword_create(Arg0: &mut Forge, Arg1: u64, Arg2: u64, Arg3: address, Arg4: &mut TxContext) {\nB0:\n\t0: MoveLoc[4](Arg4: &mut TxContext)\n\t1: Call[6](new_id(&mut TxContext): VersionedID)\n\t2: MoveLoc[1](Arg1: u64)\n\t3: MoveLoc[2](Arg2: u64)\n\t4: Pack[1](Sword)\n\t5: StLoc[5](loc0: Sword)\n\t6: MoveLoc[5](loc0: Sword)\n\t7: MoveLoc[3](Arg3: address)\n\t8: Call[1](transfer<Sword>(Sword, address))\n\t9: CopyLoc[0](Arg0: &mut Forge)\n\t10: ImmBorrowField[2](Forge.swords_created: u64)\n\t11: ReadRef\n\t12: LdU64(1)\n\t13: Add\n\t14: MoveLoc[0](Arg0: &mut Forge)\n\t15: MutBorrowField[2](Forge.swords_created: u64)\n\t16: WriteRef\n\t17: Ret\n}\npublic(script) sword_transfer(Arg0: Sword, Arg1: address) {\nB0:\n\t0: MoveLoc[0](Arg0: Sword)\n\t1: MoveLoc[1](Arg1: address)\n\t2: Call[1](transfer<Sword>(Sword, address))\n\t3: Ret\n}\npublic swords_created(Arg0: &Forge): u64 {\nB0:\n\t0: MoveLoc[0](Arg0: &Forge)\n\t1: ImmBorrowField[2](Forge.swords_created: u64)\n\t2: ReadRef\n\t3: Ret\n}\n}"
"M1": "// Move bytecode v5\nmodule 90613dc81d6867706c5d27e2e9b8f4b1fca3cd86.M1 {\nstruct Forge has store, key {\n\tid: VersionedID,\n\tswords_created: u64\n}\nstruct Sword has store, key {\n\tid: VersionedID,\n\tmagic: u64,\n\tstrength: u64\n}\n\ninit(Arg0: &mut TxContext) {\nB0:\n\t0: CopyLoc[0](Arg0: &mut TxContext)\n\t1: Call[6](new_id(&mut TxContext): VersionedID)\n\t2: LdU64(0)\n\t3: Pack[0](Forge)\n\t4: StLoc[1](loc0: Forge)\n\t5: MoveLoc[1](loc0: Forge)\n\t6: MoveLoc[0](Arg0: &mut TxContext)\n\t7: FreezeRef\n\t8: Call[7](sender(&TxContext): address)\n\t9: Call[0](transfer<Forge>(Forge, address))\n\t10: Ret\n}\npublic magic(Arg0: &Sword): u64 {\nB0:\n\t0: MoveLoc[0](Arg0: &Sword)\n\t1: ImmBorrowField[0](Sword.magic: u64)\n\t2: ReadRef\n\t3: Ret\n}\npublic strength(Arg0: &Sword): u64 {\nB0:\n\t0: MoveLoc[0](Arg0: &Sword)\n\t1: ImmBorrowField[1](Sword.strength: u64)\n\t2: ReadRef\n\t3: Ret\n}\nentry public sword_create(Arg0: &mut Forge, Arg1: u64, Arg2: u64, Arg3: address, Arg4: &mut TxContext) {\nB0:\n\t0: MoveLoc[4](Arg4: &mut TxContext)\n\t1: Call[6](new_id(&mut TxContext): VersionedID)\n\t2: MoveLoc[1](Arg1: u64)\n\t3: MoveLoc[2](Arg2: u64)\n\t4: Pack[1](Sword)\n\t5: StLoc[5](loc0: Sword)\n\t6: MoveLoc[5](loc0: Sword)\n\t7: MoveLoc[3](Arg3: address)\n\t8: Call[1](transfer<Sword>(Sword, address))\n\t9: CopyLoc[0](Arg0: &mut Forge)\n\t10: ImmBorrowField[2](Forge.swords_created: u64)\n\t11: ReadRef\n\t12: LdU64(1)\n\t13: Add\n\t14: MoveLoc[0](Arg0: &mut Forge)\n\t15: MutBorrowField[2](Forge.swords_created: u64)\n\t16: WriteRef\n\t17: Ret\n}\nentry public sword_transfer(Arg0: Sword, Arg1: address) {\nB0:\n\t0: MoveLoc[0](Arg0: Sword)\n\t1: MoveLoc[1](Arg1: address)\n\t2: Call[1](transfer<Sword>(Sword, address))\n\t3: Ret\n}\npublic swords_created(Arg0: &Forge): u64 {\nB0:\n\t0: MoveLoc[0](Arg0: &Forge)\n\t1: ImmBorrowField[2](Forge.swords_created: u64)\n\t2: ReadRef\n\t3: Ret\n}\n}"
}
},
"owner": "Immutable",
"previousTransaction": "nOIeKrwslhm80kuRNrK5bV6pgOfy0iBKHlc7VLPF4TM=",
"previousTransaction": "+bNZtv2XN+c/o9s5wZwS4EHRzw6QOo2MkfNBX+GJcrg=",
"storageRebate": 0,
"reference": {
"objectId": "0x803bf11ade99118bd1cdc5f8acc8583431eb90cb",
"objectId": "0x90613dc81d6867706c5d27e2e9b8f4b1fca3cd86",
"version": 1,
"digest": "XaJuHx/LIq0aIaT2eMdbpJSZDUCMujg6vuY2iCBu0s0="
"digest": "W3L+Zzp6p7wJYgODJQDeFQ5pbmrDhzPvmDrkpfaZrCA="
}
}
},
Expand All @@ -77,38 +82,45 @@
"details": {
"data": {
"dataType": "moveObject",
"type": "0x2661e3b0b6931e01ea04506bdaa2c35edb675fa2::Hero::Hero",
"type": "0x237966f94764a944512afde1acbad0511f5e4fa4::Hero::Hero",
"fields": {
"experience": 0,
"game_id": "0xdc85468dc7bb6fd070272c93320b8c5da3f2c941",
"game_id": "0x0f7423f9669a7f9aee253a8c5aa3319045c271b0",
"hp": 100,
"id": {
"id": "0x855b95af937b6f585bbf5c8763967e333b29aa28",
"id": "0x84752f5a3dc5c08181cb457b7fc7bc307d6ccfe9",
"version": 1
},
"sword": {
"type": "0x2661e3b0b6931e01ea04506bdaa2c35edb675fa2::Hero::Sword",
"type": "0x1::option::Option<0x237966f94764a944512afde1acbad0511f5e4fa4::Hero::Sword>",
"fields": {
"game_id": "0xdc85468dc7bb6fd070272c93320b8c5da3f2c941",
"id": {
"id": "0xda4140d028eab81c66a8343ee3d35f8da2aa4d34",
"version": 0
},
"magic": 10,
"strength": 1
"vec": [
{
"type": "0x237966f94764a944512afde1acbad0511f5e4fa4::Hero::Sword",
"fields": {
"game_id": "0x0f7423f9669a7f9aee253a8c5aa3319045c271b0",
"id": {
"id": "0x63c838153c84a9a6cb6d277f940115a7efce8d4e",
"version": 0
},
"magic": 10,
"strength": 1
}
}
]
}
}
}
},
"owner": {
"AddressOwner": "0x9bdf05607f01739b73c082af418327be9dcf6015"
"AddressOwner": "0x51372a6dcf5799e4c39f7cd6bc68e27bdc92728c"
},
"previousTransaction": "u4MC7Z8cx7Al/FsWWqFUrmxeyiTuQzoln0E59LaO3dE=",
"previousTransaction": "pZS8rlnRECnXy38Lt8Hlm0XB95TLBFidz0iEha4EX9E=",
"storageRebate": 22,
"reference": {
"objectId": "0x855b95af937b6f585bbf5c8763967e333b29aa28",
"objectId": "0x84752f5a3dc5c08181cb457b7fc7bc307d6ccfe9",
"version": 1,
"digest": "ez3cP6wEdromkjgD11pVlNHELi30XCCT5BfXhVYlvlg="
"digest": "cqgIERMs2AajwjCVJjsJDEZfXO/3cWOxZUFO/2huPuA="
}
}
}
Expand Down
Loading

0 comments on commit c99677a

Please sign in to comment.