Skip to content

Commit

Permalink
Event Store Initial Implementation (MystenLabs#2507)
Browse files Browse the repository at this point in the history
Evan Chan authored Jun 15, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent ea0c9ec commit 74d519b
Showing 8 changed files with 1,116 additions and 63 deletions.
316 changes: 297 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/sui-storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,9 +14,13 @@ futures = "0.3.21"
flexstr = "^0.9"
rand = "0.7.3"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.80"
tokio = { version = "1.17.0", features = ["full", "tracing"] }
rocksdb = "0.18.0"
tracing = "0.1.34"
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite" ] }
strum = "^0.24"
strum_macros = "^0.24"

sui-types = { path = "../sui-types" }

@@ -26,6 +30,7 @@ workspace-hack = { path = "../workspace-hack"}
move-core-types = { git = "https://github.com/move-language/move", rev = "c2949bc7967de5b93f0850ce4987fc06c529f9f2", features = ["address20"] }

[dev-dependencies]
bcs = "0.1.3"
tempfile = "3.3.0"
num_cpus = "1.13.1"
pretty_assertions = "1.2.0"
Original file line number Diff line number Diff line change
@@ -11,15 +11,20 @@
//! Events are also archived into checkpoints so this API should support that as well.
//!
use async_trait::async_trait;
use move_core_types::language_storage::ModuleId;
use move_core_types::value::MoveValue;
use serde_json::Value;
use sui_types::base_types::{ObjectID, TransactionDigest};
use sui_types::event::{EventEnvelope, EventType};

pub mod sql;

use flexstr::SharedStr;

/// One event pulled out from the EventStore
#[allow(unused)]
#[derive(Clone, Debug, PartialEq)]
pub struct StoredEvent {
/// UTC timestamp in milliseconds
timestamp: u64,
@@ -28,84 +33,111 @@ pub struct StoredEvent {
tx_digest: Option<TransactionDigest>,
/// The variant name from SuiEvent, eg MoveEvent, Publish, etc.
event_type: SharedStr,
/// Object ID of the Move package generating the event
/// Package ID if available
package_id: Option<ObjectID>,
/// Module name of the Move package generating the event
module_name: Option<SharedStr>,
/// Function name that produced the event, for Move Events
function_name: Option<SharedStr>,
/// Object ID of NewObject, DeleteObject, package being published, or object being transferred
object_id: Option<ObjectID>,
/// Individual event fields. As much as possible these should be deconstructed and flattened,
/// ie `{'obj': {'fieldA': 'A', 'fieldB': 'B'}}` should really be broken down to
/// `[('obj.fieldA', 'A'), ('obj.fieldB', 'B')]
///
/// There is no guarantee of ordering in the fields.
///
/// ## Common field names
/// * `object_id` - used by TransferObject, DeleteObject
/// * `version` - used by TransferObject
/// * `destination` - address, in hex bytes, used by TransferObject
/// * `type` - used by TransferObject (TransferType - Coin, ToAddress, ToObject)
fields: Vec<(SharedStr, EventValue)>, // Change this to something based on CBOR for binary values, or our own value types for efficiency
}

/// Enum for different types of values returnable from events in the EventStore
// This is distinct from MoveValue because we want to explicitly represent (and translate)
// blobs and strings, allowing us to use more efficient representations.
#[derive(Clone, Debug, PartialEq)]
pub enum EventValue {
Move(MoveValue),
/// Efficient string representation, no allocation for small strings
String(SharedStr),
/// Arbitrary-length blob. Please use MoveValue::Address for ObjectIDs and similar things.
BinaryBlob(Vec<u8>),
Json(Value),
}

/// An EventStore supports event ingestion and flexible event querying
/// One can think of events as logs. They represent a log of what is happening to Sui.
/// Thus, all different kinds of events fit on a timeline, and one should be able to query for
/// different types of events that happen over that timeline.
trait EventStore<EventIt>
where
EventIt: Iterator<Item = StoredEvent>,
{
#[async_trait]
trait EventStore {
type EventIt: IntoIterator<Item = StoredEvent>;

/// Adds events to the EventStore.
/// Semantics: events are appended, no deduplication is done.
fn add_events(
async fn add_events(
&self,
events: &[EventEnvelope],
checkpoint_num: u64,
) -> Result<(), EventStoreError>;

/// Queries for events emitted by a given transaction, returned in order emitted
/// NOTE: Not all events come from transactions
fn events_for_transaction(&self, digest: TransactionDigest)
-> Result<EventIt, EventStoreError>;
async fn events_for_transaction(
&self,
digest: TransactionDigest,
) -> Result<Self::EventIt, EventStoreError>;

/// Queries for all events of a certain EventType within a given time window.
/// Will return at most limit of the most recent events within the window, sorted in ascending time.
fn events_by_type(
/// Will return at most limit of the most recent events within the window, sorted in descending time.
async fn events_by_type(
&self,
start_time: u64,
end_time: u64,
event_type: EventType,
limit: usize,
) -> Result<EventIt, EventStoreError>;
) -> Result<Self::EventIt, EventStoreError>;

/// Generic event iteration bounded by time. Return in ingestion order.
fn event_iterator(&self, start_time: u64, end_time: u64) -> Result<EventIt, EventStoreError>;
/// start_time is inclusive and end_time is exclusive.
async fn event_iterator(
&self,
start_time: u64,
end_time: u64,
limit: usize,
) -> Result<Self::EventIt, EventStoreError>;

/// Generic event iteration bounded by checkpoint number. Return in ingestion order.
/// Checkpoint numbers are inclusive on both ends.
fn events_by_checkpoint(
async fn events_by_checkpoint(
&self,
start_checkpoint: u64,
end_checkpoint: u64,
) -> Result<EventIt, EventStoreError>;
limit: usize,
) -> Result<Self::EventIt, EventStoreError>;

/// Queries all Move events belonging to a certain Module ID within a given time window.
/// Will return at most limit of the most recent events within the window, sorted in ascending time.
fn events_by_module_id(
/// Will return at most limit of the most recent events within the window, sorted in descending time.
async fn events_by_module_id(
&self,
start_time: u64,
end_time: u64,
module: ModuleId,
limit: usize,
) -> Result<EventIt, EventStoreError>;
) -> Result<Self::EventIt, EventStoreError>;
}

#[derive(Debug)]
pub enum EventStoreError {
GenericError(Box<dyn std::error::Error>),
SqlError(sqlx::Error),
LimitTooHigh(usize),
}

impl From<sqlx::Error> for EventStoreError {
fn from(err: sqlx::Error) -> Self {
EventStoreError::SqlError(err)
}
}
658 changes: 658 additions & 0 deletions crates/sui-storage/src/event_store/sql.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/sui-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ hkdf = "0.12.3"
digest = "0.10.3"
schemars ="0.8.10"
tonic = "0.7"
strum = "^0.24"
strum_macros = "^0.24"

name-variant = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" }
5 changes: 5 additions & 0 deletions crates/sui-types/src/base_types.rs
Original file line number Diff line number Diff line change
@@ -405,6 +405,11 @@ impl TransactionDigest {
let random_bytes = rand::thread_rng().gen::<[u8; 32]>();
Self::new(random_bytes)
}

/// Translates digest into a Vec of bytes
pub fn to_bytes(&self) -> Vec<u8> {
self.0.to_vec()
}
}

impl AsRef<[u8]> for TransactionDigest {
55 changes: 44 additions & 11 deletions crates/sui-types/src/event.rs
Original file line number Diff line number Diff line change
@@ -3,14 +3,15 @@

use move_bytecode_utils::{layout::TypeLayoutBuilder, module_cache::GetModule};
use move_core_types::{
language_storage::{ModuleId, StructTag, TypeTag},
language_storage::{StructTag, TypeTag},
value::{MoveStruct, MoveTypeLayout},
};
use name_variant::NamedVariant;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, Bytes};
use strum_macros::EnumDiscriminants;
use strum::VariantNames;
use strum_macros::{EnumDiscriminants, EnumVariantNames};

use crate::{
base_types::{ObjectID, SequenceNumber, SuiAddress, TransactionDigest},
@@ -23,9 +24,9 @@ use crate::{
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventEnvelope {
/// UTC timestamp in milliseconds since epoch (1/1/1970)
timestamp: u64,
pub timestamp: u64,
/// Transaction digest of associated transaction, if any
tx_digest: Option<TransactionDigest>,
pub tx_digest: Option<TransactionDigest>,
/// Specific event type
pub event: Event,
/// json value for MoveStruct (for MoveEvent only)
@@ -52,7 +53,7 @@ impl EventEnvelope {
}
}

#[derive(Eq, Debug, Clone, PartialEq, Deserialize, Serialize, Hash)]
#[derive(Eq, Debug, strum_macros::Display, Clone, PartialEq, Deserialize, Serialize, Hash)]
pub enum TransferType {
Coin,
ToAddress,
@@ -62,9 +63,19 @@ pub enum TransferType {
/// Specific type of event
#[serde_as]
#[derive(
Eq, Debug, Clone, PartialEq, NamedVariant, Deserialize, Serialize, Hash, EnumDiscriminants,
Eq,
Debug,
Clone,
PartialEq,
NamedVariant,
Deserialize,
Serialize,
Hash,
EnumDiscriminants,
EnumVariantNames,
)]
#[strum_discriminants(name(EventType))]
// Developer note: PLEASE only append new entries, do not modify existing entries (binary compat)
pub enum Event {
/// Move-specific event
MoveEvent {
@@ -96,31 +107,53 @@ impl Event {
Event::MoveEvent { type_, contents }
}

pub fn name_from_ordinal(ordinal: usize) -> &'static str {
Event::VARIANTS[ordinal]
}

/// Returns the EventType associated with an Event
pub fn event_type(&self) -> EventType {
self.into()
}

/// Returns the object or package ID associated with the event, if available. Specifically:
/// - For TransferObject: the object ID being transferred (eg moving child from parent, its the child)
/// - for Publish, the package ID (which is the object ID of the module)
/// - for DeleteObject and NewObject, the Object ID
pub fn object_id(&self) -> Option<ObjectID> {
match self {
Event::Publish { package_id } => Some(*package_id),
Event::TransferObject { object_id, .. } => Some(*object_id),
Event::DeleteObject(obj_id) => Some(*obj_id),
Event::NewObject(obj_id) => Some(*obj_id),
_ => None,
}
}

/// Extract a module ID, if available, from a SuiEvent
pub fn module_id(&self) -> Option<ModuleId> {
/// Extracts the Move package ID associated with the event, or the package published.
pub fn package_id(&self) -> Option<ObjectID> {
match self {
Event::MoveEvent { type_, .. } => Some(type_.address.into()),
Event::Publish { package_id } => Some(*package_id),
_ => None,
}
}

/// Extract a module name, if available, from a SuiEvent
// TODO: should we switch to IdentStr or &str? These are more complicated to make work due to lifetimes
pub fn module_name(&self) -> Option<&str> {
match self {
Event::MoveEvent {
type_: struct_tag, ..
} => Some(struct_tag.module.as_ident_str().as_str()),
_ => None,
}
}

/// Extracts the function name from a SuiEvent, if available
pub fn function_name(&self) -> Option<String> {
match self {
Event::MoveEvent {
type_: struct_tag, ..
} => Some(struct_tag.module_id()),
} => Some(struct_tag.name.to_string()),
_ => None,
}
}
71 changes: 56 additions & 15 deletions crates/workspace-hack/Cargo.toml

Large diffs are not rendered by default.

0 comments on commit 74d519b

Please sign in to comment.