Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Protryon committed Aug 26, 2021
1 parent 770c8b6 commit a81dd19
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 110 deletions.
5 changes: 1 addition & 4 deletions consensus/src/consensus/inner/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@ impl ConsensusInner {
async fn init(&mut self) -> Result<()> {
let canon = self.storage.canon().await?;
// no blocks present/genesis situation
if canon.block_height == 0 && canon.hash.is_empty() {
if canon.is_empty() {
// no blocks
let hash = self.public.genesis_block.header.hash();
let block = self.public.genesis_block.clone();
self.storage.insert_block(&block).await?;

let init_digest = self.ledger.extend(&[], &[], &[])?;
self.storage.store_init_digest(init_digest).await?;

self.commit_block(&hash, &block).await?;
}

Expand Down
35 changes: 12 additions & 23 deletions snarkos/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ use snarkos_storage::{
export_canon_blocks,
key_value::KeyValueStore,
AsyncStorage,
DynStorage,
RocksDb,
SerialBlock,
SqliteStorage,
Storage,
VMBlock,
Validator,
};

use snarkvm_algorithms::{MerkleParameters, CRH, SNARK};
Expand Down Expand Up @@ -119,7 +118,7 @@ async fn start_server(config: Config) -> anyhow::Result<()> {
)?;

info!("Loading storage at '{}'...", path.to_str().unwrap_or_default());
let storage = {
let storage: DynStorage = {
let mut sqlite_path = path.clone();
sqlite_path.push("sqlite.db");

Expand All @@ -131,27 +130,17 @@ async fn start_server(config: Config) -> anyhow::Result<()> {
Arc::new(AsyncStorage::new(SqliteStorage::new(&sqlite_path)?))
};

/*
let storage = RocksDb::open(&path)?;
if storage.canon().await?.block_height == 0 {
let mut rocks_identity_path = path.clone();
rocks_identity_path.push("IDENTITY");
if rocks_identity_path.exists() {
info!("Empty sqlite DB with existing rocksdb found, migrating...");
let rocks_storage = RocksDb::open(&path)?;
let rocks_storage: DynStorage = Arc::new(AsyncStorage::new(KeyValueStore::new(rocks_storage)));

// For extra safety, validate storage too if a trim is requested.
let storage = if config.storage.validate || config.storage.trim {
let now = std::time::Instant::now();
let (_, moved_storage) = storage
.validate(None, snarkos_storage::validator::FixMode::Everything)
.await;
info!("Storage validated in {}ms", now.elapsed().as_millis());
if !config.storage.trim {
return Ok(());
}
moved_storage
} else {
storage
};
Arc::new(AsyncStorage::new(KeyValueStore::new(storage)))
*/
snarkos_storage::migrate(&rocks_storage, &storage).await?;
}
}

if let Some(max_head) = config.storage.max_head {
let canon_next = storage.get_block_hash(max_head + 1).await?;
Expand Down
3 changes: 3 additions & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,8 @@ pub use digest::*;
pub mod validator;
pub use validator::*;

pub mod migrate;
pub use migrate::*;

/// The number of block hashes that are returned by the `Ledger::get_block_locator_hashes` call.
pub const NUM_LOCATOR_HASHES: u32 = 64;
49 changes: 49 additions & 0 deletions storage/src/migrate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (C) 2019-2021 Aleo Systems Inc.
// This file is part of the snarkOS library.

// The snarkOS library is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// The snarkOS library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use crate::DynStorage;
use anyhow::*;

/// Copies all canon blocks (with their ledger digests) and all stored miner
/// records from `from` into `to`. Used to move data between storage backends
/// (e.g. an existing RocksDB store into the new SQLite store).
///
/// # Errors
/// Fails if the source's canon-block and ledger-digest counts disagree
/// (a corrupt source DB), if a record referenced by a stored commitment is
/// missing, or on any underlying storage error.
pub async fn migrate(from: &DynStorage, to: &DynStorage) -> Result<()> {
    let blocks = from.get_canon_blocks(None).await?;
    let ledger_digests = from.get_ledger_digests().await?;
    // Blocks and digests are zipped 1:1 below; a length mismatch means the
    // canon chain and the digest index are out of sync in the source DB.
    if blocks.len() != ledger_digests.len() {
        return Err(anyhow!(
            "canon, ledger digest lengths differed for migration ({} blocks vs {} digests) -- corrupt DB?",
            blocks.len(),
            ledger_digests.len()
        ));
    }

    // Transfer blocks in canon order, committing each with its matching
    // digest so the target ends up with an identical canon chain.
    for (block, digest) in blocks.into_iter().zip(ledger_digests) {
        to.insert_block(&block).await?;
        to.commit_block(&block.header.hash(), digest).await?;
    }

    // Transfer miner records, failing fast on a dangling commitment.
    let record_commitments = from.get_record_commitments(None).await?;
    let mut records = Vec::with_capacity(record_commitments.len());
    for commitment in record_commitments {
        let record = from
            .get_record(commitment)
            .await?
            .ok_or_else(|| anyhow!("missing record for commitment"))?;
        records.push(record);
    }

    to.store_records(&records).await?;

    Ok(())
}
7 changes: 0 additions & 7 deletions storage/src/storage/async_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ enum Message {
GetCanonBlocks(Option<u32>),
GetBlockHashes(Option<u32>, BlockFilter),
Validate(Option<u32>, FixMode),
StoreInitDigest(Digest),
#[cfg(feature = "test")]
StoreItem(KeyValueColumn, Vec<u8>, Vec<u8>),
#[cfg(feature = "test")]
Expand Down Expand Up @@ -132,7 +131,6 @@ impl fmt::Display for Message {
Message::GetCanonBlocks(limit) => write!(f, "GetCanonBlocks({:?})", limit),
Message::GetBlockHashes(limit, filter) => write!(f, "GetBlockHashes({:?}, {:?})", limit, filter),
Message::Validate(limit, fix_mode) => write!(f, "Validate({:?}, {:?})", limit, fix_mode),
Message::StoreInitDigest(digest) => write!(f, "StoreInitDigest({})", digest),
#[cfg(feature = "test")]
Message::StoreItem(col, key, value) => write!(f, "StoreItem({:?}, {:?}, {:?})", col, key, value),
#[cfg(feature = "test")]
Expand Down Expand Up @@ -197,7 +195,6 @@ impl<S: SyncStorage + 'static> Agent<S> {
Message::GetCanonBlocks(limit) => Box::new(self.inner.get_canon_blocks(limit)),
Message::GetBlockHashes(limit, filter) => Box::new(self.inner.get_block_hashes(limit, filter)),
Message::Validate(limit, fix_mode) => Box::new(self.inner.validate(limit, fix_mode)),
Message::StoreInitDigest(digest) => Box::new(self.wrap(move |f| f.store_init_digest(&digest))),
#[cfg(feature = "test")]
Message::StoreItem(col, key, value) => Box::new(self.wrap(move |f| f.store_item(col, key, value))),
#[cfg(feature = "test")]
Expand Down Expand Up @@ -377,10 +374,6 @@ impl Storage for AsyncStorage {
self.send(Message::Validate(limit, fix_mode)).await
}

async fn store_init_digest(&self, digest: Digest) -> Result<()> {
self.send(Message::StoreInitDigest(digest)).await
}

#[cfg(feature = "test")]
async fn store_item(&self, col: KeyValueColumn, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
self.send(Message::StoreItem(col, key, value)).await
Expand Down
9 changes: 1 addition & 8 deletions storage/src/storage/key_value/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl<S: KeyValueStorage + Validator + 'static> SyncStorage for KeyValueStore<S>
self.write_meta_u32(KEY_CURR_CM_INDEX, cm_index)?;
self.write_meta_u32(KEY_CURR_MEMO_INDEX, memo_index)?;

let is_genesis = canon.block_height == 0 && canon.hash.is_empty();
let is_genesis = canon.is_empty();

let new_best_block_number = if is_genesis { 0 } else { canon.block_height as u32 + 1 };

Expand Down Expand Up @@ -670,13 +670,6 @@ impl<S: KeyValueStorage + Validator + 'static> SyncStorage for KeyValueStore<S>
errors
}

fn store_init_digest(&mut self, digest: &Digest) -> Result<()> {
self.inner()
.store(KeyValueColumn::DigestIndex, &digest[..], &0u32.to_le_bytes()[..])?;
self.inner()
.store(KeyValueColumn::DigestIndex, &0u32.to_le_bytes()[..], &digest[..])
}

#[cfg(feature = "test")]
fn store_item(&mut self, col: KeyValueColumn, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
self.inner().store(col, &key, &value)
Expand Down
69 changes: 7 additions & 62 deletions storage/src/storage/sqlite/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ fn read_static_blob<const S: usize>(row: &Row, index: usize) -> rusqlite::Result
}

impl SqliteStorage {
/// Counter used for tracking migrations, incremented on each schema change, and checked in [`migrate`] function below to update schema.
const SCHEMA_INDEX: u32 = 2;

fn migrate(&self, from: u32) -> Result<()> {
Expand Down Expand Up @@ -343,26 +344,13 @@ impl SyncStorage for SqliteStorage {
}

fn get_block_states(&mut self, hashes: &[Digest]) -> Result<Vec<BlockStatus>> {
// intentional N+1 query since rusqlite doesn't support WHERE ... IN here and it doesn't matter at the moment
let mut out = Vec::with_capacity(hashes.len());
for hash in hashes {
let state = self.get_block_state(hash)?;
out.push(state);
}
Ok(out)
// let dyn_hashes: Vec<&dyn ToSql> = hashes.iter().map(|x| -> &dyn ToSql { x }).collect::<Vec<_>>();
// let mut stmt = self.conn.prepare_cached("SELECT hash, canon_height FROM blocks WHERE hash IN rarray(?)")?;
// let mut output = IndexMap::<Digest, BlockStatus>::new();
// for hash in hashes {
// output.insert(hash.clone(), BlockStatus::Unknown);
// }
// for row in stmt.query_map(&dyn_hashes[..], |row| Ok((row.get(0)?, row.get(1)?)))? {
// let (hash, canon_height): (Digest, Option<usize>) = row?;
// *output.get_mut(&hash).unwrap() = match canon_height {
// Some(height) => BlockStatus::Committed(height),
// None => BlockStatus::Uncommitted,
// };
// }
// Ok(output.into_iter().map(|x| x.1).collect())
}

fn get_block(&mut self, hash: &Digest) -> Result<SerialBlock> {
Expand Down Expand Up @@ -424,9 +412,10 @@ impl SyncStorage for SqliteStorage {
BlockStatus::Unknown => return Err(anyhow!("attempted to commit unknown block")),
_ => (),
}
let next_canon_height = if canon.is_empty() { 0 } else { canon.block_height + 1 };
self.conn.execute(
r"UPDATE blocks SET canon_height = ?, canon_ledger_digest = ? WHERE hash = ?",
params![canon.block_height + 1, ledger_digest, hash],
params![next_canon_height, ledger_digest, hash],
)?;
self.get_block_state(hash)
}
Expand Down Expand Up @@ -516,7 +505,7 @@ impl SyncStorage for SqliteStorage {
transaction_blocks.block_order,
blocks.hash
FROM transactions
INNER JOIN transaction_blocks ON transaction_blocks.tranasction_id = transactions.id
INNER JOIN transaction_blocks ON transaction_blocks.transaction_id = transactions.id
INNER JOIN blocks ON blocks.id = transaction_blocks.block_id
WHERE transactions.transaction_id = ? AND blocks.canon_height IS NOT NULL
",
Expand Down Expand Up @@ -579,51 +568,6 @@ impl SyncStorage for SqliteStorage {
.map_err(Into::into)
}

fn store_init_digest(&mut self, digest: &Digest) -> Result<()> {
let mut block_query = self.conn.prepare_cached(
r"
INSERT INTO blocks (
hash,
previous_block_hash,
merkle_root_hash,
pedersen_merkle_root_hash,
proof,
time,
difficulty_target,
nonce,
canon_height,
canon_ledger_digest
)
VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?
)
",
)?;
let empty = [0u8; 32];
block_query.execute::<&[&dyn ToSql]>(params![
&digest,
&empty[..],
&empty[..],
&empty[..],
&empty[..],
0u32,
0u32,
0u32,
0u32,
&empty[..]
])?;
Ok(())
}

fn get_record_commitments(&mut self, limit: Option<usize>) -> Result<Vec<Digest>> {
let mut stmt = self.conn.prepare_cached(
r"
Expand Down Expand Up @@ -896,7 +840,8 @@ impl SyncStorage for SqliteStorage {
}

fn validate(&mut self, _limit: Option<u32>, _fix_mode: FixMode) -> Vec<ValidatorError> {
unimplemented!()
warn!("called validator on sqlite, which is a NOP");
vec![]
}

#[cfg(feature = "test")]
Expand Down
9 changes: 6 additions & 3 deletions storage/src/storage/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ pub struct CanonData {
pub hash: Digest,
}

impl CanonData {
    /// Returns `true` when no canon block has been committed yet,
    /// i.e. no canon hash is recorded and the height is still zero
    /// (the genesis / empty-storage situation).
    pub fn is_empty(&self) -> bool {
        self.hash.is_empty() && self.block_height == 0
    }
}

#[derive(Debug, Clone, Copy)]
pub enum BlockFilter {
CanonOnly(BlockOrder),
Expand Down Expand Up @@ -152,9 +158,6 @@ pub trait Storage: Send + Sync {
}
}

/// Stores the "pre-genesis" digest; only applicable to the genesis block txs.
async fn store_init_digest(&self, digest: Digest) -> Result<()>;

// miner convenience record management functions

/// Gets a list of stored record commitments subject to `limit`.
Expand Down
3 changes: 0 additions & 3 deletions storage/src/storage/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,6 @@ pub trait SyncStorage {
/// Gets a transaction from a transaction id
fn get_transaction(&mut self, transaction_id: &Digest) -> Result<SerialTransaction>;

/// Stores the "pre-genesis" digest; only applicable to the genesis block txs.
fn store_init_digest(&mut self, digest: &Digest) -> Result<()>;

// miner convenience record management functions

/// Gets a list of stored record commitments subject to `limit`.
Expand Down

0 comments on commit a81dd19

Please sign in to comment.