Skip to content

Commit

Permalink
Block num by tx out index (#275)
Browse files Browse the repository at this point in the history
* add tx out index -> block index store

* add migration code

* support incremental upgrades

* debug -> trace

* lint
  • Loading branch information
eranrund authored Jul 8, 2020
1 parent c1b4ec3 commit ec85a10
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 39 deletions.
3 changes: 3 additions & 0 deletions ledger/db/src/ledger_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub trait Ledger: Clone + Send {
/// Gets a block signature by its index in the blockchain.
fn get_block_signature(&self, block_number: u64) -> Result<BlockSignature, Error>;

/// Gets block index by a TxOut global index.
fn get_block_index_by_tx_out_index(&self, tx_out_index: u64) -> Result<u64, Error>;

/// Get the total number of TxOuts in the ledger.
fn num_txos(&self) -> Result<u64, Error>;

Expand Down
205 changes: 177 additions & 28 deletions ledger/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub const BLOCK_SIGNATURES_DB_NAME: &str = "ledger_db:block_signatures";
pub const KEY_IMAGES_DB_NAME: &str = "ledger_db:key_images";
pub const KEY_IMAGES_BY_BLOCK_DB_NAME: &str = "ledger_db:key_images_by_block";
pub const TX_OUTS_BY_BLOCK_DB_NAME: &str = "ledger_db:tx_outs_by_block";
pub const BLOCK_NUMBER_BY_TX_OUT_INDEX: &str = "ledger_db:block_number_by_tx_out_index";

// Keys used by the `counts` database.
const NUM_BLOCKS_KEY: &str = "num_blocks";
Expand Down Expand Up @@ -99,6 +100,10 @@ pub struct LedgerDB {
/// querying `tx_out_store`.
tx_outs_by_block: Database,

/// TxOut global index -> block number.
/// This map allows retrieval of the block a given TxOut belongs to.
block_number_by_tx_out_index: Database,

/// Location on filesystem.
path: PathBuf,
}
Expand Down Expand Up @@ -195,6 +200,14 @@ impl Ledger for LedgerDB {
Ok(signature)
}

/// Gets block index by a TxOut global index.
fn get_block_index_by_tx_out_index(&self, tx_out_index: u64) -> Result<u64, Error> {
let db_transaction = self.env.begin_ro_txn()?;
let key = u64_to_key_bytes(tx_out_index);
let block_index_bytes = db_transaction.get(self.block_number_by_tx_out_index, &key)?;
Ok(key_bytes_to_u64(&block_index_bytes))
}

/// Returns the index of the TxOut with the given hash.
fn get_tx_out_index_by_hash(&self, tx_out_hash: &[u8; 32]) -> Result<u64, Error> {
let db_transaction: RoTransaction = self.env.begin_ro_txn()?;
Expand Down Expand Up @@ -279,46 +292,68 @@ impl LedgerDB {
#[allow(clippy::unreadable_literal)]
pub fn open(path: PathBuf) -> Result<LedgerDB, Error> {
let env = Environment::new()
.set_max_dbs(21)
.set_max_dbs(22)
.set_map_size(MAX_LMDB_FILE_SIZE)
// TODO - needed because currently our test cloud machines have slow disks.
.set_flags(EnvironmentFlags::NO_SYNC)
.open(&path)?;

// Check if the database we opened is compatible with the current implementation.
let metadata_store = MetadataStore::new(&env)?;
let db_txn = env.begin_ro_txn()?;
let version = metadata_store.get_version(&db_txn)?;
global_log::info!("Ledger db is currently at version: {:?}", version);
db_txn.commit()?;

match version.is_compatible_with_latest() {
Ok(_) => {}
// Version 20200610 introduced the TxOut public key -> index store.
Err(Error::VersionIncompatible(20200427, 20200610)) => {
global_log::info!("Ledger db migrating from version 20200427 to 20200610, this might take awhile...");

TxOutStore::construct_tx_out_index_by_public_key_from_existing_data(&env)?;

let mut db_txn = env.begin_rw_txn()?;
metadata_store.set_version_to_latest(&mut db_txn)?;
global_log::info!(
"Ledger db migration complete, now at version: {:?}",
metadata_store.get_version(&db_txn),
);
db_txn.commit()?;
}
Err(err) => {
return Err(err);
}
};
loop {
// Check if the database we opened is compatible with the current implementation.
let db_txn = env.begin_ro_txn()?;
let version = metadata_store.get_version(&db_txn)?;
global_log::info!("Ledger db is currently at version: {:?}", version);
db_txn.commit()?;

match version.is_compatible_with_latest() {
Ok(_) => {
break;
}
// Version 20200610 introduced the TxOut public key -> index store.
Err(Error::VersionIncompatible(20200427, 20200610))
| Err(Error::VersionIncompatible(20200427, 20200707)) => {
global_log::info!("Ledger db migrating from version 20200427 to 20200610, this might take awhile...");

TxOutStore::construct_tx_out_index_by_public_key_from_existing_data(&env)?;

let mut db_txn = env.begin_rw_txn()?;
metadata_store.set_version(&mut db_txn, 20200610)?;
global_log::info!(
"Ledger db migration complete, now at version: {:?}",
metadata_store.get_version(&db_txn),
);
db_txn.commit()?;
}
// Version 20200707 introduced the TxOut global index -> block index store.
Err(Error::VersionIncompatible(20200610, 20200707)) => {
global_log::info!("Ledger db migrating from version 20200610 to 20200707, this might take awhile...");

Self::construct_block_number_by_tx_out_index_from_existing_data(&env)?;

let mut db_txn = env.begin_rw_txn()?;
metadata_store.set_version_to_latest(&mut db_txn)?;
global_log::info!(
"Ledger db migration complete, now at version: {:?}",
metadata_store.get_version(&db_txn),
);
db_txn.commit()?;
}
// Don't know how to migrate.
Err(err) => {
return Err(err);
}
};
}

let counts = env.open_db(Some(COUNTS_DB_NAME))?;
let blocks = env.open_db(Some(BLOCKS_DB_NAME))?;
let block_signatures = env.open_db(Some(BLOCK_SIGNATURES_DB_NAME))?;
let key_images = env.open_db(Some(KEY_IMAGES_DB_NAME))?;
let key_images_by_block = env.open_db(Some(KEY_IMAGES_BY_BLOCK_DB_NAME))?;
let tx_outs_by_block = env.open_db(Some(TX_OUTS_BY_BLOCK_DB_NAME))?;
let block_number_by_tx_out_index = env.open_db(Some(BLOCK_NUMBER_BY_TX_OUT_INDEX))?;

let tx_out_store = TxOutStore::new(&env)?;

Expand All @@ -331,6 +366,7 @@ impl LedgerDB {
key_images,
key_images_by_block,
tx_outs_by_block,
block_number_by_tx_out_index,
metadata_store,
tx_out_store,
})
Expand All @@ -339,7 +375,7 @@ impl LedgerDB {
/// Creates a fresh Ledger Database in the given path.
pub fn create(path: PathBuf) -> Result<(), Error> {
let env = Environment::new()
.set_max_dbs(20)
.set_max_dbs(22)
.set_map_size(MAX_LMDB_FILE_SIZE)
.open(&path)
.unwrap_or_else(|_| {
Expand All @@ -355,6 +391,7 @@ impl LedgerDB {
env.create_db(Some(KEY_IMAGES_DB_NAME), DatabaseFlags::empty())?;
env.create_db(Some(KEY_IMAGES_BY_BLOCK_DB_NAME), DatabaseFlags::empty())?;
env.create_db(Some(TX_OUTS_BY_BLOCK_DB_NAME), DatabaseFlags::empty())?;
env.create_db(Some(BLOCK_NUMBER_BY_TX_OUT_INDEX), DatabaseFlags::empty())?;

MetadataStore::create(&env)?;
TxOutStore::create(&env)?;
Expand Down Expand Up @@ -463,12 +500,21 @@ impl LedgerDB {
)?;

// Write the actual TxOuts.
let block_index_bytes = u64_to_key_bytes(block_index);

for tx_out in tx_outs {
if self.contains_tx_out_public_key(&tx_out.public_key)? {
return Err(Error::DuplicateOutputPublicKey);
}

self.tx_out_store.push(tx_out, db_transaction)?;
let tx_out_index = self.tx_out_store.push(tx_out, db_transaction)?;

db_transaction.put(
self.block_number_by_tx_out_index,
&u64_to_key_bytes(tx_out_index),
&block_index_bytes,
WriteFlags::NO_OVERWRITE,
)?;
}

// Done.
Expand Down Expand Up @@ -538,6 +584,61 @@ impl LedgerDB {
// All good
Ok(())
}

/// A utility function for constructing the block_number_by_tx_out_index store using existing
/// data.
fn construct_block_number_by_tx_out_index_from_existing_data(
env: &Environment,
) -> Result<(), Error> {
// When constructing the block index by tx out index database, we first need to create it.
let block_number_by_tx_out_index_db =
env.create_db(Some(BLOCK_NUMBER_BY_TX_OUT_INDEX), DatabaseFlags::empty())?;

// Open pre-existing databases that has data we need.
let tx_outs_by_block_db = env.open_db(Some(TX_OUTS_BY_BLOCK_DB_NAME))?;
let counts_db = env.open_db(Some(COUNTS_DB_NAME))?;

// After the database has been created, populate it with the existing data.
let mut db_txn = env.begin_rw_txn()?;

let num_blocks = key_bytes_to_u64(&db_txn.get(counts_db, &NUM_BLOCKS_KEY)?);

let mut percents: u64 = 0;
for block_num in 0..num_blocks {
// Get information about the TxOuts in the block.
let bytes = db_txn.get(tx_outs_by_block_db, &u64_to_key_bytes(block_num))?;
let tx_outs_by_block: TxOutsByBlockValue = decode(&bytes)?;

global_log::trace!(
"Assigning tx outs #{} - #{} to block #{}",
tx_outs_by_block.first_tx_out_index,
tx_outs_by_block.first_tx_out_index + tx_outs_by_block.num_tx_outs,
block_num,
);

for i in 0..tx_outs_by_block.num_tx_outs {
let tx_out_index = tx_outs_by_block.first_tx_out_index + i;

db_txn.put(
block_number_by_tx_out_index_db,
&u64_to_key_bytes(tx_out_index),
&u64_to_key_bytes(block_num),
WriteFlags::NO_OVERWRITE,
)?;
}

// Throttled logging.
let new_percents = block_num * 100 / num_blocks;
if new_percents != percents {
percents = new_percents;
global_log::info!(
"Constructing block_number_by_tx_out_index: {}% complete",
percents
);
}
}
Ok(db_txn.commit()?)
}
}

// Specifies how we encode the u64 chunk number in lmdb
Expand Down Expand Up @@ -699,6 +800,9 @@ mod ledger_db_test {
let key_images = ledger_db.get_key_images_by_block(0).unwrap();
assert_eq!(key_images.len(), 0);

let block_index = ledger_db.get_block_index_by_tx_out_index(0).unwrap();
assert_eq!(block_index, 0);

// === Create and append a non-origin block. ===

let recipient_account_key = AccountKey::random(&mut rng);
Expand Down Expand Up @@ -747,6 +851,12 @@ mod ledger_db_test {
ledger_db.get_tx_out_by_index((i + 1) as u64).unwrap(),
*tx_out
);

// All tx outs are in the second block.
let block_index = ledger_db
.get_block_index_by_tx_out_index((i + 1) as u64)
.unwrap();
assert_eq!(block_index, 1);
}

assert!(ledger_db
Expand Down Expand Up @@ -878,6 +988,45 @@ mod ledger_db_test {
}
}

#[test]
// Getting a block number by tx out index should return the correct block number, if it exists.
fn test_get_block_index_by_tx_out_index() {
let mut ledger_db = create_db();
let n_blocks = 43;
let (_expected_blocks, expected_block_contents) = populate_db(&mut ledger_db, n_blocks, 1);

for (block_index, block_contents) in expected_block_contents.iter().enumerate() {
for tx_out in block_contents.outputs.iter() {
let tx_out_index = ledger_db
.get_tx_out_index_by_public_key(&tx_out.public_key)
.expect("Failed getting tx out index");

let block_index_by_tx_out = ledger_db
.get_block_index_by_tx_out_index(tx_out_index)
.expect("Failed getting block index by tx out index");
assert_eq!(block_index as u64, block_index_by_tx_out);
}
}
}

#[test]
// Getting a block index by a tx out index return an error if the tx out index doesn't exist.
fn test_get_block_index_by_tx_out_index_doesnt_exist() {
let mut ledger_db = create_db();
let n_blocks = 43;
populate_db(&mut ledger_db, n_blocks, 1);

let out_of_range = 999;

match ledger_db.get_block_index_by_tx_out_index(out_of_range) {
Ok(_block_index) => panic!("Should not return a block index."),
Err(Error::NotFound) => {
// This is expected.
}
Err(e) => panic!("Unexpected error {:?}", e),
}
}

#[test]
// `Ledger::contains_key_image` should find key images that exist.
fn test_contains_key_image() {
Expand Down
26 changes: 24 additions & 2 deletions ledger/db/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use prost::Message;
// If this is properly maintained, we could check during ledger db opening for any
// incompatibilities, and either refuse to open or perform a migration.
#[allow(clippy::unreadable_literal)]
pub const LATEST_VERSION: u64 = 20200610;
pub const LATEST_VERSION: u64 = 20200707;

// Metadata information about the ledger databse.
#[derive(Clone, Message)]
Expand All @@ -25,8 +25,13 @@ pub struct MetadataVersion {
impl MetadataVersion {
/// Construct a MetadataVersion instance with the most up to date versioning information.
pub fn latest() -> Self {
Self::with_database_format_version(LATEST_VERSION)
}

/// Construct a MetadataVersion instance with a specific database_format_version.
pub fn with_database_format_version(database_format_version: u64) -> Self {
Self {
database_format_version: LATEST_VERSION,
database_format_version,
created_by_crate_version: env!("CARGO_PKG_VERSION").to_owned(),
}
}
Expand Down Expand Up @@ -95,4 +100,21 @@ impl MetadataStore {
WriteFlags::empty(),
)?)
}

// Set version to a specific version.
pub fn set_version(
&self,
db_txn: &mut RwTransaction,
database_format_version: u64,
) -> Result<(), Error> {
let metadata_version =
MetadataVersion::with_database_format_version(database_format_version);

Ok(db_txn.put(
self.metadata,
&METADATA_VERSION_KEY,
&encode(&metadata_version),
WriteFlags::empty(),
)?)
}
}
Loading

0 comments on commit ec85a10

Please sign in to comment.