Skip to content

Commit

Permalink
Updates to snarkos-storage
Browse files Browse the repository at this point in the history
  • Loading branch information
howardwu committed Oct 15, 2021
1 parent 2e6f503 commit f3dcb32
Show file tree
Hide file tree
Showing 58 changed files with 970 additions and 2,316 deletions.
467 changes: 356 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions consensus/src/consensus/inner/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ impl ConsensusInner {
}
},
ConsensusMessage::FetchMemoryPool(size) => {
let out: Vec<SerialTransaction> =
self.memory_pool.get_candidates(size).into_iter().cloned().collect();
let out: Vec<Transaction<N>> = self.memory_pool.get_candidates(size).into_iter().cloned().collect();
response.send(Box::new(out)).ok();
}
ConsensusMessage::CreateTransaction(request) => {
Expand Down
12 changes: 6 additions & 6 deletions consensus/src/consensus/inner/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::task;

impl ConsensusInner {
/// Receive a block from an external source and process it based on ledger state.
pub(super) async fn receive_block(&mut self, block: &SerialBlock) -> Result<(), ConsensusError> {
pub(super) async fn receive_block(&mut self, block: &Block<N>) -> Result<(), ConsensusError> {
self.storage.insert_block(block).await?;

let hash = block.header.hash();
Expand All @@ -41,7 +41,7 @@ impl ConsensusInner {
Ok(())
}

pub(super) async fn try_commit_block(&mut self, hash: &Digest, block: &SerialBlock) -> Result<(), ConsensusError> {
pub(super) async fn try_commit_block(&mut self, hash: &Digest, block: &Block<N>) -> Result<(), ConsensusError> {
let canon = self.storage.canon().await?;

match self.storage.get_block_state(&block.header.previous_block_hash).await? {
Expand Down Expand Up @@ -145,7 +145,7 @@ impl ConsensusInner {
pub(super) async fn verify_and_commit_block(
&mut self,
hash: &Digest,
block: &SerialBlock,
block: &Block<N>,
) -> Result<(), ConsensusError> {
let now = std::time::Instant::now();

Expand Down Expand Up @@ -175,7 +175,7 @@ impl ConsensusInner {

/// Check if the block is valid.
/// Verify transactions and transaction fees.
pub(super) async fn verify_block(&mut self, block: &SerialBlock) -> Result<bool, ConsensusError> {
pub(super) async fn verify_block(&mut self, block: &Block<N>) -> Result<bool, ConsensusError> {
let canon = self.storage.canon().await?;
// Verify the block header
if block.header.previous_block_hash != canon.hash {
Expand Down Expand Up @@ -300,7 +300,7 @@ impl ConsensusInner {
)
}

async fn inner_commit_block(&mut self, block: &SerialBlock) -> Result<Digest, ConsensusError> {
async fn inner_commit_block(&mut self, block: &Block<N>) -> Result<Digest, ConsensusError> {
let mut commitments = vec![];
let mut serial_numbers = vec![];
let mut memos = vec![];
Expand All @@ -320,7 +320,7 @@ impl ConsensusInner {
Ok(digest)
}

pub(super) async fn commit_block(&mut self, hash: &Digest, block: &SerialBlock) -> Result<(), ConsensusError> {
pub(super) async fn commit_block(&mut self, hash: &Digest, block: &Block<N>) -> Result<(), ConsensusError> {
let digest = self.inner_commit_block(block).await?;

self.storage.commit_block(hash, digest).await?;
Expand Down
27 changes: 7 additions & 20 deletions consensus/src/consensus/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use std::sync::Arc;

use super::message::{ConsensusMessage, CreateTransactionRequest, TransactionResponse};
use crate::{
error::ConsensusError,
memory_pool::MempoolEntry,
Expand All @@ -25,26 +24,14 @@ use crate::{
DynLedger,
MemoryPool,
};
use anyhow::*;
use snarkos_storage::{
BlockStatus,
Digest,
DynStorage,
ForkDescription,
SerialBlock,
SerialTransaction,
VMTransaction,
};
use snarkvm_dpc::{
testnet1::{instantiated::Components, Record as DPCRecord, TransactionKernel},
DPCScheme,
};
use snarkos_storage::{BlockStatus, Digest, DynStorage, ForkDescription};
use snarkvm_dpc::{testnet1::instantiated::Components, Record};
use snarkvm_posw::txids_to_roots;
use snarkvm_utilities::has_duplicates;

use anyhow::*;
use rand::thread_rng;

use super::message::{ConsensusMessage, CreateTransactionRequest, TransactionResponse};
use std::sync::Arc;

mod agent;
mod commit;
Expand All @@ -54,15 +41,15 @@ pub struct ConsensusInner {
pub public: Arc<Consensus>,
pub ledger: DynLedger,
pub memory_pool: MemoryPool,
pub storage: DynStorage,
pub storage: DynStorage<N>,
pub recommit_taint: Option<u32>, // height of first recommitted block
}

impl ConsensusInner {
/// Adds entry to memory pool if valid in the current ledger.
pub(crate) fn insert_into_mempool(
&mut self,
transaction: SerialTransaction,
transaction: Transaction<N>,
) -> Result<Option<Digest>, ConsensusError> {
let transaction_id: Digest = transaction.id.into();

Expand Down
19 changes: 9 additions & 10 deletions consensus/src/consensus/inner/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use snarkos_storage::VMRecord;
use snarkvm_dpc::testnet1::instantiated::{Testnet1DPC, Testnet1Transaction};
use snarkvm_dpc::{testnet1::Testnet1DPC, Transaction};
use tokio::task;

use crate::DeserializedLedger;

use super::*;

impl ConsensusInner {
pub(super) async fn receive_transaction(&mut self, transaction: Box<SerialTransaction>) -> bool {
pub(super) async fn receive_transaction(&mut self, transaction: Box<Transaction<N>>) -> bool {
if transaction.value_balance.is_negative() {
error!("Received a transaction that was a coinbase transaction");
return false;
Expand Down Expand Up @@ -59,7 +58,7 @@ impl ConsensusInner {
/// Check if the transactions are valid.
pub(super) async fn verify_transactions(
&mut self,
transactions: Vec<SerialTransaction>,
transactions: Vec<Transaction<N>>,
) -> Result<bool, ConsensusError> {
let consensus = self.public.clone();
self.push_recommit_taint().await?;
Expand All @@ -86,7 +85,7 @@ impl ConsensusInner {

let verification_result = consensus
.dpc
.verify_transactions(&deserialized[..], &ledger.deserialize::<Components>());
.verify_transactions(&deserialized[..], &ledger.deserialize::<N>());

(ledger, Ok(verification_result))
})
Expand Down Expand Up @@ -114,12 +113,12 @@ impl ConsensusInner {
request
.old_records
.iter()
.map(|x| <DPCRecord<Components> as VMRecord>::deserialize(x))
.map(|x| <DPCRecord<N> as VMRecord>::deserialize(x))
.collect::<Result<Vec<_>, _>>()?,
request
.new_records
.into_iter()
.map(|x| DPCRecord::<Components>::deserialize(&x))
.map(|x| DPCRecord::<N>::deserialize(&x))
.collect::<Result<Vec<_>>>()?,
request.memo,
&mut rng,
Expand All @@ -134,7 +133,7 @@ impl ConsensusInner {
&old_private_keys,
transaction_kernel,
program_proofs,
&self.ledger.deserialize::<Components>(),
&self.ledger.deserialize::<N>(),
&mut rng,
)?;

Expand All @@ -155,7 +154,7 @@ impl ConsensusInner {

let mut rng = thread_rng();
// Offline execution to generate a DPC transaction
let transaction_kernel: Box<TransactionKernel<Components>> = request
let transaction_kernel: Box<TransactionKernel<N>> = request
.kernel
.downcast()
.expect("illegal kernel passed to create partial transaction");
Expand All @@ -170,7 +169,7 @@ impl ConsensusInner {
&old_private_keys,
*transaction_kernel,
program_proofs,
&self.ledger.deserialize::<Components>(),
&self.ledger.deserialize::<N>(),
&mut rng,
)?;

Expand Down
19 changes: 10 additions & 9 deletions consensus/src/consensus/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use std::any::Any;
use snarkos_storage::PrivateKey;
use snarkvm_dpc::{Block, Record, Transaction};

use snarkos_storage::{PrivateKey, SerialBlock, SerialRecord, SerialTransaction};
use std::any::Any;
use tokio::sync::oneshot;

#[derive(Debug)]
pub struct CreateTransactionRequest {
pub old_records: Vec<SerialRecord>,
pub old_records: Vec<Record<N>>,
pub old_account_private_keys: Vec<PrivateKey>,
pub new_records: Vec<SerialRecord>,
pub new_records: Vec<Record<N>>,
pub memo: [u8; 32],
}
pub struct CreatePartialTransactionRequest {
Expand All @@ -33,14 +34,14 @@ pub struct CreatePartialTransactionRequest {
}

pub struct TransactionResponse {
pub records: Vec<SerialRecord>,
pub transaction: SerialTransaction,
pub records: Vec<Record<N>>,
pub transaction: Transaction<N>,
}

pub(super) enum ConsensusMessage {
ReceiveTransaction(Box<SerialTransaction>),
VerifyTransactions(Vec<SerialTransaction>),
ReceiveBlock(Box<SerialBlock>),
ReceiveTransaction(Box<Transaction<N>>),
VerifyTransactions(Vec<Transaction<N>>),
ReceiveBlock(Box<Block<N>>),
FetchMemoryPool(usize), // max size of memory pool to fetch
CreateTransaction(Box<CreateTransactionRequest>),
CreatePartialTransaction(CreatePartialTransactionRequest),
Expand Down
42 changes: 19 additions & 23 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use std::{convert::TryInto, sync::Arc};

use rand::{thread_rng, Rng};
use crate::{error::ConsensusError, ConsensusParameters, DynLedger, MemoryPool};
use snarkos_metrics::wrapped_mpsc;
use snarkos_storage::{Address, Digest, DynStorage, SerialBlock, SerialRecord, SerialTransaction, VMRecord};
use snarkos_storage::{Address, Digest, DynStorage};
use snarkvm_algorithms::CRH;
use snarkvm_dpc::{
testnet1::{
instantiated::{Components, Testnet1DPC},
payload::Payload,
Record as DPCRecord,
},
testnet1::instantiated::{Components, Testnet1DPC},
Account,
AccountScheme,
AleoAmount,
DPCComponents,
Block,
Payload,
ProgramScheme,
Record,
Transaction,
};
use snarkvm_utilities::{to_bytes_le, ToBytes};
use tokio::sync::oneshot;

use crate::{error::ConsensusError, ConsensusParameters, DynLedger, MemoryPool};

use anyhow::*;
use rand::{thread_rng, Rng};
use std::{convert::TryInto, sync::Arc};
use tokio::sync::oneshot;

mod inner;
pub use inner::ConsensusInner;
Expand All @@ -49,8 +45,8 @@ mod utility;
pub struct Consensus {
pub parameters: ConsensusParameters,
pub dpc: Arc<Testnet1DPC>,
pub storage: DynStorage,
genesis_block: SerialBlock,
pub storage: DynStorage<N>,
genesis_block: Block<N>,
sender: wrapped_mpsc::Sender<ConsensusMessageWrapped>,
}

Expand All @@ -59,9 +55,9 @@ impl Consensus {
pub async fn new(
parameters: ConsensusParameters,
dpc: Arc<Testnet1DPC>,
genesis_block: SerialBlock,
genesis_block: Block<N>,
ledger: DynLedger,
storage: DynStorage,
storage: DynStorage<N>,
memory_pool: MemoryPool,
) -> Arc<Self> {
let (sender, receiver) = wrapped_mpsc::channel(snarkos_metrics::queues::CONSENSUS, 256);
Expand Down Expand Up @@ -110,29 +106,29 @@ impl Consensus {
}

/// Receives a live transaction (into the memory pool)
pub async fn receive_transaction(&self, transaction: SerialTransaction) -> bool {
pub async fn receive_transaction(&self, transaction: Transaction<N>) -> bool {
self.send(ConsensusMessage::ReceiveTransaction(Box::new(transaction)))
.await
}

/// Verify a set of transactions
/// Used for tests and RPC
pub async fn verify_transactions(&self, transactions: Vec<SerialTransaction>) -> bool {
pub async fn verify_transactions(&self, transactions: Vec<Transaction<N>>) -> bool {
self.send(ConsensusMessage::VerifyTransactions(transactions)).await
}

/// Receives any block into consensus
pub async fn receive_block(&self, block: SerialBlock) -> bool {
pub async fn receive_block(&self, block: Block<N>) -> bool {
self.send(ConsensusMessage::ReceiveBlock(Box::new(block))).await
}

pub async fn shallow_receive_block(&self, block: SerialBlock) -> Result<()> {
pub async fn shallow_receive_block(&self, block: Block<N>) -> Result<()> {
self.storage.insert_block(&block).await?;
Ok(())
}

/// Fetches a snapshot of the memory pool
pub async fn fetch_memory_pool(&self) -> Vec<SerialTransaction> {
pub async fn fetch_memory_pool(&self) -> Vec<Transaction<N>> {
self.send(ConsensusMessage::FetchMemoryPool(self.parameters.max_block_size))
.await
}
Expand Down
Loading

0 comments on commit f3dcb32

Please sign in to comment.