Skip to content

Commit

Permalink
[rosetta] - remove genesis blob loading from rosetta (MystenLabs#7315)
Browse files Browse the repository at this point in the history
Rosetta no longer need to load balance changes from genesis blob after
this commit
MystenLabs@2a6ed91

This PR fixes rosetta CI check failure due to double counting the
genesis balances
  • Loading branch information
patrickkuo authored Jan 11, 2023
1 parent 99b5e46 commit f6fa7c5
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
- -c
- |
/usr/local/bin/sui-rosetta generate-rosetta-cli-config &
/usr/local/bin/sui-rosetta start-online-remote-server --full-node-url http://sui-network:9000 --genesis-path ~/.sui/sui_config/genesis.blob
/usr/local/bin/sui-rosetta start-online-remote-server --full-node-url http://sui-network:9000
stdin_open: true
tty: true
rosetta-offline:
Expand Down
9 changes: 2 additions & 7 deletions crates/sui-rosetta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use tokio::task::JoinHandle;
use tracing::info;

use mysten_metrics::spawn_monitored_task;
use sui_config::genesis::Genesis;
use sui_sdk::SuiClient;

use crate::errors::Error;
Expand Down Expand Up @@ -40,12 +39,8 @@ pub struct RosettaOnlineServer {
}

impl RosettaOnlineServer {
pub fn new(env: SuiEnv, client: SuiClient, genesis: Genesis, data_path: &Path) -> Self {
let blocks = Arc::new(PseudoBlockProvider::spawn(
client.clone(),
genesis,
data_path,
));
pub fn new(env: SuiEnv, client: SuiClient, data_path: &Path) -> Self {
let blocks = Arc::new(PseudoBlockProvider::spawn(client.clone(), data_path));
Self {
env,
context: OnlineServerContext::new(client, blocks),
Expand Down
14 changes: 2 additions & 12 deletions crates/sui-rosetta/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use serde_json::{json, Value};
use tracing::info;
use tracing::log::warn;

use sui_config::genesis::Genesis;
use sui_config::{sui_config_dir, Config, NodeConfig, SUI_FULLNODE_CONFIG, SUI_KEYSTORE_FILENAME};
use sui_node::{metrics, SuiNode};
use sui_rosetta::types::{CurveType, PrefundedAccount, SuiEnv};
Expand Down Expand Up @@ -45,8 +44,6 @@ pub enum RosettaServerCommand {
addr: SocketAddr,
#[clap(long)]
full_node_url: String,
#[clap(long, default_value = "genesis.blob")]
genesis_path: PathBuf,
#[clap(long, default_value = "data")]
data_path: PathBuf,
},
Expand Down Expand Up @@ -138,19 +135,13 @@ impl RosettaServerCommand {
env,
addr,
full_node_url,
genesis_path,
data_path,
} => {
info!(
"Starting Rosetta Online Server with remove Sui full node [{full_node_url}]."
);
let sui_client = wait_for_sui_client(full_node_url).await;
let rosetta = RosettaOnlineServer::new(
env,
sui_client,
Genesis::load(&genesis_path)?,
&data_path,
);
let rosetta = RosettaOnlineServer::new(env, sui_client, &data_path);
rosetta.serve(addr).await??;
}

Expand All @@ -171,11 +162,10 @@ impl RosettaServerCommand {
let registry_service = metrics::start_prometheus_server(config.metrics_address);
// Staring a full node for the rosetta server.
let rpc_address = format!("http://127.0.0.1:{}", config.json_rpc_address.port());
let genesis = config.genesis.genesis()?.clone();
let _node = SuiNode::start(&config, registry_service).await?;

let sui_client = wait_for_sui_client(rpc_address).await;
let rosetta = RosettaOnlineServer::new(env, sui_client, genesis, &data_path);
let rosetta = RosettaOnlineServer::new(env, sui_client, &data_path);
rosetta.serve(addr).await??;
}
};
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-rosetta/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn status(
current_block_identifier: current_block.block.block_identifier,
current_block_timestamp: current_block.block.timestamp,
genesis_block_identifier: blocks.genesis_block_identifier(),
oldest_block_identifier: Some(blocks.oldest_block_identifier().await?),
oldest_block_identifier: Some(blocks.oldest_block_identifier()?),
sync_status: Some(SyncStatus {
current_index: Some(index),
target_index: Some(target),
Expand Down
148 changes: 53 additions & 95 deletions crates/sui-rosetta/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::operations::{Operation, Operations};
use crate::operations::Operations;
use crate::types::{
Block, BlockHash, BlockHeight, BlockIdentifier, BlockResponse, OperationType, Transaction,
TransactionIdentifier,
Expand All @@ -18,11 +18,10 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sui_config::genesis::Genesis;
use sui_sdk::rpc_types::SuiTransactionKind;
use sui_sdk::SuiClient;
use sui_storage::default_db_options;
use sui_types::base_types::{SuiAddress, TransactionDigest, TRANSACTION_DIGEST_LENGTH};
use sui_types::gas_coin::GasCoin;
use sui_types::base_types::SuiAddress;
use sui_types::query::TransactionQuery;
use tracing::{debug, error, info};
use typed_store::rocks::{DBMap, DBOptions};
Expand Down Expand Up @@ -60,7 +59,7 @@ pub trait BlockProvider {
async fn get_block_by_hash(&self, hash: BlockHash) -> Result<BlockResponse, Error>;
async fn current_block(&self) -> Result<BlockResponse, Error>;
fn genesis_block_identifier(&self) -> BlockIdentifier;
async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error>;
async fn current_block_identifier(&self) -> Result<BlockIdentifier, Error>;
async fn get_balance_at_block(
&self,
Expand All @@ -73,7 +72,6 @@ pub trait BlockProvider {
pub struct PseudoBlockProvider {
database: Arc<BlockProviderTables>,
client: SuiClient,
genesis: Genesis,
}

#[async_trait]
Expand Down Expand Up @@ -123,13 +121,10 @@ impl BlockProvider for PseudoBlockProvider {
}

fn genesis_block_identifier(&self) -> BlockIdentifier {
BlockIdentifier {
index: 0,
hash: BlockHash::new([0u8; TRANSACTION_DIGEST_LENGTH]),
}
self.oldest_block_identifier().unwrap()
}

async fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
fn oldest_block_identifier(&self) -> Result<BlockIdentifier, Error> {
self.database
.blocks
.iter()
Expand Down Expand Up @@ -168,11 +163,10 @@ impl BlockProvider for PseudoBlockProvider {
}

impl PseudoBlockProvider {
pub fn spawn(client: SuiClient, genesis: Genesis, db_path: &Path) -> Self {
pub fn spawn(client: SuiClient, db_path: &Path) -> Self {
let blocks = Self {
database: Arc::new(BlockProviderTables::open(db_path, None)),
client: client.clone(),
genesis,
};

let block_interval = option_env!("SUI_BLOCK_INTERVAL")
Expand All @@ -184,23 +178,9 @@ impl PseudoBlockProvider {
let f = blocks.clone();
spawn_monitored_task!(async move {
if f.database.is_empty() {
info!("Database is empty, indexing genesis block...");
let genesis = genesis_block(&f.genesis);
let genesis_txs = genesis
.block
.transactions
.iter()
.flat_map(|tx| tx.operations.clone())
.collect();

f.add_block_index(
&genesis.block.block_identifier,
&genesis.block.parent_block_identifier,
)
.unwrap();
if let Err(e) = f.update_balance(0, genesis_txs).await {
error!("Error updating balance, cause: {e:?}")
}
// We expect creating genesis block to success.
info!("Datastore is empty, processing genesis block.");
process_genesis_block(&client, &f).await.unwrap()
} else {
let current_block = f.current_block_identifier().await.unwrap();
info!("Resuming from block {}", current_block.index);
Expand All @@ -218,31 +198,23 @@ impl PseudoBlockProvider {

async fn create_next_block(&self, client: &SuiClient) -> Result<(), Error> {
let current_block = self.current_block_identifier().await?;
let total_tx = client.read_api().get_total_transaction_number().await?;
// Sui get_total_transaction_number starts from 1.
let total_tx = client.read_api().get_total_transaction_number().await? - 1;
if total_tx == 0 {
return Ok(());
}
if current_block.index < total_tx {
let tx_digests = if current_block.index == 0 {
client
.read_api()
.get_transactions(TransactionQuery::All, None, None, false)
.await?
.data
} else {
let cursor = current_block.hash;
let mut tx_digests = client
.read_api()
.get_transactions(TransactionQuery::All, Some(cursor), None, false)
.await?
.data;
if tx_digests.remove(0) != cursor {
return Err(Error::DataError(
"Incorrect transaction data returned from Sui.".to_string(),
));
}
tx_digests
};
let cursor = current_block.hash;
let mut tx_digests = client
.read_api()
.get_transactions(TransactionQuery::All, Some(cursor), None, false)
.await?
.data;
if tx_digests.remove(0) != cursor {
return Err(Error::DataError(
"Incorrect transaction data returned from Sui.".to_string(),
));
}

let mut index = current_block.index;
let mut parent_block_identifier = current_block;
Expand Down Expand Up @@ -328,10 +300,6 @@ impl PseudoBlockProvider {
parent_block_identifier: BlockIdentifier,
timestamp: u64,
) -> Result<BlockResponse, Error> {
if block_identifier.index == 0 {
return Ok(genesis_block(&self.genesis));
}

let tx = self
.client
.read_api()
Expand Down Expand Up @@ -373,10 +341,7 @@ fn extract_balance_changes_from_ops(
ops.into_iter()
.try_fold(HashMap::<SuiAddress, i128>::new(), |mut changes, op| {
match op.type_ {
OperationType::SuiBalanceChange
| OperationType::Gas
| OperationType::Genesis
| OperationType::PaySui => {
OperationType::SuiBalanceChange | OperationType::Gas | OperationType::PaySui => {
let addr = op
.account
.ok_or_else(|| {
Expand All @@ -394,46 +359,39 @@ fn extract_balance_changes_from_ops(
})
}

fn genesis_block(genesis: &Genesis) -> BlockResponse {
let id = BlockIdentifier {
async fn process_genesis_block(client: &SuiClient, f: &PseudoBlockProvider) -> Result<(), Error> {
let digest = *client
.read_api()
.get_transactions(TransactionQuery::All, None, Some(1), false)
.await?
.data
.first()
.ok_or_else(|| Error::InternalError(anyhow!("Cannot find genesis transaction.")))?;

let response = client.read_api().get_transaction(digest).await?;
if !response
.certificate
.data
.transactions
.iter()
.any(|tx| matches!(tx, SuiTransactionKind::Genesis(_)))
{
return Err(Error::InternalError(anyhow!(
"Transaction [{digest:?}] is not a Genesis transaction."
)));
}
let operations = response.try_into()?;
let block_identifier = BlockIdentifier {
index: 0,
hash: BlockHash::new([0u8; TRANSACTION_DIGEST_LENGTH]),
hash: digest,
};

let operations = genesis
.objects()
.iter()
.flat_map(|o| {
GasCoin::try_from(o)
.ok()
.and_then(|coin| o.owner.get_owner_address().ok().map(|addr| (addr, coin)))
})
.enumerate()
.map(|(index, (address, coin))| Operation::genesis(index as u64, address, coin))
.collect();

let transaction = Transaction {
transaction_identifier: TransactionIdentifier {
hash: TransactionDigest::new([0; 32]),
},
operations,
related_transactions: vec![],
metadata: None,
};
f.add_block_index(&block_identifier, &block_identifier)?;
f.update_balance(0, operations)
.await
.map_err(|e| anyhow!("Failed to update balance, cause : {e}",))?;

BlockResponse {
block: Block {
block_identifier: id,
parent_block_identifier: id,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
transactions: vec![transaction],
metadata: None,
},
other_transactions: vec![],
}
Ok(())
}

#[derive(DBMapUtils)]
Expand Down

0 comments on commit f6fa7c5

Please sign in to comment.