diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index fd50cfd7..3eb1376f 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -161,12 +161,17 @@ jobs:
       - name: Install protoc
         run: sudo apt install -y protobuf-compiler

-      - name: Install llvm
+      - name: Get llvm install script
        run: wget https://apt.llvm.org/llvm.sh

       - run: chmod +x llvm.sh
-      - run: sudo ./llvm.sh all
+
+      - name: Install llvm
+        id: install_llvm
+        continue-on-error: true
+        run: sudo ./llvm.sh all || true

       - name: Install stable toolchain
+        if: steps.install_llvm.outcome == 'success' && steps.install_llvm.conclusion == 'success'
         uses: dtolnay/rust-toolchain@stable

       - name: Add wasm32 target
@@ -204,12 +209,17 @@ jobs:
       - name: Install protoc
         run: sudo apt install -y protobuf-compiler

-      - name: Install llvm
+      - name: Get llvm install script
        run: wget https://apt.llvm.org/llvm.sh

       - run: chmod +x llvm.sh
-      - run: sudo ./llvm.sh all
+
+      - name: Install llvm
+        id: install_llvm
+        continue-on-error: true
+        run: sudo ./llvm.sh all || true

       - name: Install stable toolchain
+        if: steps.install_llvm.outcome == 'success' && steps.install_llvm.conclusion == 'success'
         uses: dtolnay/rust-toolchain@stable

       - name: Install wasm-pack
diff --git a/Cargo.lock b/Cargo.lock
index 17620c68..f24bb987 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -234,9 +234,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"

 [[package]]
 name = "axum"
-version = "0.6.7"
+version = "0.6.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2fb79c228270dcf2426e74864cabc94babb5dbab01a4314e702d2f16540e1591"
+checksum = "13d8068b6ccb8b34db9de397c7043f91db8b4c66414952c6db944f238c4d3db3"
 dependencies = [
  "async-trait",
  "axum-core",
@@ -256,7 +256,6 @@ dependencies = [
  "serde",
  "sync_wrapper",
  "tower",
- "tower-http",
  "tower-layer",
  "tower-service",
 ]
@@ -1237,12 +1236,6 @@ dependencies = [
  "pin-project-lite",
 ]

-[[package]]
-name = "http-range-header"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
-
 [[package]]
 name = "httparse"
 version = "1.8.0"
@@ -1714,7 +1707,6 @@ dependencies = [
  "async-trait",
  "derive_more",
  "futures",
- "kaspa-addresses",
  "kaspa-consensus-core",
  "kaspa-hashes",
  "kaspa-notify",
@@ -1845,11 +1837,13 @@ dependencies = [
  "kaspa-addressmanager",
  "kaspa-connectionmanager",
  "kaspa-consensus-core",
+ "kaspa-consensus-notify",
  "kaspa-consensusmanager",
  "kaspa-core",
  "kaspa-hashes",
  "kaspa-mining",
  "kaspa-muhash",
+ "kaspa-notify",
  "kaspa-p2p-lib",
  "kaspa-utils",
  "log",
@@ -1915,6 +1909,7 @@ dependencies = [
  "kaspa-hashes",
  "kaspa-index-core",
  "kaspa-math",
+ "kaspa-mining",
  "kaspa-notify",
  "kaspa-txscript",
  "kaspa-utils",
@@ -1947,13 +1942,17 @@ name = "kaspa-rpc-service"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "kaspa-addresses",
  "kaspa-consensus-core",
  "kaspa-consensus-notify",
  "kaspa-consensusmanager",
  "kaspa-core",
  "kaspa-hashes",
  "kaspa-index-core",
+ "kaspa-math",
+ "kaspa-mining",
  "kaspa-notify",
+ "kaspa-p2p-flows",
  "kaspa-rpc-core",
  "kaspa-txscript",
  "kaspa-utils",
@@ -2010,6 +2009,7 @@ name = "kaspa-txscript"
 version = "0.1.0"
 dependencies = [
  "blake2b_simd",
+ "borsh",
  "criterion",
  "hex",
  "indexmap",
@@ -2022,6 +2022,7 @@ dependencies = [
  "parking_lot",
  "rand 0.8.5",
  "secp256k1",
+ "serde",
  "sha2 0.10.6",
  "smallvec",
  "thiserror",
@@ -3015,14 +3016,14 @@

 [[package]]
 name = "rustls"
-version = "0.20.8"
+version = "0.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
+checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d"
 dependencies = [
  "log",
  "ring",
+ "rustls-webpki",
  "sct",
- "webpki",
 ]

 [[package]]
@@ -3034,6 +3035,16 @@ dependencies = [
  "base64 0.21.0",
 ]

+[[package]]
+name = "rustls-webpki"
+version = "0.100.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b"
+dependencies = [
+ "ring",
+ "untrusted",
+]
+
 [[package]]
 name = "rustversion"
 version = "1.0.12"
@@ -3448,13 +3459,12 @@

 [[package]]
 name = "tokio-rustls"
-version = "0.23.4"
+version = "0.24.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
+checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
 dependencies = [
  "rustls",
  "tokio",
- "webpki",
 ]

 [[package]]
@@ -3505,14 +3515,14 @@

 [[package]]
 name = "tonic"
-version = "0.8.3"
+version = "0.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
+checksum = "38bd8e87955eb13c1986671838177d6792cdc52af9bffced0d2c8a9a7f741ab3"
 dependencies = [
  "async-stream",
  "async-trait",
  "axum",
- "base64 0.13.1",
+ "base64 0.21.0",
  "bytes",
  "flate2",
  "futures-core",
@@ -3525,24 +3535,21 @@ dependencies = [
  "percent-encoding",
  "pin-project",
  "prost",
- "prost-derive",
  "rustls-pemfile",
  "tokio",
  "tokio-rustls",
  "tokio-stream",
- "tokio-util",
  "tower",
  "tower-layer",
  "tower-service",
  "tracing",
- "tracing-futures",
 ]

 [[package]]
 name = "tonic-build"
-version = "0.8.4"
+version = "0.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
+checksum = "0f60a933bbea70c95d633c04c951197ddf084958abaa2ed502a3743bdd8d8dd7"
 dependencies = [
  "prettyplease",
  "proc-macro2",
@@ -3571,25 +3578,6 @@ dependencies = [
  "tracing",
 ]

-[[package]]
-name = "tower-http"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
-dependencies = [
- "bitflags",
- "bytes",
- "futures-core",
- "futures-util",
- "http",
- "http-body",
- "http-range-header",
- "pin-project-lite",
- "tower",
- "tower-layer",
- "tower-service",
-]
-
 [[package]]
 name = "tower-layer"
 version = "0.3.2"
@@ -3609,7 +3597,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
 dependencies = [
  "cfg-if",
- "log",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -3635,16 +3622,6 @@ dependencies = [
  "once_cell",
 ]

-[[package]]
-name = "tracing-futures"
-version = "0.2.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
-dependencies = [
- "pin-project",
- "tracing",
-]
-
 [[package]]
 name = "triggered"
 version = "0.1.2"
@@ -3896,16 +3873,6 @@ dependencies = [
  "wasm-bindgen",
 ]

-[[package]]
-name = "webpki"
-version = "0.22.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
-dependencies = [
- "ring",
- "untrusted",
-]
-
 [[package]]
 name = "wepoll-ffi"
 version = "0.1.2"
diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs
index 6ba0839f..4aa74a1c 100644
--- a/consensus/core/src/api/mod.rs
+++ b/consensus/core/src/api/mod.rs
@@ -3,6 +3,7 @@ use kaspa_muhash::MuHash;
 use std::sync::Arc;

 use crate::{
+    acceptance_data::AcceptanceData,
     block::{Block, BlockTemplate},
     blockstatus::BlockStatus,
     coinbase::MinerData,
@@ -15,9 +16,10 @@ use crate::{
     },
     header::Header,
     pruning::{PruningPointProof, PruningPointsList},
-    trusted::{TrustedBlock, TrustedGhostdagData, TrustedHeader},
+    sync_info::SyncInfo,
+    trusted::{ExternalGhostdagData, TrustedBlock, TrustedGhostdagData, TrustedHeader},
     tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
-    BlockHashSet,
+    BlockHashSet, ChainPath,
 };
 use kaspa_hashes::Hash;

 pub type BlockValidationFuture = BoxFuture<'static, BlockProcessResult<BlockStatus>>;
@@ -55,7 +57,15 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }

-    fn get_sink_timestamp(&self) -> Option<u64> {
+    fn get_sink(&self) -> Hash {
+        unimplemented!()
+    }
+
+    fn get_sink_timestamp(&self) -> u64 {
+        unimplemented!()
+    }
+
+    fn get_sync_info(&self) -> SyncInfo {
         unimplemented!()
     }

@@ -66,6 +76,10 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }

+    fn get_virtual_chain_from_block(&self, hash: Hash) -> ConsensusResult<ChainPath> {
+        unimplemented!()
+    }
+
     fn get_virtual_parents(&self) -> BlockHashSet {
         unimplemented!()
     }
@@ -123,6 +137,10 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }

+    fn get_anticone(&self, hash: Hash) -> ConsensusResult<Vec<Hash>> {
+        unimplemented!()
+    }
+
     fn get_pruning_point_proof(&self) -> Arc<PruningPointProof> {
         unimplemented!()
     }
@@ -143,6 +161,41 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }

+    fn get_block_even_if_header_only(&self, hash: Hash) -> ConsensusResult<Block> {
+        unimplemented!()
+    }
+
+    fn get_ghostdag_data(&self, hash: Hash) -> ConsensusResult<ExternalGhostdagData> {
+        unimplemented!()
+    }
+
+    fn get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
+        unimplemented!()
+    }
+
+    fn get_block_parents(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
+        unimplemented!()
+    }
+
+    fn get_block_status(&self, hash: Hash) -> Option<BlockStatus> {
+        unimplemented!()
+    }
+
+    fn get_block_acceptance_data(&self, hash: Hash) -> ConsensusResult<Arc<AcceptanceData>> {
+        unimplemented!()
+    }
+
+    /// Returns acceptance data for a set of blocks belonging to the selected parent chain.
+    ///
+    /// See `self::get_virtual_chain`
+    fn get_blocks_acceptance_data(&self, hashes: &[Hash]) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
+        unimplemented!()
+    }
+
+    fn is_chain_block(&self, hash: Hash) -> ConsensusResult<bool> {
+        unimplemented!()
+    }
+
     fn get_pruning_point_utxos(
         &self,
         expected_pruning_point: Hash,
@@ -153,10 +206,6 @@ pub trait ConsensusApi: Send + Sync {
         unimplemented!()
     }

-    fn get_block_status(&self, hash: Hash) -> Option<BlockStatus> {
-        unimplemented!()
-    }
-
     fn get_missing_block_body_hashes(&self, high: Hash) -> ConsensusResult<Vec<Hash>> {
         unimplemented!()
     }
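A sketch (not part of the patch) of how the new `ConsensusApi` queries are meant to compose, e.g. for a virtual-chain-changed style RPC. It assumes the `ChainPath { added, removed }` shape from `kaspa-consensus-core`, that `TransactionId` aliases `Hash`, and the `accepted_transactions`/`transaction_id` fields used elsewhere in this diff; `accepted_ids_since` is a hypothetical helper:

```rust
use kaspa_consensus_core::{api::ConsensusApi, errors::consensus::ConsensusResult};
use kaspa_hashes::Hash;

// Hypothetical helper: collect the ids of all transactions accepted along the
// selected-parent chain segment between `low` and the current sink.
fn accepted_ids_since(consensus: &dyn ConsensusApi, low: Hash) -> ConsensusResult<Vec<Hash>> {
    // Chain changes from `low` up to the sink (not virtual itself, see the impl notes below).
    let chain_path = consensus.get_virtual_chain_from_block(low)?;
    // Acceptance data is defined per chain block, so query the added chain segment.
    let acceptance = consensus.get_blocks_acceptance_data(&chain_path.added)?;
    Ok(acceptance
        .iter()
        .flat_map(|block_acceptance| block_acceptance.iter())
        .flat_map(|mergeset| mergeset.accepted_transactions.iter().map(|tx| tx.transaction_id))
        .collect())
}
```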
diff --git a/consensus/core/src/config/mod.rs b/consensus/core/src/config/mod.rs
index 5237bf3e..0ef43fd4 100644
--- a/consensus/core/src/config/mod.rs
+++ b/consensus/core/src/config/mod.rs
@@ -26,11 +26,21 @@ pub struct Config {
     // TODO:
     // is_archival: bool,
     // enable_sanity_check_pruning_utxoset: bool,
+    //
+
+    // TODO: move non-consensus parameters like utxoindex to a higher scoped Config
+    /// Enable the UTXO index
+    pub utxoindex: bool,
+
+    /// Allow the node to accept blocks from RPC while not synced
+    /// (this flag is mainly used for testing)
+    // TODO: add and handle a matching kaspad command argument
+    pub allow_submit_block_when_not_synced: bool,
 }

 impl Config {
     pub fn new(params: Params) -> Self {
-        Self { params, perf: PERF_PARAMS, process_genesis: true }
+        Self { params, perf: PERF_PARAMS, process_genesis: true, utxoindex: false, allow_submit_block_when_not_synced: false }
     }

     pub fn to_builder(&self) -> ConfigBuilder {
@@ -74,6 +84,14 @@ impl ConfigBuilder {
         self
     }

+    pub fn apply_args<F>(mut self, edit_func: F) -> Self
+    where
+        F: Fn(&mut Config),
+    {
+        edit_func(&mut self.config);
+        self
+    }
+
     pub fn skip_proof_of_work(mut self) -> Self {
         self.config.params.skip_proof_of_work = true;
         self
diff --git a/consensus/core/src/config/params.rs b/consensus/core/src/config/params.rs
index 050f09f3..ea4fab9e 100644
--- a/consensus/core/src/config/params.rs
+++ b/consensus/core/src/config/params.rs
@@ -1,6 +1,7 @@
 use super::genesis::{GenesisBlock, DEVNET_GENESIS, GENESIS, SIMNET_GENESIS, TESTNET_GENESIS};
 use crate::{networktype::NetworkType, BlockLevel, KType};
 use kaspa_addresses::Prefix;
+use kaspa_math::Uint256;
 use std::time::{SystemTime, UNIX_EPOCH};

 /// Consensus parameters. Contains settings and configurations which are consensus-sensitive.
@@ -17,6 +18,10 @@ pub struct Params {
     pub timestamp_deviation_tolerance: u64,
     pub target_time_per_block: u64,
     pub max_block_parents: u8,
+    /// Defines the highest allowed proof of work difficulty value for a block as a [`Uint256`]
+    pub max_difficulty: Uint256,
+    pub max_difficulty_f64: f64,
+    /// Size of window that is inspected to calculate the required difficulty of each block
     pub difficulty_window_size: usize,
     pub mergeset_size_limit: u64,
     pub merge_depth: u64,
@@ -66,6 +71,13 @@ impl Params {
     }
 }

+/// Highest proof of work difficulty value a Kaspa block can have for each network.
+/// It is the value 2^255 - 1.
+///
+/// Computed value: `Uint256::from_u64(1).wrapping_shl(255) - 1.into()`
+pub const DIFFICULTY_MAX: Uint256 = Uint256([18446744073709551615, 18446744073709551615, 18446744073709551615, 9223372036854775807]);
+pub const DIFFICULTY_MAX_AS_F64: f64 = 5.78960446186581e76;
+
 const DEFAULT_GHOSTDAG_K: KType = 18;

 pub const MAINNET_PARAMS: Params = Params {
     dns_seeders: &[
@@ -96,6 +108,8 @@ pub const MAINNET_PARAMS: Params = Params {
     timestamp_deviation_tolerance: 132,
     target_time_per_block: 1000,
     max_block_parents: 10,
+    max_difficulty: DIFFICULTY_MAX,
+    max_difficulty_f64: DIFFICULTY_MAX_AS_F64,
     difficulty_window_size: 2641,
     mergeset_size_limit: (DEFAULT_GHOSTDAG_K as u64) * 10,
     merge_depth: 3600,
@@ -146,6 +160,8 @@ pub const TESTNET_PARAMS: Params = Params {
     timestamp_deviation_tolerance: 132,
     target_time_per_block: 1000,
     max_block_parents: 10,
+    max_difficulty: DIFFICULTY_MAX,
+    max_difficulty_f64: DIFFICULTY_MAX_AS_F64,
     difficulty_window_size: 2641,
     mergeset_size_limit: (DEFAULT_GHOSTDAG_K as u64) * 10,
     merge_depth: 3600,
@@ -192,6 +208,8 @@ pub const SIMNET_PARAMS: Params = Params {
     timestamp_deviation_tolerance: 132,
     target_time_per_block: 1000,
     max_block_parents: 10,
+    max_difficulty: DIFFICULTY_MAX,
+    max_difficulty_f64: DIFFICULTY_MAX_AS_F64,
     difficulty_window_size: 2641,
     mergeset_size_limit: (DEFAULT_GHOSTDAG_K as u64) * 10,
     merge_depth: 3600,
@@ -238,6 +256,8 @@ pub const DEVNET_PARAMS: Params = Params {
     timestamp_deviation_tolerance: 132,
     target_time_per_block: 1000,
     max_block_parents: 10,
+    max_difficulty: DIFFICULTY_MAX,
+    max_difficulty_f64: DIFFICULTY_MAX_AS_F64,
     difficulty_window_size: 2641,
     mergeset_size_limit: (DEFAULT_GHOSTDAG_K as u64) * 10,
     merge_depth: 3600,
@@ -273,3 +293,15 @@ pub const DEVNET_PARAMS: Params = Params {
     max_block_level: 250,
     pruning_proof_m: 1000,
 };
+
+#[cfg(test)]
+mod tests {
+    use crate::config::params::{DIFFICULTY_MAX, DIFFICULTY_MAX_AS_F64};
+    use kaspa_math::Uint256;
+
+    #[test]
+    fn test_difficulty_max_consts() {
+        assert_eq!(DIFFICULTY_MAX, Uint256::from_u64(1).wrapping_shl(255) - 1.into());
+        assert_eq!(DIFFICULTY_MAX_AS_F64, DIFFICULTY_MAX.as_f64());
+    }
+}
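As a plain-arithmetic cross-check of the constants above (independent of `kaspa_math`, so this is a standalone sketch rather than the crate's own test), the limb values follow directly from writing 2^255 − 1 in little-endian u64 words:

```rust
fn main() {
    // The low three 64-bit limbs of 2^255 - 1 are fully saturated...
    assert_eq!(u64::MAX, 18446744073709551615);
    // ...and the top limb is 2^63 - 1, since bit 255 stays clear after the shift.
    assert_eq!((1u64 << 63) - 1, 9223372036854775807);
    // As an f64, 2^255 - 1 rounds to 2^255, which matches DIFFICULTY_MAX_AS_F64.
    assert_eq!(2f64.powi(255), 5.78960446186581e76);
}
```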
diff --git a/consensus/core/src/errors/consensus.rs b/consensus/core/src/errors/consensus.rs
index 2a2407b0..81fbb423 100644
--- a/consensus/core/src/errors/consensus.rs
+++ b/consensus/core/src/errors/consensus.rs
@@ -8,6 +8,12 @@ pub enum ConsensusError {
     #[error("couldn't find block {0}")]
     BlockNotFound(Hash),

+    #[error("block {0} is invalid")]
+    InvalidBlock(Hash),
+
+    #[error("some data is missing for block {0}")]
+    MissingData(Hash),
+
     #[error("got unexpected pruning point")]
     UnexpectedPruningPoint,

diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs
index 6be542c9..bf6253ea 100644
--- a/consensus/core/src/lib.rs
+++ b/consensus/core/src/lib.rs
@@ -21,6 +21,7 @@ pub mod networktype;
 pub mod pruning;
 pub mod sign;
 pub mod subnets;
+pub mod sync_info;
 pub mod trusted;
 pub mod tx;
 pub mod utxo;
diff --git a/consensus/core/src/sync_info.rs b/consensus/core/src/sync_info.rs
new file mode 100644
index 00000000..8719782a
--- /dev/null
+++ b/consensus/core/src/sync_info.rs
@@ -0,0 +1,15 @@
+use borsh::{BorshDeserialize, BorshSchema, BorshSerialize};
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct SyncInfo {
+    pub header_count: u64,
+    pub block_count: u64,
+}
+
+impl SyncInfo {
+    pub fn new(block_count: u64, header_count: u64) -> Self {
+        Self { block_count, header_count }
+    }
+}
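A usage note on the new `SyncInfo` (a sketch, not from the patch): the constructor arguments are ordered `(block_count, header_count)`, the reverse of the field declaration order, so struct-literal initialization may read more clearly at call sites:

```rust
use kaspa_consensus_core::sync_info::SyncInfo;

fn main() {
    let from_ctor = SyncInfo::new(10, 1000); // (block_count, header_count)
    let from_literal = SyncInfo { header_count: 1000, block_count: 10 };
    assert_eq!(from_ctor.block_count, from_literal.block_count);
    assert_eq!(from_ctor.header_count, from_literal.header_count);
}
```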
diff --git a/consensus/notify/src/collector.rs b/consensus/notify/src/collector.rs
index 98e620f4..cf700391 100644
--- a/consensus/notify/src/collector.rs
+++ b/consensus/notify/src/collector.rs
@@ -1,4 +1,5 @@
 use crate::notification::Notification;
-use kaspa_notify::collector::CollectorFrom;
+use kaspa_notify::{collector::CollectorFrom, converter::ConverterFrom};

-pub type ConsensusCollector = CollectorFrom<Notification, Notification>;
+pub type ConsensusConverter = ConverterFrom<Notification, Notification>;
+pub type ConsensusCollector = CollectorFrom<ConsensusConverter>;
diff --git a/consensus/notify/src/service.rs b/consensus/notify/src/service.rs
index cc170da5..26c41d3f 100644
--- a/consensus/notify/src/service.rs
+++ b/consensus/notify/src/service.rs
@@ -1,11 +1,15 @@
-use crate::{notification::Notification, notifier::ConsensusNotifier, root::ConsensusNotificationRoot};
+use crate::{
+    collector::{ConsensusCollector, ConsensusConverter},
+    notification::Notification,
+    notifier::ConsensusNotifier,
+    root::ConsensusNotificationRoot,
+};
 use async_channel::Receiver;
 use kaspa_core::{
     task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture},
     trace,
 };
 use kaspa_notify::{
-    collector::CollectorFrom,
     events::{EventSwitches, EVENT_TYPE_ARRAY},
     subscriber::Subscriber,
 };
@@ -22,7 +26,7 @@ pub struct NotifyService {
 impl NotifyService {
     pub fn new(root: Arc<ConsensusNotificationRoot>, notification_receiver: Receiver<Notification>) -> Self {
         let root_events: EventSwitches = EVENT_TYPE_ARRAY[..].into();
-        let collector = Arc::new(CollectorFrom::<Notification, Notification>::new(notification_receiver));
+        let collector = Arc::new(ConsensusCollector::new(notification_receiver, Arc::new(ConsensusConverter::new())));
         let subscriber = Arc::new(Subscriber::new(root_events, root, 0));
         let notifier = Arc::new(ConsensusNotifier::new(root_events, vec![collector], vec![subscriber], 1, NOTIFY_SERVICE));
         Self { notifier, shutdown: SingleTrigger::default() }
diff --git a/consensus/src/consensus/factory.rs b/consensus/src/consensus/factory.rs
index 4dc5928a..42cfdd17 100644
--- a/consensus/src/consensus/factory.rs
+++ b/consensus/src/consensus/factory.rs
@@ -192,7 +192,7 @@ impl ConsensusFactory for Factory {
         let db = kaspa_database::prelude::open_db(dir, true, self.db_parallelism);
         // TODO: pass lock to consensus
         let session_lock = Arc::new(TokioRwLock::new(()));
-        let consensus = Arc::new(Consensus::new(db.clone(), &config, self.notification_root.clone(), self.counters.clone()));
+        let consensus = Arc::new(Consensus::new(db.clone(), Arc::new(config), self.notification_root.clone(), self.counters.clone()));

         // We write the new active entry only once the instance was created successfully.
         // This way we can safely avoid processing genesis in future process runs
@@ -211,7 +211,7 @@ impl ConsensusFactory for Factory {
         let session_lock = Arc::new(TokioRwLock::new(()));
         let consensus = Arc::new(Consensus::new(
             db.clone(),
-            &self.config.to_builder().skip_adding_genesis().build(),
+            Arc::new(self.config.to_builder().skip_adding_genesis().build()),
             self.notification_root.clone(),
             self.counters.clone(),
         ));
diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs
index 2726a62a..0f8fe669 100644
--- a/consensus/src/consensus/mod.rs
+++ b/consensus/src/consensus/mod.rs
@@ -13,18 +13,18 @@ use crate::{
         statuses::MTStatusesService,
     },
     stores::{
-        acceptance_data::DbAcceptanceDataStore,
+        acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
         block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
         block_window_cache::BlockWindowCacheStore,
         daa::DbDaaStore,
         depth::DbDepthStore,
-        ghostdag::DbGhostdagStore,
+        ghostdag::{DbGhostdagStore, GhostdagStoreReader},
         headers::{DbHeadersStore, HeaderStoreReader},
         headers_selected_tip::{DbHeadersSelectedTipStore, HeadersSelectedTipStoreReader},
         past_pruning_points::{DbPastPruningPointsStore, PastPruningPointsStoreReader},
         pruning::{DbPruningStore, PruningStoreReader},
         reachability::DbReachabilityStore,
-        relations::DbRelationsStore,
+        relations::{DbRelationsStore, RelationsStoreReader},
         selected_chain::DbSelectedChainStore,
         statuses::{DbStatusesStore, StatusesStoreReader},
         tips::{DbTipsStore, TipsStoreReader},
@@ -51,6 +51,7 @@ use crate::{
     },
 };
 use kaspa_consensus_core::{
+    acceptance_data::AcceptanceData,
     api::ConsensusApi,
     block::{Block, BlockTemplate},
     blockhash::BlockHashExtensions,
@@ -65,12 +66,12 @@ use kaspa_consensus_core::{
     header::Header,
     muhash::MuHashExtensions,
     pruning::{PruningPointProof, PruningPointsList},
-    trusted::TrustedBlock,
+    sync_info::SyncInfo,
+    trusted::{ExternalGhostdagData, TrustedBlock},
     tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
-    BlockHashSet,
+    BlockHashSet, ChainPath,
 };
 use kaspa_consensus_notify::root::ConsensusNotificationRoot;
-use kaspa_utils::option::OptionExtensions;

 use crossbeam_channel::{unbounded as unbounded_crossbeam, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
 use futures_util::future::BoxFuture;
@@ -131,6 +132,7 @@ pub struct Consensus {
     pruning_point_utxo_set_store: Arc<RwLock<DbUtxoSetStore>>,
     pub(super) virtual_stores: Arc<RwLock<VirtualStores>>,
     pub(super) past_pruning_points_store: Arc<DbPastPruningPointsStore>,
+    acceptance_data_store: Arc<DbAcceptanceDataStore>,

     // TODO: remove all pub from stores and processors when StoreManager is implemented
     // Append-only stores
@@ -171,17 +173,16 @@ pub struct Consensus {
     pub counters: Arc<ProcessingCounters>,

     // Config
-    config: Config,
+    config: Arc<Config>,
 }

 impl Consensus {
     pub fn new(
         db: Arc<DB>,
-        config: &Config,
+        config: Arc<Config>,
         notification_root: Arc<ConsensusNotificationRoot>,
         counters: Arc<ProcessingCounters>,
     ) -> Self {
-        let config = config.clone();
         let params = &config.params;
         let perf_params = &config.perf;
         //
@@ -417,6 +418,7 @@ impl Consensus {
             past_median_time_manager.clone(),
             params.max_block_mass,
             params.genesis.clone(),
+            notification_root.clone(),
             counters.clone(),
         ));

@@ -436,7 +438,7 @@ impl Consensus {
             body_tips_store.clone(),
             utxo_diffs_store.clone(),
             utxo_multisets_store,
-            acceptance_data_store,
+            acceptance_data_store.clone(),
             virtual_stores.clone(),
             pruning_point_utxo_set_store.clone(),
             ghostdag_manager.clone(),
@@ -533,6 +535,7 @@ impl Consensus {
             block_transactions_store,
             pruning_point_utxo_set_store,
             virtual_stores,
+            acceptance_data_store,
             past_pruning_points_store,

             statuses_service,
@@ -679,17 +682,31 @@ impl ConsensusApi for Consensus {
         }
     }

-    fn get_sink_timestamp(&self) -> Option<u64> {
-        // TODO: unwrap on virtual state read when staging consensus is implemented
-        self.virtual_processor.virtual_stores.read().state.get().unwrap_option().map(|state| {
-            let sink = state.ghostdag_data.selected_parent;
-            self.headers_store.get_timestamp(sink).unwrap()
-        })
+    fn get_sink(&self) -> Hash {
+        self.virtual_processor.virtual_stores.read().state.get().unwrap().ghostdag_data.selected_parent
+    }
+
+    fn get_sink_timestamp(&self) -> u64 {
+        self.headers_store.get_timestamp(self.get_sink()).unwrap()
+    }
+
+    fn get_sync_info(&self) -> SyncInfo {
+        // TODO: actually get those numbers
+        SyncInfo::default()
     }

     fn is_nearly_synced(&self) -> bool {
         // See comment within `config.is_nearly_synced`
-        self.get_sink_timestamp().has_value_and(|&t| self.config.is_nearly_synced(t))
+        self.config.is_nearly_synced(self.get_sink_timestamp())
+    }
+
+    fn get_virtual_chain_from_block(&self, hash: Hash) -> ConsensusResult<ChainPath> {
+        // Calculate chain changes between the given hash and the
+        // sink. Note that we explicitly don't
+        // do the calculation against the virtual itself so that we
+        // won't later need to remove it from the result.
+        self.validate_block_exists(hash)?;
+        Ok(self.dag_traversal_manager.calculate_chain_path(hash, self.get_sink()))
     }

     fn get_virtual_parents(&self) -> BlockHashSet {
@@ -781,6 +798,7 @@ impl ConsensusApi for Consensus {

     // max_blocks has to be greater than the merge set size limit
     fn get_hashes_between(&self, low: Hash, high: Hash, max_blocks: usize) -> ConsensusResult<(Vec<Hash>, Hash)> {
+        assert!(max_blocks as u64 > self.config.mergeset_size_limit);
         self.validate_block_exists(low)?;
         self.validate_block_exists(high)?;

@@ -796,6 +814,11 @@ impl ConsensusApi for Consensus {
         self.headers_selected_tip_store.read().get().unwrap().hash
     }

+    fn get_anticone(&self, hash: Hash) -> ConsensusResult<Vec<Hash>> {
+        self.validate_block_exists(hash)?;
+        Ok(self.dag_traversal_manager.anticone(hash, self.virtual_stores.read().state.get().unwrap().parents.iter().copied(), None)?)
+    }
+
     fn get_pruning_point_proof(&self) -> Arc<PruningPointProof> {
         self.pruning_proof_manager.get_pruning_point_proof()
     }
@@ -842,10 +865,57 @@ impl ConsensusApi for Consensus {
         })
     }

+    fn get_block_even_if_header_only(&self, hash: Hash) -> ConsensusResult<Block> {
+        let Some(status) = self.statuses_store.read().get(hash).unwrap_option().filter(|&status| status.has_block_header()) else {
+            return Err(ConsensusError::BlockNotFound(hash));
+        };
+        Ok(Block {
+            header: self.headers_store.get_header(hash).unwrap(),
+            transactions: if status.is_header_only() { Arc::new(vec![]) } else { self.block_transactions_store.get(hash).unwrap() },
+        })
+    }
+
+    fn get_ghostdag_data(&self, hash: Hash) -> ConsensusResult<ExternalGhostdagData> {
+        match self.get_block_status(hash) {
+            None => return Err(ConsensusError::BlockNotFound(hash)),
+            Some(BlockStatus::StatusInvalid) => return Err(ConsensusError::InvalidBlock(hash)),
+            _ => {}
+        };
+        Ok((&*self.ghostdag_store.get_data(hash).unwrap()).into())
+    }
+
+    fn get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
+        self.relations_service.get_children(hash).unwrap_option()
+    }
+
+    fn get_block_parents(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
+        self.relations_service.get_parents(hash).unwrap_option()
+    }
+
     fn get_block_status(&self, hash: Hash) -> Option<BlockStatus> {
         self.statuses_store.read().get(hash).unwrap_option()
     }

+    fn get_block_acceptance_data(&self, hash: Hash) -> ConsensusResult<Arc<AcceptanceData>> {
+        self.validate_block_exists(hash)?;
+        Ok(self.acceptance_data_store.get(hash).unwrap())
+    }
+
+    fn get_blocks_acceptance_data(&self, hashes: &[Hash]) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
+        hashes
+            .iter()
+            .copied()
+            .map(|hash| {
+                self.validate_block_exists(hash)?;
+                self.acceptance_data_store.get(hash).map_err(|_| ConsensusError::MissingData(hash))
+            })
+            .collect::<ConsensusResult<Vec<_>>>()
+    }
+
+    fn is_chain_block(&self, hash: Hash) -> ConsensusResult<bool> {
+        self.is_chain_ancestor_of(hash, self.get_sink())
+    }
+
     fn get_missing_block_body_hashes(&self, high: Hash) -> ConsensusResult<Vec<Hash>> {
         self.validate_block_exists(high)?;
         Ok(self.sync_manager.get_missing_block_body_hashes(high)?)
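The `collect::<ConsensusResult<Vec<_>>>()` call in `get_blocks_acceptance_data` above short-circuits on the first failure. A standalone illustration of that pattern (toy types, not the kaspa stores):

```rust
fn lookup(x: u32) -> Result<u32, String> {
    if x % 2 == 0 { Ok(x * 10) } else { Err(format!("missing data for {x}")) }
}

fn main() {
    // All lookups succeed: the whole collection succeeds.
    let all_ok: Result<Vec<_>, String> = [2, 4, 6].iter().map(|&x| lookup(x)).collect();
    assert_eq!(all_ok, Ok(vec![20, 40, 60]));

    // One lookup fails: the first error aborts the query and is returned as-is.
    let one_bad: Result<Vec<_>, String> = [2, 3, 4].iter().map(|&x| lookup(x)).collect();
    assert_eq!(one_bad, Err("missing data for 3".to_string()));
}
```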
diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs
index de40edfa..76d5d793 100644
--- a/consensus/src/consensus/test_consensus.rs
+++ b/consensus/src/consensus/test_consensus.rs
@@ -57,7 +57,7 @@ impl TestConsensus {
         let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
         let counters = Arc::new(ProcessingCounters::default());
         Self {
-            consensus: Arc::new(Consensus::new(db, config, notification_root, counters)),
+            consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), notification_root, counters)),
             params: config.params.clone(),
             temp_db_lifetime: Default::default(),
         }
@@ -72,7 +72,7 @@ impl TestConsensus {
         let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
         let counters = Arc::new(ProcessingCounters::default());
         Self {
-            consensus: Arc::new(Consensus::new(db, config, notification_root, counters)),
+            consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), notification_root, counters)),
             params: config.params.clone(),
             temp_db_lifetime,
         }
@@ -84,7 +84,7 @@ impl TestConsensus {
         let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
         let counters = Arc::new(ProcessingCounters::default());
         Self {
-            consensus: Arc::new(Consensus::new(db, config, notification_root, counters)),
+            consensus: Arc::new(Consensus::new(db, Arc::new(config.clone()), notification_root, counters)),
             params: config.params.clone(),
             temp_db_lifetime,
         }
diff --git a/consensus/src/pipeline/body_processor/processor.rs b/consensus/src/pipeline/body_processor/processor.rs
index 9d9023b0..cb9f2123 100644
--- a/consensus/src/pipeline/body_processor/processor.rs
+++ b/consensus/src/pipeline/body_processor/processor.rs
@@ -30,7 +30,12 @@ use kaspa_consensus_core::{
     config::genesis::GenesisBlock,
     tx::Transaction,
 };
+use kaspa_consensus_notify::{
+    notification::{BlockAddedNotification, Notification},
+    root::ConsensusNotificationRoot,
+};
 use kaspa_hashes::Hash;
+use kaspa_notify::notifier::Notify;
 use parking_lot::RwLock;
 use rayon::ThreadPool;
 use rocksdb::WriteBatch;
@@ -74,6 +79,8 @@ pub struct BlockBodyProcessor {
     // Dependency manager
     task_manager: BlockTaskDependencyManager,

+    pub(crate) notification_root: Arc<ConsensusNotificationRoot>,
+
     // Counters
     counters: Arc<ProcessingCounters>,
 }
@@ -103,6 +110,7 @@ impl BlockBodyProcessor {
         >,
         max_block_mass: u64,
         genesis: GenesisBlock,
+        notification_root: Arc<ConsensusNotificationRoot>,
         counters: Arc<ProcessingCounters>,
     ) -> Self {
         Self {
@@ -123,6 +131,7 @@ impl BlockBodyProcessor {
             max_block_mass,
             genesis,
             task_manager: BlockTaskDependencyManager::new(),
+            notification_root,
             counters,
         }
     }
@@ -198,6 +207,10 @@ impl BlockBodyProcessor {

         self.commit_body(block.hash(), block.header.direct_parents(), block.transactions.clone());

+        // Send a BlockAdded notification
+        // TODO: handle notify errors
+        let _ = self.notification_root.notify(Notification::BlockAdded(BlockAddedNotification::new(block.to_owned())));
+
         // Report counters
         self.counters.body_counts.fetch_add(1, Ordering::Relaxed);
         self.counters.txs_counts.fetch_add(block.transactions.len() as u64, Ordering::Relaxed);
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 63385b91..6e944e60 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -5,6 +5,7 @@ pub mod console;
 pub mod log;
 pub mod panic;
 pub mod time;
+pub mod version;

 cfg_if::cfg_if! {
     if #[cfg(not(target_arch = "wasm32"))] {
diff --git a/core/src/version.rs b/core/src/version.rs
new file mode 100644
index 00000000..99cfdd50
--- /dev/null
+++ b/core/src/version.rs
@@ -0,0 +1,3 @@
+pub fn version() -> &'static str {
+    env!("CARGO_PKG_VERSION")
+}
diff --git a/crypto/txscript/Cargo.toml b/crypto/txscript/Cargo.toml
index 70e39a08..a9c8fe1c 100644
--- a/crypto/txscript/Cargo.toml
+++ b/crypto/txscript/Cargo.toml
@@ -22,6 +22,8 @@ parking_lot.workspace = true
 kaspa-txscript-errors.workspace = true
 smallvec.workspace = true
 thiserror.workspace = true
+borsh.workspace = true
+serde.workspace = true

 [dev-dependencies]
 criterion.workspace = true
diff --git a/crypto/txscript/src/script_class.rs b/crypto/txscript/src/script_class.rs
index ab2f9401..b3c4a649 100644
--- a/crypto/txscript/src/script_class.rs
+++ b/crypto/txscript/src/script_class.rs
@@ -1,6 +1,8 @@
 use crate::{opcodes, MAX_SCRIPT_PUBLIC_KEY_VERSION};
+use borsh::{BorshDeserialize, BorshSchema, BorshSerialize};
 use kaspa_addresses::Version;
 use kaspa_consensus_core::tx::{ScriptPublicKey, ScriptPublicKeyVersion};
+use serde::{Deserialize, Serialize};
 use std::{
     fmt::{Display, Formatter},
     str::FromStr,
@@ -14,7 +16,7 @@ pub enum Error {
 }

 /// Standard classes of script payment in the blockDAG
-#[derive(PartialEq, Eq, Debug, Clone)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)]
 #[repr(u8)]
 pub enum ScriptClass {
     /// None of the recognized forms
diff --git a/indexes/core/Cargo.toml b/indexes/core/Cargo.toml
index 35d1c3a1..036b191e 100644
--- a/indexes/core/Cargo.toml
+++ b/indexes/core/Cargo.toml
@@ -8,7 +8,6 @@ license.workspace = true

 [dependencies]
 kaspa-hashes.workspace = true
-kaspa-addresses.workspace = true
 thiserror.workspace = true
 kaspa-consensus-core.workspace = true
 kaspa-utils.workspace = true
diff --git a/indexes/core/src/notification.rs b/indexes/core/src/notification.rs
index 9527c8a3..a406b0a2 100644
--- a/indexes/core/src/notification.rs
+++ b/indexes/core/src/notification.rs
@@ -1,6 +1,5 @@
 use crate::indexed_utxos::{UtxoChanges, UtxoSetByScriptPublicKey};
 use derive_more::Display;
-use kaspa_addresses::Prefix;
 use kaspa_notify::{
     events::EventType,
     full_featured,
@@ -55,14 +54,19 @@ pub struct PruningPointUtxoSetOverrideNotification {}

 #[derive(Debug, Clone)]
 pub struct UtxosChangedNotification {
-    pub prefix: Prefix,
     pub added: Arc<UtxoSetByScriptPublicKey>,
     pub removed: Arc<UtxoSetByScriptPublicKey>,
 }

+impl From<UtxoChanges> for UtxosChangedNotification {
+    fn from(item: UtxoChanges) -> Self {
+        Self { added: Arc::new(item.added), removed: Arc::new(item.removed) }
+    }
+}
+
 impl UtxosChangedNotification {
-    pub fn from_utxos_changed(utxos_changed: UtxoChanges, prefix: Prefix) -> Self {
-        Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed), prefix }
+    pub fn from_utxos_changed(utxos_changed: UtxoChanges) -> Self {
+        Self { added: Arc::new(utxos_changed.added), removed: Arc::new(utxos_changed.removed) }
     }

     pub(crate) fn apply_utxos_changed_subscription(&self, subscription: &UtxosChangedSubscription) -> Option<Self> {
@@ -74,7 +78,7 @@ impl UtxosChangedNotification {
         if added.is_empty() && removed.is_empty() {
             None
         } else {
-            Some(Self { prefix: self.prefix, added: Arc::new(added), removed: Arc::new(removed) })
+            Some(Self { added: Arc::new(added), removed: Arc::new(removed) })
         }
     }
 }
diff --git a/indexes/processor/src/processor.rs b/indexes/processor/src/processor.rs
index 5e282e0c..0c04f604 100644
--- a/indexes/processor/src/processor.rs
+++ b/indexes/processor/src/processor.rs
@@ -7,7 +7,6 @@ use futures::{
     future::FutureExt, // for `.fuse()`
     select,
 };
-use kaspa_consensus_core::config::Config;
 use kaspa_consensus_notify::{notification as consensus_notification, notification::Notification as ConsensusNotification};
 use kaspa_core::trace;
 use kaspa_index_core::notification::{Notification, PruningPointUtxoSetOverrideNotification, UtxosChangedNotification};
@@ -32,7 +31,6 @@ use std::sync::{
 /// into their pending local versions and relaying them to a local notifier.
 #[derive(Debug)]
 pub struct Processor {
-    config: Config,
     utxoindex: DynUtxoIndexApi,

     recv_channel: CollectorNotificationReceiver<ConsensusNotification>,
@@ -43,17 +41,12 @@ pub struct Processor {
 }

 impl Processor {
-    pub fn new(
-        utxoindex: DynUtxoIndexApi,
-        recv_channel: CollectorNotificationReceiver<ConsensusNotification>,
-        config: &Config,
-    ) -> Self {
+    pub fn new(utxoindex: DynUtxoIndexApi, recv_channel: CollectorNotificationReceiver<ConsensusNotification>) -> Self {
         Self {
             utxoindex,
             recv_channel,
             collect_shutdown: Arc::new(DuplexTrigger::new()),
             is_started: Arc::new(AtomicBool::new(false)),
-            config: config.clone(),
         }
     }

@@ -116,8 +109,7 @@ impl Processor {
     ) -> IndexResult<UtxosChangedNotification> {
         trace!("[{IDENT}]: processing {:?}", notification);
         if let Some(utxoindex) = self.utxoindex.as_deref() {
-            let utxos_changed = utxoindex.write().update(notification.accumulated_utxo_diff.clone(), notification.virtual_parents)?;
-            return Ok(UtxosChangedNotification::from_utxos_changed(utxos_changed, self.config.prefix()));
+            return Ok(utxoindex.write().update(notification.accumulated_utxo_diff.clone(), notification.virtual_parents)?.into());
         };
         Err(IndexError::NotSupported(EventType::UtxosChanged))
     }
@@ -184,12 +176,12 @@ mod tests {
         fn new() -> Self {
             let (consensus_sender, consensus_receiver) = unbounded();
             let (utxoindex_db_lifetime, utxoindex_db) = create_temp_db();
-            let config = Config::new(DEVNET_PARAMS);
+            let config = Arc::new(Config::new(DEVNET_PARAMS));
             let tc = TestConsensus::create_from_temp_db_and_dummy_sender(&config);
             tc.init();
             let consensus_manager = Arc::new(ConsensusManager::from_consensus(tc.consensus()));
             let utxoindex: DynUtxoIndexApi = Some(UtxoIndex::new(consensus_manager, utxoindex_db).unwrap());
-            let processor = Arc::new(Processor::new(utxoindex, consensus_receiver, &config));
+            let processor = Arc::new(Processor::new(utxoindex, consensus_receiver));
             let (processor_sender, processor_receiver) = unbounded();
             let notifier = Arc::new(NotifyMock::new(processor_sender));
             processor.clone().start(notifier);
diff --git a/indexes/processor/src/service.rs b/indexes/processor/src/service.rs
index 77294f26..b7ad7823 100644
--- a/indexes/processor/src/service.rs
+++ b/indexes/processor/src/service.rs
@@ -1,5 +1,4 @@
 use crate::{processor::Processor, IDENT};
-use kaspa_consensus_core::config::Config;
 use kaspa_consensus_notify::{
     connection::ConsensusChannelConnection, notification::Notification as ConsensusNotification, notifier::ConsensusNotifier,
 };
@@ -25,7 +24,7 @@ pub struct IndexService {
 }

 impl IndexService {
-    pub fn new(consensus_notifier: &Arc<ConsensusNotifier>, utxoindex: DynUtxoIndexApi, config: &Config) -> Self {
+    pub fn new(consensus_notifier: &Arc<ConsensusNotifier>, utxoindex: DynUtxoIndexApi) -> Self {
         // Prepare consensus-notify objects
         let consensus_notify_channel = Channel::<ConsensusNotification>::default();
         let consensus_notify_listener_id =
@@ -34,7 +33,7 @@ impl IndexService {

         // Prepare the index-processor notifier
         // No subscriber is defined here because the subscriptions are manually created during the construction and never changed after that.
         let events: EventSwitches = [EventType::UtxosChanged, EventType::PruningPointUtxoSetOverride].as_ref().into();
-        let collector = Arc::new(Processor::new(utxoindex.clone(), consensus_notify_channel.receiver(), config));
+        let collector = Arc::new(Processor::new(utxoindex.clone(), consensus_notify_channel.receiver()));
         let notifier = Arc::new(IndexNotifier::new(events, vec![collector], vec![], 1, INDEX_SERVICE));

         // Manually subscribe to index-processor related event types
diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs
index c92f6be2..6de005ce 100644
--- a/kaspad/src/args.rs
+++ b/kaspad/src/args.rs
@@ -1,5 +1,7 @@
 #[allow(unused)]
 use clap::{arg, command, Arg, Command};
+use kaspa_consensus::config::Config;
+use kaspa_core::version::version;

 pub struct Defaults {
     pub appdir: &'static str,
@@ -58,7 +60,7 @@ pub struct Args {

 pub fn cli(defaults: &Defaults) -> Command {
     Command::new("kaspad")
-        .about(format!("{} (rusty-kaspa) v{}", env!("CARGO_PKG_DESCRIPTION"), env!("CARGO_PKG_VERSION")))
+        .about(format!("{} (rusty-kaspa) v{}", env!("CARGO_PKG_DESCRIPTION"), version()))
         .version(env!("CARGO_PKG_VERSION"))
         .arg(arg!(-b --appdir "Directory to store data."))
         .arg(
@@ -170,6 +172,10 @@ impl Args {
             simnet: m.get_one::<bool>("simnet").cloned().unwrap_or(defaults.simnet),
         }
     }
+
+    pub fn apply_to_config(&self, config: &mut Config) {
+        config.utxoindex = self.utxoindex;
+    }
 }

 /*
diff --git a/kaspad/src/main.rs b/kaspad/src/main.rs
index 00163104..dd3a9a7d 100644
--- a/kaspad/src/main.rs
+++ b/kaspad/src/main.rs
@@ -9,6 +9,7 @@ use kaspa_consensus_core::networktype::NetworkType;
 use kaspa_consensus_notify::root::ConsensusNotificationRoot;
 use kaspa_consensus_notify::service::NotifyService;
 use kaspa_consensusmanager::ConsensusManager;
+use kaspa_core::version::version;
 use kaspa_core::{core::Core, signals::Signals, task::runtime::AsyncRuntime};
 use kaspa_index_processor::service::IndexService;
 use kaspa_mining::manager::MiningManager;
@@ -137,7 +138,7 @@ pub fn main() {
     kaspa_core::log::init_logger(&args.log_level);

     // Print package name and version
-    info!("{} v{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
+    info!("{} v{}", env!("CARGO_PKG_NAME"), version());

     // Configure the panic behavior
     kaspa_core::panic::configure_panic();
@@ -150,12 +151,16 @@ pub fn main() {
         _ => panic!("only a single net should be activated"),
     };

-    let config = match network_type {
-        NetworkType::Mainnet => ConfigBuilder::new(MAINNET_PARAMS).build(),
-        NetworkType::Testnet => ConfigBuilder::new(TESTNET_PARAMS).build(),
-        NetworkType::Devnet => ConfigBuilder::new(DEVNET_PARAMS).build(),
-        NetworkType::Simnet => ConfigBuilder::new(SIMNET_PARAMS).build(),
-    };
+    let config = Arc::new(
+        match network_type {
+            NetworkType::Mainnet => ConfigBuilder::new(MAINNET_PARAMS),
+            NetworkType::Testnet => ConfigBuilder::new(TESTNET_PARAMS),
+            NetworkType::Devnet => ConfigBuilder::new(DEVNET_PARAMS),
+            NetworkType::Simnet => ConfigBuilder::new(SIMNET_PARAMS),
+        }
+        .apply_args(|config| args.apply_to_config(config))
+        .build(),
+    );

     // TODO: Refactor all this quick-and-dirty code
     let app_dir = args
@@ -214,13 +219,13 @@ pub fn main() {
     let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory));
     let monitor = Arc::new(ConsensusMonitor::new(counters));

-    let notify_service = Arc::new(NotifyService::new(notification_root, notification_recv));
+    let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv));

     let index_service: Option<Arc<IndexService>> = if args.utxoindex {
         // Use only a single thread for non-consensus databases
         let utxoindex_db = kaspa_database::prelude::open_db(utxoindex_db_dir, true, 1);
         let utxoindex: DynUtxoIndexApi = Some(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap());
-        Some(Arc::new(IndexService::new(&notify_service.notifier(), utxoindex, &config)))
+        Some(Arc::new(IndexService::new(&notify_service.notifier(), utxoindex)))
     } else {
         None
     };
@@ -229,9 +234,15 @@ pub fn main() {

     let mining_manager = Arc::new(MiningManager::new(config.target_time_per_block, false, config.max_block_mass, None));

-    let flow_context = Arc::new(FlowContext::new(consensus_manager.clone(), address_manager, &config, mining_manager));
+    let flow_context = Arc::new(FlowContext::new(
+        consensus_manager.clone(),
+        address_manager,
+        config.clone(),
+        mining_manager.clone(),
+        notification_root,
+    ));
     let p2p_service = Arc::new(P2pService::new(
-        flow_context,
+        flow_context.clone(),
         args.connect,
         args.listen,
         args.outbound_target,
@@ -245,6 +256,9 @@ pub fn main() {
         consensus_manager.clone(),
         notify_service.notifier(),
         index_service.as_ref().map(|x| x.notifier()),
+        mining_manager,
+        flow_context,
+        config,
     ));
     let grpc_server = Arc::new(GrpcServer::new(grpc_server_addr, rpc_core_server.service()));
diff --git a/mining/src/model/mod.rs b/mining/src/model/mod.rs
index e7146d9d..c53ad12e 100644
--- a/mining/src/model/mod.rs
+++ b/mining/src/model/mod.rs
@@ -6,4 +6,4 @@ pub mod owner_txs;
 pub mod topological_index;

 /// A set of unique transaction ids
-pub(crate) type TransactionIdSet = HashSet<TransactionId>;
+pub type TransactionIdSet = HashSet<TransactionId>;
diff --git a/notify/src/collector.rs b/notify/src/collector.rs
index 0a17546b..35a7c169 100644
--- a/notify/src/collector.rs
+++ b/notify/src/collector.rs
@@ -1,9 +1,8 @@
-use crate::notifier::DynNotify;
-
 use super::{
     error::{Error, Result},
     notification::Notification,
 };
+use crate::{converter::Converter, notifier::DynNotify};
 use async_channel::{Receiver, Sender};
 use async_trait::async_trait;
 use core::fmt::Debug;
@@ -14,11 +13,11 @@ use futures::{
 };
 use futures_util::stream::StreamExt;
 use kaspa_core::trace;
-use kaspa_utils::channel::Channel;
-use kaspa_utils::triggers::DuplexTrigger;
-use std::marker::PhantomData;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use kaspa_utils::{channel::Channel, triggers::DuplexTrigger};
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    Arc,
+};

 pub type CollectorNotificationChannel<N> = Channel<N>;
 pub type CollectorNotificationSender<N> = Sender<N>;
@@ -46,43 +45,41 @@ pub type DynCollector<N> = Arc<dyn Collector<N>>;

 /// A notification [`Collector`] that receives `I` from a channel,
 /// converts it into a `N` and sends it to a [`DynNotify`].
 #[derive(Debug)]
-pub struct CollectorFrom<N, I>
+pub struct CollectorFrom<C>
 where
-    N: Notification,
-    I: Send + Sync + 'static + Sized + Debug,
+    C: Converter,
 {
-    recv_channel: CollectorNotificationReceiver<I>,
+    recv_channel: CollectorNotificationReceiver<C::Incoming>,
+
+    converter: Arc<C>,

     /// Has this collector been started?
     is_started: Arc<AtomicBool>,

     collect_shutdown: Arc<DuplexTrigger>,
-
-    _notification: PhantomData<N>,
 }

-impl<N, I> CollectorFrom<N, I>
+impl<C> CollectorFrom<C>
 where
-    N: Notification,
-    I: Send + Sync + 'static + Sized + Debug,
-    I: Into<N>,
+    C: Converter + 'static,
 {
-    pub fn new(recv_channel: CollectorNotificationReceiver<I>) -> Self {
+    pub fn new(recv_channel: CollectorNotificationReceiver<C::Incoming>, converter: Arc<C>) -> Self {
         Self {
             recv_channel,
+            converter,
             collect_shutdown: Arc::new(DuplexTrigger::new()),
             is_started: Arc::new(AtomicBool::new(false)),
-            _notification: PhantomData,
         }
     }

-    fn spawn_collecting_task(self: Arc<Self>, notifier: DynNotify<N>) {
+    fn spawn_collecting_task(self: Arc<Self>, notifier: DynNotify<C::Outgoing>) {
         // The task can only be spawned once
         if self.is_started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
             return;
         }
         let collect_shutdown = self.collect_shutdown.clone();
         let recv_channel = self.recv_channel.clone();
+        let converter = self.converter.clone();

         workflow_core::task::spawn(async move {
             trace!("[Collector] collecting_task start");
@@ -99,7 +96,7 @@ where
                 notification = notifications.next().fuse() => {
                     match notification {
                         Some(notification) => {
-                            match notifier.notify(notification.into()) {
+                            match notifier.notify(converter.convert(notification).await) {
                                 Ok(_) => (),
                                 Err(err) => {
                                     trace!("[Collector] notification sender error: {:?}", err);
@@ -129,11 +126,10 @@ where
 }

 #[async_trait]
-impl<N, I> Collector<N> for CollectorFrom<N, I>
+impl<N, C> Collector<N> for CollectorFrom<C>
 where
     N: Notification,
-    I: Send + Sync + 'static + Sized + Debug,
-    I: Into<N>,
+    C: Converter<Outgoing = N> + 'static,
 {
     fn start(self: Arc<Self>, notifier: DynNotify<N>) {
         self.spawn_collecting_task(notifier);
@@ -148,6 +144,7 @@ where
 mod tests {
     use super::*;
     use crate::{
+        converter::ConverterFrom,
         events::EventType,
         notifier::test_helpers::NotifyMock,
         subscription::single::{OverallSubscription, UtxosChangedSubscription, VirtualChainChangedSubscription},
@@ -195,9 +192,10 @@ mod tests {

     #[tokio::test]
     async fn test_collector_from() {
+        type TestConverter = ConverterFrom<TestNotification, TestNotification>;
         let incoming = Channel::default();
-        let collector: Arc<CollectorFrom<TestNotification, TestNotification>> =
-            Arc::new(CollectorFrom::new(incoming.receiver()));
+        let collector: Arc<CollectorFrom<TestConverter>> =
+            Arc::new(CollectorFrom::new(incoming.receiver(), Arc::new(TestConverter::new())));
         let outgoing = Channel::default();
         let notifier = Arc::new(NotifyMock::new(outgoing.sender()));
         collector.clone().start(notifier);
diff --git a/notify/src/converter.rs b/notify/src/converter.rs
new file mode 100644
index 00000000..385c43b4
--- /dev/null
+++ b/notify/src/converter.rs
@@ -0,0 +1,59 @@
+use async_trait::async_trait;
+
+use crate::notification::Notification;
+use core::fmt::Debug;
+use std::marker::PhantomData;
+
+#[async_trait]
+pub trait Converter: Send + Sync + Debug {
+    type Incoming: Send + Sync + 'static + Sized + Debug;
+    type Outgoing: Notification;
+
+    async fn convert(&self, incoming: Self::Incoming) -> Self::Outgoing;
+}
+
+/// A notification [`Converter`] that converts an incoming `I` into a notification `N` using the [`From`] trait.
+#[derive(Debug)]
+pub struct ConverterFrom<I, N>
+where
+    I: Send + Sync + 'static + Sized + Debug,
+    N: Notification,
+{
+    _incoming: PhantomData<I>,
+    _notification: PhantomData<N>,
+}
+
+impl<I, N> ConverterFrom<I, N>
+where
+    N: Notification,
+    I: Send + Sync + 'static + Sized + Debug,
+{
+    pub fn new() -> Self {
+        Self { _incoming: PhantomData, _notification: PhantomData }
+    }
+}
+
+impl<I, N> Default for ConverterFrom<I, N>
+where
+    N: Notification,
+    I: Send + Sync + 'static + Sized + Debug,
+{
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[async_trait]
+impl<I, N> Converter for ConverterFrom<I, N>
+where
+    N: Notification,
+    I: Send + Sync + 'static + Sized + Debug,
+    I: Into<N>,
+{
+    type Incoming = I;
+    type Outgoing = N;
+
+    async fn convert(&self, incoming: I) -> N {
+        incoming.into()
+    }
+}
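A standalone mirror of the `Converter` pattern introduced above (a local toy trait, not the `kaspa_notify` types, assuming the `async-trait` and `tokio` crates): making `convert` async is what allows converters that must perform async work while mapping events — e.g., the `IndexConverter` referenced later in this diff — instead of a plain `From` impl:

```rust
use async_trait::async_trait;

#[async_trait]
trait Converter: Send + Sync {
    type Incoming: Send + 'static;
    type Outgoing: Send + 'static;
    async fn convert(&self, incoming: Self::Incoming) -> Self::Outgoing;
}

// A converter that could await I/O while mapping events; here it just formats.
struct Stringify;

#[async_trait]
impl Converter for Stringify {
    type Incoming = u64;
    type Outgoing = String;
    async fn convert(&self, incoming: u64) -> String {
        format!("event #{incoming}")
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(Stringify.convert(7).await, "event #7");
}
```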
diff --git a/notify/src/lib.rs b/notify/src/lib.rs
index 557dbc42..123aad75 100644
--- a/notify/src/lib.rs
+++ b/notify/src/lib.rs
@@ -4,6 +4,7 @@ pub mod address;
 pub(crate) mod broadcaster;
 pub mod collector;
 pub mod connection;
+pub mod converter;
 pub mod error;
 pub mod events;
 pub mod listener;
diff --git a/notify/src/notifier.rs b/notify/src/notifier.rs
index bbfaa65d..d95b8384 100644
--- a/notify/src/notifier.rs
+++ b/notify/src/notifier.rs
@@ -655,6 +655,7 @@ mod tests {
     use super::{test_helpers::*, *};
     use crate::{
         collector::CollectorFrom,
+        converter::ConverterFrom,
         events::EVENT_TYPE_ARRAY,
         notification::test_helpers::*,
         subscriber::test_helpers::{SubscriptionManagerMock, SubscriptionMessage},
@@ -676,11 +677,13 @@ mod tests {

     impl Test {
         fn new(name: &'static str, listener_count: usize, steps: Vec<Step>) -> Self {
+            type TestConverter = ConverterFrom<TestNotification, TestNotification>;
+            type TestCollector = CollectorFrom<TestConverter>;
             // Build the full-featured notifier
             let (sync_sender, sync_receiver) = unbounded();
             let (notification_sender, notification_receiver) = unbounded();
             let (subscription_sender, subscription_receiver) = unbounded();
-            let collector = Arc::new(CollectorFrom::<TestNotification, TestNotification>::new(notification_receiver));
+            let collector = Arc::new(TestCollector::new(notification_receiver, Arc::new(TestConverter::new())));
             let subscription_manager = Arc::new(SubscriptionManagerMock::new(subscription_sender));
             let subscriber = Arc::new(Subscriber::new(EVENT_TYPE_ARRAY[..].into(), subscription_manager, SUBSCRIPTION_MANAGER_ID));
             let notifier = Arc::new(TestNotifier::with_sync(
diff --git a/protocol/flows/Cargo.toml b/protocol/flows/Cargo.toml
index ff36d044..2464345e 100644
--- a/protocol/flows/Cargo.toml
+++ b/protocol/flows/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
 [dependencies]
 kaspa-core.workspace = true
 kaspa-consensus-core.workspace = true
+kaspa-consensus-notify.workspace = true
 kaspa-p2p-lib.workspace = true
 kaspa-utils.workspace = true
 kaspa-hashes.workspace = true
@@ -16,6 +17,7 @@ kaspa-connectionmanager.workspace = true
 kaspa-addressmanager.workspace = true
 kaspa-consensusmanager.workspace = true
 kaspa-mining.workspace = true
+kaspa-notify.workspace = true
 parking_lot.workspace = true
 rand.workspace = true
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index 6e3639a4..1b508d6f 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -6,21 +6,27 @@ use crate::flowcontext::{
 use crate::v5;
 use async_trait::async_trait;
 use kaspa_addressmanager::AddressManager;
-use kaspa_consensus_core::api::ConsensusApi;
 use kaspa_consensus_core::block::Block;
 use kaspa_consensus_core::config::Config;
 use kaspa_consensus_core::tx::{Transaction, TransactionId};
+use kaspa_consensus_core::{api::ConsensusApi, errors::block::RuleError};
+use kaspa_consensus_notify::{
+    notification::{NewBlockTemplateNotification, Notification},
+    root::ConsensusNotificationRoot,
+};
 use kaspa_consensusmanager::{ConsensusInstance, ConsensusManager};
-use kaspa_core::time::unix_now;
 use kaspa_core::{debug, info};
+use kaspa_core::{time::unix_now, warn};
 use kaspa_hashes::Hash;
 use kaspa_mining::{
     manager::MiningManager,
     mempool::tx::{Orphan, Priority},
 };
+use kaspa_notify::notifier::Notify;
 use kaspa_p2p_lib::{
     common::ProtocolError,
-    pb::{self},
+    make_message,
+    pb::{self, kaspad_message::Payload, InvRelayBlockMessage},
     ConnectionInitializer, Hub, KaspadHandshake, Router,
 };
 use parking_lot::Mutex;
@@ -42,15 +48,16 @@ const PROTOCOL_VERSION: u32 = 5;
 pub struct FlowContext {
     pub node_id: Uuid,
     pub consensus_manager: Arc<ConsensusManager>,
-    pub config: Config,
+    pub config: Arc<Config>,
     hub: Hub,
     orphans_pool: Arc<AsyncRwLock<OrphanBlocksPool>>,
     shared_block_requests: Arc<Mutex<HashSet<Hash>>>,
     transactions_spread: Arc<AsyncRwLock<TransactionsSpread>>,
     shared_transaction_requests: Arc<Mutex<HashSet<TransactionId>>>,
     is_ibd_running: Arc<AtomicBool>, // TODO: pass the context wrapped with Arc and avoid some of the internal ones
-    pub amgr: Arc<Mutex<AddressManager>>,
+    pub address_manager: Arc<Mutex<AddressManager>>,
     mining_manager: Arc<MiningManager>,
+    notification_root: Arc<ConsensusNotificationRoot>,
 }

 pub struct IbdRunningGuard {
@@ -84,23 +91,25 @@ impl Drop for RequestScope {
 impl FlowContext {
     pub fn new(
         consensus_manager: Arc<ConsensusManager>,
-        amgr: Arc<Mutex<AddressManager>>,
-        config: &Config,
+        address_manager: Arc<Mutex<AddressManager>>,
+        config: Arc<Config>,
         mining_manager: Arc<MiningManager>,
+        notification_root: Arc<ConsensusNotificationRoot>,
     ) -> Self {
         let hub = Hub::new();
         Self {
             node_id: Uuid::new_v4(),
             consensus_manager,
-            config: config.clone(),
+            config,
             orphans_pool: Arc::new(AsyncRwLock::new(OrphanBlocksPool::new(MAX_ORPHANS))),
             shared_block_requests: Arc::new(Mutex::new(HashSet::new())),
             transactions_spread: Arc::new(AsyncRwLock::new(TransactionsSpread::new(hub.clone()))),
             shared_transaction_requests: Arc::new(Mutex::new(HashSet::new())),
             is_ibd_running: Arc::new(AtomicBool::default()),
             hub,
-            amgr,
+            address_manager,
             mining_manager,
+            notification_root,
         }
     }
@@ -160,6 +169,22 @@ impl FlowContext {
         self.orphans_pool.write().await.unorphan_blocks(consensus, root).await
     }

+    /// Adds the given block to the DAG and propagates it.
+    pub async fn add_block(&self, consensus: &dyn ConsensusApi, block: Block) -> Result<(), ProtocolError> {
+        if block.transactions.is_empty() {
+            return Err(RuleError::NoTransactions)?;
+        }
+        let hash = block.hash();
+        if let Err(err) = self.consensus().session().await.validate_and_insert_block(block.clone(), true).await {
+            warn!("Validation failed for block {}: {}", hash, err);
+            return Err(err)?;
+        }
+        self.on_new_block_template().await?;
+        self.on_new_block(consensus, block).await?;
+        self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;
+        Ok(())
+    }
+
     /// Updates the mempool after a new block arrival, relays newly unorphaned transactions
     /// and possibly rebroadcast manually added transactions when not in IBD.
     ///
@@ -192,7 +217,10 @@ impl FlowContext {
     pub async fn on_new_block_template(&self) -> Result<(), ProtocolError> {
         // Clear current template cache
         self.mining_manager().clear_block_template();
-        // TODO: call a handler function or a predefined registered service
+        // TODO: better handle notification errors
+        self.notification_root
+            .notify(Notification::NewBlockTemplate(NewBlockTemplateNotification {}))
+            .map_err(|_| ProtocolError::Other("Notification error"))?;
         Ok(())
     }

@@ -290,7 +318,7 @@ impl ConnectionInitializer for FlowContext {
         }

         if router.is_outbound() {
-            self.amgr.lock().add_address(router.net_address().into());
+            self.address_manager.lock().add_address(router.net_address().into());
         }

         // Note: we deliberately do not hold the handshake in memory so at this point receivers for handshake subscriptions
diff --git a/protocol/flows/src/service.rs b/protocol/flows/src/service.rs
index ea65eb9a..a42f5ddb 100644
--- a/protocol/flows/src/service.rs
+++ b/protocol/flows/src/service.rs
@@ -67,7 +67,7 @@ impl AsyncService for P2pService {
             self.inbound_limit,
             self.dns_seeders,
             self.default_port,
-            self.flow_context.amgr.clone(),
+            self.flow_context.address_manager.clone(),
         );

         // For now, attempt to connect to a running golang node
diff --git a/protocol/flows/src/v5/address.rs b/protocol/flows/src/v5/address.rs
index f095cba2..521a478c 100644
--- a/protocol/flows/src/v5/address.rs
+++ b/protocol/flows/src/v5/address.rs
@@ -56,7 +56,7 @@ impl ReceiveAddressesFlow {
         if address_list.len() > MAX_ADDRESSES_RECEIVE {
             return Err(ProtocolError::OtherOwned(format!("address count {} exceeded {}", address_list.len(), MAX_ADDRESSES_RECEIVE)));
         }
-        let mut amgr_lock = self.ctx.amgr.lock();
+        let mut amgr_lock = self.ctx.address_manager.lock();
         for (ip, port) in address_list {
             amgr_lock.add_address(NetAddress::new(ip, port))
         }
@@ -94,7 +94,7 @@ impl SendAddressesFlow {
     async fn start_impl(&mut self) -> Result<(), ProtocolError> {
         loop {
             dequeue!(self.incoming_route, Payload::RequestAddresses)?;
-            let addresses = self.ctx.amgr.lock().iterate_addresses().collect_vec();
+            let addresses = self.ctx.address_manager.lock().iterate_addresses().collect_vec();
             let address_list = addresses
                 .choose_multiple(&mut rand::thread_rng(), MAX_ADDRESSES_SEND)
                 .map(|addr| (addr.ip, addr.port).into())
diff --git a/protocol/p2p/Cargo.toml b/protocol/p2p/Cargo.toml
index 0ebf3ad1..b4352555 100644
--- a/protocol/p2p/Cargo.toml
+++ b/protocol/p2p/Cargo.toml
@@ -38,12 +38,12 @@ tokio = { version = "1.21.2", features = [
     "signal",
 ] }
 tokio-stream = { version = "0.1.11", features = ["net"] }
-tonic = { version = "0.8.3", features = ["tls", "gzip"] }
+tonic = { version = "0.9.1", features = ["tls", "gzip"] }
 h2 = "0.3"
 uuid = { version = "1.2.2", features = ["v4", "fast-rng"] }

 [build-dependencies]
-tonic-build = { version = "0.8.3", features = ["prost"] }
+tonic-build = { version = "0.9.1", features = ["prost"] }

 [dev-dependencies]
 hex = "0.4.3"
diff --git a/protocol/p2p/src/core/connection_handler.rs b/protocol/p2p/src/core/connection_handler.rs
index dc327e15..750e2d6d 100644
--- a/protocol/p2p/src/core/connection_handler.rs
+++ b/protocol/p2p/src/core/connection_handler.rs
@@ -32,6 +32,9 @@ pub enum ConnectionError {
     TonicStatus(#[from] TonicStatus),
 }

+/// Maximum P2P decoded gRPC message size to send and receive
+const P2P_MAX_MESSAGE_SIZE: usize = 1024 * 1024 * 1024; // 1GB
+
 /// Handles Router creation for both server and client-side new connections
 #[derive(Clone)]
 pub struct ConnectionHandler {
@@ -53,10 +56,9 @@ impl ConnectionHandler {
         tokio::spawn(async move {
             let proto_server = ProtoP2pServer::new(connection_handler)
                 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
-                .send_compressed(tonic::codec::CompressionEncoding::Gzip);
+                .send_compressed(tonic::codec::CompressionEncoding::Gzip)
+                .max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);

-            // TODO: set max message sizes to 1GB or less. Check if `builder().max_frame_size(frame_size)` is the correct setting (seems not).
-            // Seems like this important feature should be in the next tonic version: https://github.com/hyperium/tonic/pull/1274
             // TODO: check whether we should set tcp_keepalive
             let serve_result = TonicServer::builder()
                 .add_service(proto_server)
@@ -76,7 +78,6 @@ impl ConnectionHandler {
         let Some(socket_address) = peer_address.to_socket_addrs()?.next() else { return Err(ConnectionError::NoAddress); };
         let peer_address = format!("http://{}", peer_address); // Add scheme prefix as required by Tonic

-        // TODO: set max message sizes to 1GB or less. See comment above in server configuration.
         let channel = tonic::transport::Endpoint::new(peer_address)?
             .timeout(Duration::from_millis(Self::communication_timeout()))
             .connect_timeout(Duration::from_millis(Self::connect_timeout()))
@@ -86,7 +87,8 @@ impl ConnectionHandler {

         let mut client = ProtoP2pClient::new(channel)
             .send_compressed(tonic::codec::CompressionEncoding::Gzip)
-            .accept_compressed(tonic::codec::CompressionEncoding::Gzip);
+            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
+            .max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);

         let (outgoing_route, outgoing_receiver) = mpsc_channel(Self::outgoing_network_channel_size());
         let incoming_stream = client.message_stream(ReceiverStream::new(outgoing_receiver)).await?.into_inner();
diff --git a/protocol/p2p/src/core/hub.rs b/protocol/p2p/src/core/hub.rs
index 2ef47a6c..a0403b79 100644
--- a/protocol/p2p/src/core/hub.rs
+++ b/protocol/p2p/src/core/hub.rs
@@ -94,6 +94,11 @@ impl Hub {
     pub fn active_peers(&self) -> Vec<Peer> {
         self.peers.read().values().map(|r| r.as_ref().into()).collect()
     }
+
+    /// Returns whether there are currently active peers
+    pub fn has_peers(&self) -> bool {
+        !self.peers.read().is_empty()
+    }
 }

 impl Default for Hub {
diff --git a/rpc/core/Cargo.toml b/rpc/core/Cargo.toml
index dac7f91c..3ca83ceb 100644
--- a/rpc/core/Cargo.toml
+++ b/rpc/core/Cargo.toml
@@ -17,6 +17,7 @@ kaspa-utils.workspace = true
 kaspa-notify.workspace = true
 kaspa-index-core.workspace = true
 kaspa-txscript.workspace = true
+kaspa-mining.workspace = true
 faster-hex.workspace = true
 serde.workspace = true
 derive_more.workspace = true
diff --git a/rpc/core/src/api/rpc.rs b/rpc/core/src/api/rpc.rs
index 581e0c71..2859bf50 100644
--- a/rpc/core/src/api/rpc.rs
+++ b/rpc/core/src/api/rpc.rs
@@ -136,7 +136,12 @@ where
     ) -> RpcResult;

     /// Requests blocks between a certain block `low_hash` up to this node's current virtual.
- async fn get_blocks(&self, low_hash: RpcHash, include_blocks: bool, include_transactions: bool) -> RpcResult<GetBlocksResponse> { + async fn get_blocks( + &self, + low_hash: Option<RpcHash>, + include_blocks: bool, + include_transactions: bool, + ) -> RpcResult<GetBlocksResponse> { self.get_blocks_call(GetBlocksRequest::new(low_hash, include_blocks, include_transactions)).await } async fn get_blocks_call(&self, request: GetBlocksRequest) -> RpcResult<GetBlocksResponse>; diff --git a/rpc/core/src/convert/block.rs b/rpc/core/src/convert/block.rs index acb5515d..3f66b617 100644 --- a/rpc/core/src/convert/block.rs +++ b/rpc/core/src/convert/block.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use crate::{GetBlockTemplateResponse, RpcBlock, RpcError, RpcResult, RpcTransaction}; -use kaspa_consensus_core::block::{Block, BlockTemplate, MutableBlock}; +use crate::{RpcBlock, RpcError, RpcResult, RpcTransaction}; +use kaspa_consensus_core::block::{Block, MutableBlock}; // ---------------------------------------------------------------------------- // consensus_core to rpc_core // ---------------------------------------------------------------------------- @@ -28,17 +28,6 @@ impl From<&MutableBlock> for RpcBlock { } } -impl From<&BlockTemplate> for GetBlockTemplateResponse { - fn from(item: &BlockTemplate) -> Self { - Self { - block: (&item.block).into(), - // TODO: either call some Block.is_synced() if/when available or implement - // a functional equivalent here based on item.selected_parent_timestamp - is_synced: true, - } - } -} - // ---------------------------------------------------------------------------- // rpc_core to consensus_core // ---------------------------------------------------------------------------- diff --git a/rpc/core/src/convert/notification.rs b/rpc/core/src/convert/notification.rs index bc77c442..c18a0a0d 100644 --- a/rpc/core/src/convert/notification.rs +++ b/rpc/core/src/convert/notification.rs @@ -119,10 +119,9 @@ impl From<&index_notify::PruningPointUtxoSetOverrideNotification> for PruningPoi } impl From<&index_notify::UtxosChangedNotification> for UtxosChangedNotification { + // This is not intended to be ever called because no address prefix is available. + // Use kaspa_rpc_service::converter::index::IndexConverter instead.
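The block.rs hunk above deletes the `From<&BlockTemplate>` conversion that hard-coded `is_synced: true`: a pure conversion has no view of peers or node config, so the decision moves into the RPC service (see the `get_block_template_call` hunk further below, which combines peer presence with parent-timestamp freshness). A hedged sketch of the relocated check, with illustrative stand-in names:

```rust
// Illustrative stand-ins; the real service combines Hub::has_peers with
// Config::is_nearly_synced(selected_parent_timestamp).
struct BlockTemplate {
    selected_parent_timestamp: u64,
}

fn template_is_synced(
    has_peers: bool,
    nearly_synced: impl Fn(u64) -> bool,
    template: &BlockTemplate,
) -> bool {
    // A template built on a stale selected parent, or on a peerless node,
    // is unlikely to be accepted, so solving it would waste work.
    has_peers && nearly_synced(template.selected_parent_timestamp)
}

fn main() {
    let template = BlockTemplate { selected_parent_timestamp: 1_650_000_000_000 };
    let nearly_synced = |ts: u64| ts > 1_600_000_000_000; // mock freshness rule
    assert!(template_is_synced(true, nearly_synced, &template));
    assert!(!template_is_synced(false, nearly_synced, &template));
}
```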
fn from(item: &index_notify::UtxosChangedNotification) -> Self { - Self { - added: Arc::new(utxo_set_into_rpc(&item.added, item.prefix)), - removed: Arc::new(utxo_set_into_rpc(&item.removed, item.prefix)), - } + Self { added: Arc::new(utxo_set_into_rpc(&item.added, None)), removed: Arc::new(utxo_set_into_rpc(&item.removed, None)) } } } diff --git a/rpc/core/src/convert/tx.rs b/rpc/core/src/convert/tx.rs index e4de6f8a..6eab66c9 100644 --- a/rpc/core/src/convert/tx.rs +++ b/rpc/core/src/convert/tx.rs @@ -1,5 +1,8 @@ -use crate::{RpcError, RpcResult, RpcTransaction, RpcTransactionInput, RpcTransactionOutput}; -use kaspa_consensus_core::tx::{Transaction, TransactionInput, TransactionOutput}; +use crate::{RpcAcceptedTransactionIds, RpcError, RpcResult, RpcTransaction, RpcTransactionInput, RpcTransactionOutput}; +use kaspa_consensus_core::{ + acceptance_data::MergesetBlockAcceptanceData, + tx::{Transaction, TransactionInput, TransactionOutput}, +}; // ---------------------------------------------------------------------------- // consensus_core to rpc_core // ---------------------------------------------------------------------------- @@ -45,6 +48,15 @@ impl From<&TransactionInput> for RpcTransactionInput { } } +impl From<&MergesetBlockAcceptanceData> for RpcAcceptedTransactionIds { + fn from(item: &MergesetBlockAcceptanceData) -> Self { + Self { + accepting_block_hash: item.block_hash, + accepted_transaction_ids: item.accepted_transactions.iter().map(|tx| tx.transaction_id).collect(), + } + } +} + // ---------------------------------------------------------------------------- // rpc_core to consensus_core // ---------------------------------------------------------------------------- diff --git a/rpc/core/src/convert/utxo.rs b/rpc/core/src/convert/utxo.rs index 6741b055..305fb093 100644 --- a/rpc/core/src/convert/utxo.rs +++ b/rpc/core/src/convert/utxo.rs @@ -8,10 +8,10 @@ use kaspa_txscript::extract_script_pub_key_address; // index to rpc_core // ---------------------------------------------------------------------------- -pub fn utxo_set_into_rpc(item: &UtxoSetByScriptPublicKey, prefix: Prefix) -> Vec<RpcUtxosByAddressesEntry> { +pub fn utxo_set_into_rpc(item: &UtxoSetByScriptPublicKey, prefix: Option<Prefix>) -> Vec<RpcUtxosByAddressesEntry> { item.iter() .flat_map(|(script_public_key, utxo_collection)| { - let address = extract_script_pub_key_address(script_public_key, prefix).ok(); + let address = prefix.and_then(|x| extract_script_pub_key_address(script_public_key, x).ok()); utxo_collection .iter() .map(|(outpoint, entry)| RpcUtxosByAddressesEntry { diff --git a/rpc/core/src/error.rs b/rpc/core/src/error.rs index 43a27796..58ef2165 100644 --- a/rpc/core/src/error.rs +++ b/rpc/core/src/error.rs @@ -1,6 +1,9 @@ +use kaspa_consensus_core::tx::TransactionId; use std::num::TryFromIntError; use thiserror::Error; + +use crate::{RpcHash, RpcTransactionId}; + #[derive(Clone, Debug, Error)] pub enum RpcError { #[error("Not implemented")] @@ -30,6 +33,21 @@ pub enum RpcError { #[error("Primitive to enum conversion error")] PrimitiveToEnumConversionError, + #[error("Coinbase payload is above max length ({0}). Try to shorten the extra data.")] + CoinbasePayloadLengthAboveMax(usize), + + #[error("Rejected transaction {0}: {1}")] + RejectedTransaction(RpcTransactionId, String), + + #[error("Block {0} is invalid.
No verbose data can be built.")] + InvalidBlock(RpcHash), + + #[error("If includeTransactions is set, then includeBlockVerboseData must be set as well.")] + InvalidGetBlocksRequest, + + #[error("Transaction {0} not found")] + TransactionNotFound(TransactionId), + #[error(transparent)] AddressError(#[from] kaspa_addresses::AddressError), @@ -39,6 +57,15 @@ pub enum RpcError { #[error(transparent)] NotificationError(#[from] kaspa_notify::error::Error), + #[error(transparent)] + MiningManagerError(#[from] kaspa_mining::errors::MiningManagerError), + + #[error(transparent)] + ConsensusError(#[from] kaspa_consensus_core::errors::consensus::ConsensusError), + + #[error(transparent)] + ScriptClassError(#[from] kaspa_txscript::script_class::Error), + #[error("{0}")] General(String), } diff --git a/rpc/core/src/model/message.rs b/rpc/core/src/model/message.rs index c7285c66..150a1960 100644 --- a/rpc/core/src/model/message.rs +++ b/rpc/core/src/model/message.rs @@ -1,5 +1,6 @@ use crate::model::*; use borsh::{BorshDeserialize, BorshSchema, BorshSerialize}; +use kaspa_consensus_core::sync_info::SyncInfo; use kaspa_notify::subscription::{single::UtxosChangedSubscription, Command}; use serde::{Deserialize, Serialize}; use std::{ @@ -93,6 +94,7 @@ pub struct GetBlockTemplateResponse { /// chance the block will never be accepted, thus the solving effort would have been wasted. pub is_synced: bool, } + /// GetBlockRequest requests information about a specific block #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] #[serde(rename_all = "camelCase")] @@ -142,6 +144,12 @@ pub struct GetCurrentNetworkResponse { pub network: RpcNetworkType, } +impl GetCurrentNetworkResponse { + pub fn new(network: RpcNetworkType) -> Self { + Self { network } + } +} + #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] #[serde(rename_all = "camelCase")] pub struct GetPeerAddressesRequest {} @@ -344,13 +352,13 @@ impl GetVirtualChainFromBlockResponse { #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] #[serde(rename_all = "camelCase")] pub struct GetBlocksRequest { - pub low_hash: RpcHash, + pub low_hash: Option, pub include_blocks: bool, pub include_transactions: bool, } impl GetBlocksRequest { - pub fn new(low_hash: RpcHash, include_blocks: bool, include_transactions: bool) -> Self { + pub fn new(low_hash: Option, include_blocks: bool, include_transactions: bool) -> Self { Self { low_hash, include_blocks, include_transactions } } } @@ -372,18 +380,7 @@ impl GetBlocksResponse { #[serde(rename_all = "camelCase")] pub struct GetBlockCountRequest {} -#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] -#[serde(rename_all = "camelCase")] -pub struct GetBlockCountResponse { - pub block_count: u64, - pub header_count: u64, -} - -impl GetBlockCountResponse { - pub fn new(block_count: u64, header_count: u64) -> Self { - Self { block_count, header_count } - } -} +pub type GetBlockCountResponse = SyncInfo; #[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] #[serde(rename_all = "camelCase")] diff --git a/rpc/core/src/model/script_class.rs b/rpc/core/src/model/script_class.rs index aca0793d..5040ae4f 100644 --- a/rpc/core/src/model/script_class.rs +++ b/rpc/core/src/model/script_class.rs @@ -1,69 +1,3 @@ -use crate::RpcError; -use borsh::{BorshDeserialize, BorshSchema, BorshSerialize}; -use serde::{Deserialize, Serialize}; 
-use std::{ - fmt::{Display, Formatter}, - str::FromStr, -}; +use kaspa_txscript::script_class::ScriptClass; -// TODO: in the future it should be a newtype of ScriptClass, that will be probably a type -// associated with the script engine -#[derive(Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize, BorshSchema)] -pub enum RpcScriptClass { - /// None of the recognized forms. - NonStandardTy = 0, - - /// Pay to pubkey. - PubKeyTy = 1, - - /// Pay to pubkey ECDSA. - PubKeyECDSATy = 2, - - /// Pay to script hash. - ScriptHashTy = 3, -} - -const NON_STANDARD_TY: &str = "nonstandard"; -const PUB_KEY_TY: &str = "pubkey"; -const PUB_KEY_ECDSA_TY: &str = "pubkeyecdsa"; -const SCRIPT_HASH_TY: &str = "scripthash"; - -impl RpcScriptClass { - fn as_str(&self) -> &'static str { - match self { - RpcScriptClass::NonStandardTy => NON_STANDARD_TY, - RpcScriptClass::PubKeyTy => PUB_KEY_TY, - RpcScriptClass::PubKeyECDSATy => PUB_KEY_ECDSA_TY, - RpcScriptClass::ScriptHashTy => SCRIPT_HASH_TY, - } - } -} - -impl Display for RpcScriptClass { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(self.as_str()) - } -} - -impl FromStr for RpcScriptClass { - type Err = RpcError; - - fn from_str(script_class: &str) -> Result { - match script_class { - NON_STANDARD_TY => Ok(RpcScriptClass::NonStandardTy), - PUB_KEY_TY => Ok(RpcScriptClass::PubKeyTy), - PUB_KEY_ECDSA_TY => Ok(RpcScriptClass::PubKeyECDSATy), - SCRIPT_HASH_TY => Ok(RpcScriptClass::ScriptHashTy), - - _ => Err(RpcError::InvalidRpcScriptClass(script_class.to_string())), - } - } -} - -impl TryFrom<&str> for RpcScriptClass { - type Error = RpcError; - - fn try_from(script_class: &str) -> Result { - script_class.parse() - } -} +pub type RpcScriptClass = ScriptClass; diff --git a/rpc/core/src/model/tx.rs b/rpc/core/src/model/tx.rs index 97fcbcae..7088f184 100644 --- a/rpc/core/src/model/tx.rs +++ b/rpc/core/src/model/tx.rs @@ -1,4 +1,5 @@ use borsh::{BorshDeserialize, BorshSchema, BorshSerialize}; +use kaspa_addresses::Address; use kaspa_consensus_core::tx::{ScriptPublicKey, ScriptVec, TransactionId, TransactionOutpoint, UtxoEntry}; use serde::{Deserialize, Serialize}; @@ -44,9 +45,7 @@ pub struct RpcTransactionOutput { #[serde(rename_all = "camelCase")] pub struct RpcTransactionOutputVerboseData { pub script_public_key_type: RpcScriptClass, - - // TODO: change the type of this field for a better binary representation - pub script_public_key_address: String, + pub script_public_key_address: Address, } /// Represents a Kaspa transaction diff --git a/rpc/core/src/notify/collector.rs b/rpc/core/src/notify/collector.rs index 561fe33f..8cc6a548 100644 --- a/rpc/core/src/notify/collector.rs +++ b/rpc/core/src/notify/collector.rs @@ -1,7 +1,9 @@ use crate::Notification; -use kaspa_notify::collector::CollectorFrom; +use kaspa_notify::{collector::CollectorFrom, converter::ConverterFrom}; + +pub type RpcCoreConverter = ConverterFrom; /// A rpc_core notification collector providing a simple pass-through. /// No conversion occurs since both source and target data are of /// type [`Notification`]. 
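The script_class.rs hunk above replaces a hand-rolled `RpcScriptClass` enum with a type alias to `kaspa_txscript::script_class::ScriptClass`, so parsing and formatting live in one crate while downstream code keeps the old name. A self-contained miniature of the alias pattern (simplified variants and error type, not the repo's exact definitions):

```rust
pub mod txscript {
    use std::fmt::{Display, Formatter};
    use std::str::FromStr;

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum ScriptClass {
        NonStandard,
        PubKey,
        PubKeyEcdsa,
        ScriptHash,
    }

    impl Display for ScriptClass {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.write_str(match self {
                ScriptClass::NonStandard => "nonstandard",
                ScriptClass::PubKey => "pubkey",
                ScriptClass::PubKeyEcdsa => "pubkeyecdsa",
                ScriptClass::ScriptHash => "scripthash",
            })
        }
    }

    impl FromStr for ScriptClass {
        type Err = String;
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            match s {
                "nonstandard" => Ok(ScriptClass::NonStandard),
                "pubkey" => Ok(ScriptClass::PubKey),
                "pubkeyecdsa" => Ok(ScriptClass::PubKeyEcdsa),
                "scripthash" => Ok(ScriptClass::ScriptHash),
                _ => Err(format!("invalid script class: {s}")),
            }
        }
    }
}

// The RPC layer keeps its old public name but stops duplicating the logic.
pub type RpcScriptClass = txscript::ScriptClass;

fn main() {
    let class: RpcScriptClass = "pubkey".parse().unwrap();
    assert_eq!(class.to_string(), "pubkey");
}
```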
-pub type RpcCoreCollector = CollectorFrom; +pub type RpcCoreCollector = CollectorFrom; diff --git a/rpc/grpc/client/Cargo.toml b/rpc/grpc/client/Cargo.toml index e7216155..07c37567 100644 --- a/rpc/grpc/client/Cargo.toml +++ b/rpc/grpc/client/Cargo.toml @@ -20,7 +20,7 @@ rand.workspace = true regex.workspace = true async-trait = "0.1.57" futures = { version = "0.3" } -tonic = { version = "0.8", features = ["gzip"] } +tonic = { version = "0.9", features = ["gzip"] } prost = { version = "0.11" } h2 = "0.3" tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] } diff --git a/rpc/grpc/client/src/lib.rs b/rpc/grpc/client/src/lib.rs index 565f5994..492fa6fe 100644 --- a/rpc/grpc/client/src/lib.rs +++ b/rpc/grpc/client/src/lib.rs @@ -14,6 +14,7 @@ use kaspa_core::{debug, trace}; use kaspa_grpc_core::{ channel::NotificationChannel, protowire::{kaspad_request, rpc_client::RpcClient, GetInfoRequestMessage, KaspadRequest, KaspadResponse}, + RPC_MAX_MESSAGE_SIZE, }; use kaspa_notify::{ error::Result as NotifyResult, @@ -30,7 +31,10 @@ use kaspa_rpc_core::{ error::RpcError, error::RpcResult, model::message::*, - notify::{collector::RpcCoreCollector, connection::ChannelConnection}, + notify::{ + collector::{RpcCoreCollector, RpcCoreConverter}, + connection::ChannelConnection, + }, Notification, NotificationSender, }; use kaspa_utils::triggers::DuplexTrigger; @@ -74,7 +78,8 @@ impl GrpcClient { let inner = Inner::connect(address, reconnect, notify_channel.sender(), connection_event_sender, override_handle_stop_notify).await?; let core_events = EVENT_TYPE_ARRAY[..].into(); - let collector = Arc::new(RpcCoreCollector::new(notify_channel.receiver())); + let converter = Arc::new(RpcCoreConverter::new()); + let collector = Arc::new(RpcCoreCollector::new(notify_channel.receiver(), converter)); let subscriber = Arc::new(Subscriber::new(core_events, inner.clone(), 0)); let notifier = Arc::new(Notifier::new(core_events, vec![collector], vec![subscriber], 10, GRPC_CLIENT)); @@ -359,8 +364,10 @@ impl Inner { .connect() .await?; - let mut client = - RpcClient::new(channel).send_compressed(CompressionEncoding::Gzip).accept_compressed(CompressionEncoding::Gzip); + let mut client = RpcClient::new(channel) + .send_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(RPC_MAX_MESSAGE_SIZE); // Force the opening of the stream when connected to a go kaspad server. // This is also needed for querying server capabilities. 
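The collector hunks above pair each `CollectorFrom` with an explicit converter; `RpcCoreConverter = ConverterFrom<Notification, Notification>` is a pass-through, since source and target types coincide. A minimal sketch of what such a converter abstraction can look like; this is an assumed shape, the real trait lives in kaspa_notify and also requires `Debug` (dependencies for the demo: async-trait, tokio):

```rust
use async_trait::async_trait;
use std::marker::PhantomData;

#[async_trait]
trait Converter: Send + Sync {
    type Incoming: Send;
    type Outgoing: Send;
    async fn convert(&self, incoming: Self::Incoming) -> Self::Outgoing;
}

/// Pass-through converter mirroring `ConverterFrom<N, M>`: it defers to a
/// plain `From`/`Into` impl, which is all the rpc-core collector needs when
/// no consensus or index lookup is required to enrich the notification.
struct ConverterFrom<N, M>(PhantomData<fn(N) -> M>);

impl<N, M> ConverterFrom<N, M> {
    fn new() -> Self {
        Self(PhantomData)
    }
}

#[async_trait]
impl<N, M> Converter for ConverterFrom<N, M>
where
    N: Into<M> + Send + 'static,
    M: Send + 'static,
{
    type Incoming = N;
    type Outgoing = M;

    async fn convert(&self, incoming: N) -> M {
        incoming.into()
    }
}

#[tokio::main]
async fn main() {
    let converter: ConverterFrom<u32, u64> = ConverterFrom::new();
    assert_eq!(converter.convert(7).await, 7u64);
}
```

By contrast, the `ConsensusConverter` and `IndexConverter` introduced later in this diff do real work in `convert`, which is why the collector now takes the converter as a constructor argument instead of hard-wiring a `From` impl.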
diff --git a/rpc/grpc/core/Cargo.toml b/rpc/grpc/core/Cargo.toml index 77adff37..8e60821e 100644 --- a/rpc/grpc/core/Cargo.toml +++ b/rpc/grpc/core/Cargo.toml @@ -18,7 +18,7 @@ log.workspace = true rand.workspace = true async-trait = "0.1.57" futures = { version = "0.3" } -tonic = { version = "0.8", features = ["gzip"] } +tonic = { version = "0.9", features = ["gzip"] } prost = { version = "0.11" } h2 = "0.3" tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] } @@ -29,4 +29,4 @@ paste = "1.0.11" regex.workspace = true [build-dependencies] -tonic-build = { version = "0.8" } +tonic-build = { version = "0.9" } diff --git a/rpc/grpc/core/src/convert/message.rs b/rpc/grpc/core/src/convert/message.rs index afb9c17c..4fa39367 100644 --- a/rpc/grpc/core/src/convert/message.rs +++ b/rpc/grpc/core/src/convert/message.rs @@ -216,7 +216,11 @@ from!(_item: RpcResult<&kaspa_rpc_core::GetVirtualChainFromBlockResponse>, proto }); from!(item: &kaspa_rpc_core::GetBlocksRequest, protowire::GetBlocksRequestMessage, { - Self { low_hash: item.low_hash.to_string(), include_blocks: item.include_blocks, include_transactions: item.include_transactions } + Self { + low_hash: item.low_hash.map_or(Default::default(), |x| x.to_string()), + include_blocks: item.include_blocks, + include_transactions: item.include_transactions, + } }); from!(item: RpcResult<&kaspa_rpc_core::GetBlocksResponse>, protowire::GetBlocksResponseMessage, { Self { @@ -537,7 +541,7 @@ try_from!(_item: &protowire::GetVirtualChainFromBlockResponseMessage, RpcResult< try_from!(item: &protowire::GetBlocksRequestMessage, kaspa_rpc_core::GetBlocksRequest, { Self { - low_hash: RpcHash::from_str(&item.low_hash)?, + low_hash: if item.low_hash.is_empty() { None } else { Some(RpcHash::from_str(&item.low_hash)?) 
}, include_blocks: item.include_blocks, include_transactions: item.include_transactions, } diff --git a/rpc/grpc/core/src/convert/tx.rs b/rpc/grpc/core/src/convert/tx.rs index 722ea609..91eecb55 100644 --- a/rpc/grpc/core/src/convert/tx.rs +++ b/rpc/grpc/core/src/convert/tx.rs @@ -70,7 +70,7 @@ from!(&kaspa_rpc_core::RpcTransactionInputVerboseData, protowire::RpcTransaction from!(item: &kaspa_rpc_core::RpcTransactionOutputVerboseData, protowire::RpcTransactionOutputVerboseData, { Self { script_public_key_type: item.script_public_key_type.to_string(), - script_public_key_address: item.script_public_key_address.clone(), + script_public_key_address: (&item.script_public_key_address).into(), } }); @@ -176,7 +176,7 @@ try_from!(&protowire::RpcTransactionInputVerboseData, kaspa_rpc_core::RpcTransac try_from!(item: &protowire::RpcTransactionOutputVerboseData, kaspa_rpc_core::RpcTransactionOutputVerboseData, { Self { script_public_key_type: item.script_public_key_type.as_str().try_into()?, - script_public_key_address: item.script_public_key_address.clone(), + script_public_key_address: item.script_public_key_address.as_str().try_into()?, } }); diff --git a/rpc/grpc/core/src/lib.rs b/rpc/grpc/core/src/lib.rs index a33faacc..8ec826c5 100644 --- a/rpc/grpc/core/src/lib.rs +++ b/rpc/grpc/core/src/lib.rs @@ -3,6 +3,9 @@ pub mod convert; pub mod ext; pub mod macros; +/// Maximum decoded gRPC message size to send and receive +pub const RPC_MAX_MESSAGE_SIZE: usize = 1024 * 1024 * 1024; // 1GB + pub mod protowire { tonic::include_proto!("protowire"); } diff --git a/rpc/grpc/server/Cargo.toml b/rpc/grpc/server/Cargo.toml index 2cb590ee..33742f97 100644 --- a/rpc/grpc/server/Cargo.toml +++ b/rpc/grpc/server/Cargo.toml @@ -20,7 +20,7 @@ log.workspace = true rand.workspace = true async-trait = "0.1.57" futures = { version = "0.3" } -tonic = { version = "0.8", features = ["gzip"] } +tonic = { version = "0.9", features = ["gzip"] } prost = { version = "0.11" } h2 = "0.3" tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time"] } diff --git a/rpc/grpc/server/src/collector.rs b/rpc/grpc/server/src/collector.rs index 39b96b58..7359c2e8 100644 --- a/rpc/grpc/server/src/collector.rs +++ b/rpc/grpc/server/src/collector.rs @@ -1,4 +1,5 @@ -use kaspa_notify::collector::CollectorFrom; +use kaspa_notify::{collector::CollectorFrom, converter::ConverterFrom}; use kaspa_rpc_core::Notification; -pub type GrpcServiceCollector = CollectorFrom; +pub type GrpcServiceConverter = ConverterFrom; +pub type GrpcServiceCollector = CollectorFrom; diff --git a/rpc/grpc/server/src/lib.rs b/rpc/grpc/server/src/lib.rs index 12a3454f..c1da3da8 100644 --- a/rpc/grpc/server/src/lib.rs +++ b/rpc/grpc/server/src/lib.rs @@ -3,7 +3,7 @@ use kaspa_core::{ task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture}, trace, }; -use kaspa_grpc_core::protowire::rpc_server::RpcServer; +use kaspa_grpc_core::{protowire::rpc_server::RpcServer, RPC_MAX_MESSAGE_SIZE}; use kaspa_rpc_service::service::RpcCoreService; use kaspa_utils::triggers::DuplexTrigger; use std::net::SocketAddr; @@ -55,7 +55,8 @@ impl AsyncService for GrpcServer { // Create a protowire RPC server let svc = RpcServer::from_arc(self.grpc_service.clone()) .send_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Gzip); + .accept_compressed(CompressionEncoding::Gzip) + .max_decoding_message_size(RPC_MAX_MESSAGE_SIZE); // Start the tonic gRPC server info!("Grpc server starting on: {}", address); diff --git 
a/rpc/grpc/server/src/service.rs b/rpc/grpc/server/src/service.rs index 9db298c2..d43e4ece 100644 --- a/rpc/grpc/server/src/service.rs +++ b/rpc/grpc/server/src/service.rs @@ -1,5 +1,5 @@ use crate::{ - collector::GrpcServiceCollector, + collector::{GrpcServiceCollector, GrpcServiceConverter}, connection::{GrpcConnection, GrpcConnectionManager, GrpcSender}, StatusResult, }; @@ -74,7 +74,8 @@ impl GrpcService { // Prepare internals let core_events = EVENT_TYPE_ARRAY[..].into(); - let collector = Arc::new(GrpcServiceCollector::new(core_channel.receiver())); + let converter = Arc::new(GrpcServiceConverter::new()); + let collector = Arc::new(GrpcServiceCollector::new(core_channel.receiver(), converter)); let subscriber = Arc::new(Subscriber::new(core_events, core_service.notifier(), core_listener_id)); let notifier: Arc> = Arc::new(Notifier::new(core_events, vec![collector], vec![subscriber], 10, GRPC_SERVER)); diff --git a/rpc/service/Cargo.toml b/rpc/service/Cargo.toml index 553449a7..269aba71 100644 --- a/rpc/service/Cargo.toml +++ b/rpc/service/Cargo.toml @@ -17,6 +17,10 @@ kaspa-notify.workspace = true kaspa-index-core.workspace = true kaspa-txscript.workspace = true kaspa-consensusmanager.workspace = true +kaspa-mining.workspace = true +kaspa-addresses.workspace = true +kaspa-p2p-flows.workspace = true +kaspa-math.workspace = true log.workspace = true async-trait = "0.1.57" diff --git a/rpc/service/src/collector.rs b/rpc/service/src/collector.rs index 18e3e89f..4ac016f2 100644 --- a/rpc/service/src/collector.rs +++ b/rpc/service/src/collector.rs @@ -1,7 +1,6 @@ -use kaspa_consensus_notify::notification::Notification as ConsensusNotification; -use kaspa_index_core::notification::Notification as IndexNotification; +use crate::converter::{consensus::ConsensusConverter, index::IndexConverter}; use kaspa_notify::collector::CollectorFrom; -use kaspa_rpc_core::Notification; -pub(crate) type CollectorFromConsensus = CollectorFrom; -pub(crate) type CollectorFromIndex = CollectorFrom; +pub(crate) type CollectorFromConsensus = CollectorFrom; + +pub(crate) type CollectorFromIndex = CollectorFrom; diff --git a/rpc/service/src/converter/consensus.rs b/rpc/service/src/converter/consensus.rs new file mode 100644 index 00000000..dc03be00 --- /dev/null +++ b/rpc/service/src/converter/consensus.rs @@ -0,0 +1,202 @@ +use async_trait::async_trait; +use kaspa_addresses::Address; +use kaspa_consensus_core::{ + api::ConsensusApi, + block::Block, + config::Config, + hashing::tx::hash, + header::Header, + tx::{MutableTransaction, Transaction, TransactionId, TransactionInput, TransactionOutput}, + ChainPath, +}; +use kaspa_consensus_notify::notification::{self as consensus_notify, Notification as ConsensusNotification}; +use kaspa_consensusmanager::ConsensusManager; +use kaspa_math::Uint256; +use kaspa_mining::model::{owner_txs::OwnerTransactions, TransactionIdSet}; +use kaspa_notify::converter::Converter; +use kaspa_rpc_core::{ + BlockAddedNotification, Notification, RpcAcceptedTransactionIds, RpcBlock, RpcBlockVerboseData, RpcHash, RpcMempoolEntry, + RpcMempoolEntryByAddress, RpcResult, RpcTransaction, RpcTransactionInput, RpcTransactionOutput, RpcTransactionOutputVerboseData, + RpcTransactionVerboseData, +}; +use kaspa_txscript::{extract_script_pub_key_address, script_class::ScriptClass}; +use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc}; + +/// Conversion of consensus_core to rpc_core structures +pub struct ConsensusConverter { + consensus_manager: Arc, + config: Arc, +} + +impl 
ConsensusConverter { + pub fn new(consensus_manager: Arc<ConsensusManager>, config: Arc<Config>) -> Self { + Self { consensus_manager, config } + } + + /// Returns the proof-of-work difficulty as a multiple of the minimum difficulty using + /// the passed bits field from the header of a block. + pub fn get_difficulty_ratio(&self, bits: u32) -> f64 { + // The minimum difficulty is the max possible proof-of-work limit bits + // converted back to a number. Note this is not the same as the proof of + // work limit directly because the block difficulty is encoded in a block + // with the compact form which loses precision. + let target = Uint256::from_compact_target_bits(bits); + self.config.params.max_difficulty_f64 / target.as_f64() + } + + /// Converts a consensus [`Block`] into an [`RpcBlock`], optionally including transaction verbose data. + /// + /// _GO-KASPAD: PopulateBlockWithVerboseData_ + pub fn get_block( + &self, + consensus: &dyn ConsensusApi, + block: &Block, + include_transaction_verbose_data: bool, + ) -> RpcResult<RpcBlock> { + let hash = block.hash(); + let ghostdag_data = consensus.get_ghostdag_data(hash)?; + let block_status = consensus.get_block_status(hash).unwrap(); + let children = consensus.get_block_children(hash).unwrap_or_default(); + let is_chain_block = consensus.is_chain_block(hash)?; + let verbose_data = Some(RpcBlockVerboseData { + hash, + difficulty: self.get_difficulty_ratio(block.header.bits), + selected_parent_hash: ghostdag_data.selected_parent, + transaction_ids: block.transactions.iter().map(|x| x.id()).collect(), + is_header_only: block_status.is_header_only(), + blue_score: ghostdag_data.blue_score, + children_hashes: (*children).clone(), + merge_set_blues_hashes: ghostdag_data.mergeset_blues, + merge_set_reds_hashes: ghostdag_data.mergeset_reds, + is_chain_block, + }); + + let transactions = block + .transactions + .iter() + .map(|x| self.get_transaction(consensus, x, Some(&block.header), include_transaction_verbose_data)) + .collect::<Vec<RpcTransaction>>(); + + Ok(RpcBlock { header: (*block.header).clone(), transactions, verbose_data }) + } + + pub fn get_mempool_entry(&self, consensus: &dyn ConsensusApi, transaction: &MutableTransaction) -> RpcMempoolEntry { + let is_orphan = !transaction.is_fully_populated(); + let rpc_transaction = self.get_transaction(consensus, &transaction.tx, None, true); + RpcMempoolEntry::new(transaction.calculated_fee.unwrap_or_default(), rpc_transaction, is_orphan) + } + + pub fn get_mempool_entries_by_address( + &self, + consensus: &dyn ConsensusApi, + address: Address, + owner_transactions: &OwnerTransactions, + transactions: &HashMap<TransactionId, MutableTransaction>, + ) -> RpcMempoolEntryByAddress { + let sending = self.get_owner_entries(consensus, &owner_transactions.sending_txs, transactions); + let receiving = self.get_owner_entries(consensus, &owner_transactions.receiving_txs, transactions); + RpcMempoolEntryByAddress::new(address, sending, receiving) + } + + pub fn get_owner_entries( + &self, + consensus: &dyn ConsensusApi, + transaction_ids: &TransactionIdSet, + transactions: &HashMap<TransactionId, MutableTransaction>, + ) -> Vec<RpcMempoolEntry> { + transaction_ids.iter().map(|x| self.get_mempool_entry(consensus, transactions.get(x).expect("transaction exists"))).collect() + } + + /// Converts a consensus [`Transaction`] into an [`RpcTransaction`], optionally including verbose data.
+ /// + /// _GO-KASPAD: PopulateTransactionWithVerboseData_ + pub fn get_transaction( + &self, + consensus: &dyn ConsensusApi, + transaction: &Transaction, + header: Option<&Header>, + include_verbose_data: bool, + ) -> RpcTransaction { + if include_verbose_data { + let verbose_data = Some(RpcTransactionVerboseData { + transaction_id: transaction.id(), + hash: hash(transaction), + mass: consensus.calculate_transaction_mass(transaction), + // TODO: make block_hash an option + block_hash: header.map_or_else(RpcHash::default, |x| x.hash), + block_time: header.map_or(0, |x| x.timestamp), + }); + RpcTransaction { + version: transaction.version, + inputs: transaction.inputs.iter().map(|x| self.get_transaction_input(x)).collect(), + outputs: transaction.outputs.iter().map(|x| self.get_transaction_output(x)).collect(), + lock_time: transaction.lock_time, + subnetwork_id: transaction.subnetwork_id.clone(), + gas: transaction.gas, + payload: transaction.payload.clone(), + verbose_data, + } + } else { + transaction.into() + } + } + + fn get_transaction_input(&self, input: &TransactionInput) -> RpcTransactionInput { + input.into() + } + + fn get_transaction_output(&self, output: &TransactionOutput) -> RpcTransactionOutput { + let script_public_key_type = ScriptClass::from_script(&output.script_public_key); + let address = extract_script_pub_key_address(&output.script_public_key, self.config.prefix()).ok(); + let verbose_data = + address.map(|address| RpcTransactionOutputVerboseData { script_public_key_type, script_public_key_address: address }); + RpcTransactionOutput { value: output.value, script_public_key: output.script_public_key.clone(), verbose_data } + } + + pub fn get_virtual_chain_accepted_transaction_ids( + &self, + consensus: &dyn ConsensusApi, + chain_path: &ChainPath, + ) -> RpcResult<Vec<RpcAcceptedTransactionIds>> { + let acceptance_data = consensus.get_blocks_acceptance_data(&chain_path.added).unwrap(); + Ok(chain_path + .added + .iter() + .zip(acceptance_data.iter()) + .map(|(hash, block_data)| RpcAcceptedTransactionIds { + accepting_block_hash: hash.to_owned(), + accepted_transaction_ids: block_data + .iter() + .flat_map(|x| x.accepted_transactions.iter().map(|tx| tx.transaction_id)) + .collect(), + }) + .collect()) + } +} + +#[async_trait] +impl Converter for ConsensusConverter { + type Incoming = ConsensusNotification; + type Outgoing = Notification; + + async fn convert(&self, incoming: ConsensusNotification) -> Notification { + match incoming { + consensus_notify::Notification::BlockAdded(msg) => { + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + + // If get_block fails, rely on the infallible From implementation which will lack verbose data + let block = Arc::new(self.get_block(session.deref(), &msg.block, true).unwrap_or_else(|_| (&msg.block).into())); + + Notification::BlockAdded(BlockAddedNotification { block }) + } + _ => (&incoming).into(), + } + } +} + +impl Debug for ConsensusConverter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConsensusConverter").field("consensus_manager", &"").field("config", &self.config).finish() + } +} diff --git a/rpc/service/src/converter/index.rs b/rpc/service/src/converter/index.rs new file mode 100644 index 00000000..9d520aab --- /dev/null +++ b/rpc/service/src/converter/index.rs @@ -0,0 +1,44 @@ +use async_trait::async_trait; +use kaspa_consensus_core::config::Config; +use kaspa_index_core::indexed_utxos::UtxoSetByScriptPublicKey; +use kaspa_index_core::notification::{self as
index_notify, Notification as IndexNotification}; +use kaspa_notify::converter::Converter; +use kaspa_rpc_core::utxo::utxo_set_into_rpc; +use kaspa_rpc_core::{Notification, RpcUtxosByAddressesEntry, UtxosChangedNotification}; +use std::sync::Arc; + +/// Conversion of consensus_core to rpc_core structures +#[derive(Debug)] +pub struct IndexConverter { + config: Arc, +} + +impl IndexConverter { + pub fn new(config: Arc) -> Self { + Self { config } + } + + pub fn get_utxo_changed_notification(&self, utxo_changed: index_notify::UtxosChangedNotification) -> UtxosChangedNotification { + UtxosChangedNotification { + added: Arc::new(self.get_utxos_by_addresses_entries(&utxo_changed.added)), + removed: Arc::new(self.get_utxos_by_addresses_entries(&utxo_changed.removed)), + } + } + + pub fn get_utxos_by_addresses_entries(&self, item: &UtxoSetByScriptPublicKey) -> Vec { + utxo_set_into_rpc(item, Some(self.config.prefix())) + } +} + +#[async_trait] +impl Converter for IndexConverter { + type Incoming = IndexNotification; + type Outgoing = Notification; + + async fn convert(&self, incoming: IndexNotification) -> Notification { + match incoming { + index_notify::Notification::UtxosChanged(msg) => Notification::UtxosChanged(self.get_utxo_changed_notification(msg)), + _ => (&incoming).into(), + } + } +} diff --git a/rpc/service/src/converter/mod.rs b/rpc/service/src/converter/mod.rs new file mode 100644 index 00000000..d65ea89e --- /dev/null +++ b/rpc/service/src/converter/mod.rs @@ -0,0 +1,2 @@ +pub mod consensus; +pub mod index; diff --git a/rpc/service/src/lib.rs b/rpc/service/src/lib.rs index d5cfe109..30a3742f 100644 --- a/rpc/service/src/lib.rs +++ b/rpc/service/src/lib.rs @@ -1,3 +1,5 @@ +use self::service::RpcCoreService; +use kaspa_consensus_core::config::Config; use kaspa_consensus_notify::notifier::ConsensusNotifier; use kaspa_consensusmanager::ConsensusManager; use kaspa_core::{ @@ -5,12 +7,13 @@ use kaspa_core::{ trace, }; use kaspa_index_core::notifier::IndexNotifier; +use kaspa_mining::manager::MiningManager; +use kaspa_p2p_flows::flow_context::FlowContext; use kaspa_utils::triggers::DuplexTrigger; use std::sync::Arc; -use self::service::RpcCoreService; - pub mod collector; +pub mod converter; pub mod service; const RPC_CORE_SERVICE: &str = "rpc-core-service"; @@ -26,8 +29,12 @@ impl RpcCoreServer { consensus_manager: Arc, consensus_notifier: Arc, index_notifier: Option>, + mining_manager: Arc, + flow_context: Arc, + config: Arc, ) -> Self { - let service = Arc::new(RpcCoreService::new(consensus_manager, consensus_notifier, index_notifier)); + let service = + Arc::new(RpcCoreService::new(consensus_manager, consensus_notifier, index_notifier, mining_manager, flow_context, config)); Self { service, shutdown: DuplexTrigger::default() } } diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index 817a21ef..3c38d32a 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -1,36 +1,37 @@ //! 
Core server implementation for ClientAPI use super::collector::{CollectorFromConsensus, CollectorFromIndex}; +use crate::converter::{consensus::ConsensusConverter, index::IndexConverter}; use async_trait::async_trait; -use kaspa_consensus_core::{block::Block, coinbase::MinerData}; +use kaspa_consensus_core::{ + block::Block, + coinbase::MinerData, + config::Config, + tx::{Transaction, COINBASE_TRANSACTION_INDEX}, +}; use kaspa_consensus_notify::{ notifier::ConsensusNotifier, {connection::ConsensusChannelConnection, notification::Notification as ConsensusNotification}, }; use kaspa_consensusmanager::ConsensusManager; -use kaspa_core::{info, trace}; -use kaspa_hashes::Hash; +use kaspa_core::{debug, info, trace, version::version, warn}; use kaspa_index_core::{connection::IndexChannelConnection, notification::Notification as IndexNotification, notifier::IndexNotifier}; +use kaspa_mining::{manager::MiningManager, mempool::tx::Orphan}; use kaspa_notify::{ collector::DynCollector, events::{EventSwitches, EventType, EVENT_TYPE_ARRAY}, listener::ListenerId, - notifier::{Notifier, Notify}, + notifier::Notifier, scope::Scope, subscriber::{Subscriber, SubscriptionManager}, }; -use kaspa_rpc_core::{ - api::rpc::RpcApi, model::*, notify::connection::ChannelConnection, FromRpcHex, Notification, RpcError, RpcResult, -}; +use kaspa_p2p_flows::flow_context::FlowContext; +use kaspa_rpc_core::{api::rpc::RpcApi, model::*, notify::connection::ChannelConnection, Notification, RpcError, RpcResult}; +use kaspa_txscript::{extract_script_pub_key_address, pay_to_address_script}; use kaspa_utils::channel::Channel; -use std::{ - str::FromStr, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, - vec, -}; +use std::{iter::once, ops::Deref, sync::Arc, vec}; -/// A service implementing the Rpc API at rpc_core level. +/// A service implementing the Rpc API at kaspa_rpc_core level. /// /// Collects notifications from the consensus and forwards them to /// actual protocol-featured services. 
Thanks to the subscription pattern, @@ -50,6 +51,11 @@ use std::{ pub struct RpcCoreService { consensus_manager: Arc, notifier: Arc>, + mining_manager: Arc, + flow_context: Arc, + config: Arc, + consensus_converter: Arc, + _index_converter: Arc, } const RPC_CORE: &str = "rpc-core"; @@ -59,6 +65,9 @@ impl RpcCoreService { consensus_manager: Arc, consensus_notifier: Arc, index_notifier: Option>, + mining_manager: Arc, + flow_context: Arc, + config: Arc, ) -> Self { // Prepare consensus-notify objects let consensus_notify_channel = Channel::::default(); @@ -69,20 +78,23 @@ impl RpcCoreService { let mut consensus_events: EventSwitches = EVENT_TYPE_ARRAY[..].into(); consensus_events[EventType::UtxosChanged] = false; consensus_events[EventType::PruningPointUtxoSetOverride] = index_notifier.is_none(); - let consensus_collector = Arc::new(CollectorFromConsensus::new(consensus_notify_channel.receiver())); + let consensus_converter = Arc::new(ConsensusConverter::new(consensus_manager.clone(), config.clone())); + let consensus_collector = + Arc::new(CollectorFromConsensus::new(consensus_notify_channel.receiver(), consensus_converter.clone())); let consensus_subscriber = Arc::new(Subscriber::new(consensus_events, consensus_notifier, consensus_notify_listener_id)); let mut collectors: Vec> = vec![consensus_collector]; let mut subscribers = vec![consensus_subscriber]; // Prepare index-processor objects if an IndexService is provided + let index_converter = Arc::new(IndexConverter::new(config.clone())); if let Some(ref index_notifier) = index_notifier { let index_notify_channel = Channel::::default(); let index_notify_listener_id = index_notifier.clone().register_new_listener(IndexChannelConnection::new(index_notify_channel.sender())); let index_events: EventSwitches = [EventType::UtxosChanged, EventType::PruningPointUtxoSetOverride].as_ref().into(); - let index_collector = Arc::new(CollectorFromIndex::new(index_notify_channel.receiver())); + let index_collector = Arc::new(CollectorFromIndex::new(index_notify_channel.receiver(), index_converter.clone())); let index_subscriber = Arc::new(Subscriber::new(index_events, index_notifier.clone(), index_notify_listener_id)); collectors.push(index_collector); @@ -92,7 +104,15 @@ impl RpcCoreService { // Create the rcp-core notifier let notifier = Arc::new(Notifier::new(EVENT_TYPE_ARRAY[..].into(), collectors, subscribers, 1, RPC_CORE)); - Self { consensus_manager, notifier } + Self { + consensus_manager, + notifier, + mining_manager, + flow_context, + config, + consensus_converter, + _index_converter: index_converter, + } } pub fn start(&self) { @@ -113,127 +133,268 @@ impl RpcCoreService { #[async_trait] impl RpcApi for RpcCoreService { async fn submit_block_call(&self, request: SubmitBlockRequest) -> RpcResult { + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + + // TODO: consider adding an error field to SubmitBlockReport to document both the report and error fields + let is_synced: bool = self.flow_context.hub().has_peers() && session.is_nearly_synced(); + + if !self.config.allow_submit_block_when_not_synced && !is_synced { + // error = "Block not submitted - node is not synced" + return Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::IsInIBD) }); + } + let try_block: RpcResult = (&request.block).try_into(); - if let Err(ref err) = try_block { + if let Err(err) = &try_block { trace!("incoming SubmitBlockRequest with block conversion error: {}", err); + // error = 
format!("Could not parse block: {0}", err) + return Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::BlockInvalid) }); } let block = try_block?; let hash = block.hash(); - // We recreate a RpcBlock for the BlockAdded notification. - // This guaranties that we have the right hash. - // TODO: remove it when consensus emit a BlockAdded notification. - let rpc_block: RpcBlock = (&block).into(); + if !request.allow_non_daa_blocks { + let virtual_daa_score = session.get_virtual_daa_score(); - trace!("incoming SubmitBlockRequest for block {}", hash); + // A simple heuristic check which signals that the mined block is out of date + // and should not be accepted unless user explicitly requests + let daa_window_size = self.config.difficulty_window_size as u64; + if virtual_daa_score > daa_window_size && block.header.daa_score < virtual_daa_score - daa_window_size { + // error = format!("Block rejected. Reason: block DAA score {0} is too far behind virtual's DAA score {1}", block.header.daa_score, virtual_daa_score) + return Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::BlockInvalid) }); + } + } - match self.consensus_manager.consensus().session().await.validate_and_insert_block(block, true).await { + trace!("incoming SubmitBlockRequest for block {}", hash); + match self.flow_context.add_block(session.deref(), block.clone()).await { Ok(_) => { info!("Accepted block {} via submit block", hash); - - // Notify about new added block - // TODO: let consensus emit this notification through an event channel - self.notifier.notify(Notification::BlockAdded(BlockAddedNotification { block: Arc::new(rpc_block) })).unwrap(); - - // Emit a NewBlockTemplate notification - self.notifier.notify(Notification::NewBlockTemplate(NewBlockTemplateNotification {})).unwrap(); - Ok(SubmitBlockResponse { report: SubmitBlockReport::Success }) } Err(err) => { - trace!("submit block error: {}", err); + warn!("The RPC submitted block triggered an error: {}\nPrinting the full header for debug purposes:\n{:?}", err, err); + // error = format!("Block rejected. 
Reason: {}", err)) Ok(SubmitBlockResponse { report: SubmitBlockReport::Reject(SubmitBlockRejectReason::BlockInvalid) }) - } // TODO: handle also the IsInIBD reject reason + } } } async fn get_block_template_call(&self, request: GetBlockTemplateRequest) -> RpcResult { trace!("incoming GetBlockTemplate request"); + // Make sure the pay address prefix matches the config network type + if request.pay_address.prefix != self.config.prefix() { + return Err(kaspa_addresses::AddressError::InvalidPrefix(request.pay_address.prefix.to_string()))?; + } + + // Build block template let script_public_key = kaspa_txscript::pay_to_address_script(&request.pay_address); - let miner_data: MinerData = MinerData::new(script_public_key, request.extra_data); - // TODO: handle error properly when managed through mining manager - let block_template = self.consensus_manager.consensus().session().await.build_block_template(miner_data, vec![]).unwrap(); + let extra_data = version().as_bytes().iter().chain(once(&(b'/'))).chain(&request.extra_data).cloned().collect::>(); + let miner_data: MinerData = MinerData::new(script_public_key, extra_data); + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + let block_template = self.mining_manager.get_block_template(session.deref(), &miner_data)?; + + // Check coinbase tx payload length + if block_template.block.transactions[COINBASE_TRANSACTION_INDEX].payload.len() > self.config.max_coinbase_payload_len { + return Err(RpcError::CoinbasePayloadLengthAboveMax(self.config.max_coinbase_payload_len)); + } - Ok((&block_template).into()) + let is_nearly_synced = self.config.is_nearly_synced(block_template.selected_parent_timestamp); + Ok(GetBlockTemplateResponse { + block: (&block_template.block).into(), + is_synced: self.flow_context.hub().has_peers() && is_nearly_synced, + }) } - async fn get_block_call(&self, req: GetBlockRequest) -> RpcResult { - // TODO: Remove the following test when consensus is used to fetch data + async fn get_block_call(&self, request: GetBlockRequest) -> RpcResult { + // TODO: test + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + let mut block = session.get_block_even_if_header_only(request.hash)?; + if !request.include_transactions { + block.transactions = Arc::new(vec![]); + } + Ok(GetBlockResponse { block: self.consensus_converter.get_block(session.deref(), &block, request.include_transactions)? }) + } - // This is a test to simulate a consensus error - if req.hash.as_bytes()[0] == 0 { - return Err(RpcError::General(format!("Block {0} not found", req.hash))); + async fn get_blocks_call(&self, request: GetBlocksRequest) -> RpcResult { + // Validate that user didn't set include_transactions without setting include_blocks + if !request.include_blocks && request.include_transactions { + return Err(RpcError::InvalidGetBlocksRequest); } - // TODO: query info from consensus and use it to build the response - Ok(GetBlockResponse { block: create_dummy_rpc_block() }) - } + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; - async fn get_info_call(&self, _req: GetInfoRequest) -> RpcResult { - // TODO: query info from consensus and use it to build the response + // If low_hash is empty - use genesis instead. 
+ let low_hash = match request.low_hash { + Some(low_hash) => { + // Make sure low_hash points to an existing and valid block + session.deref().get_ghostdag_data(low_hash)?; + low_hash + } + None => self.config.genesis.hash, + }; + + // Get hashes between low_hash and sink + let sink_hash = session.get_sink(); + + // We use +1 because low_hash is also returned + // max_blocks MUST be >= mergeset_size_limit + 1 + let max_blocks = self.config.mergeset_size_limit as usize + 1; + let (block_hashes, high_hash) = session.get_hashes_between(low_hash, sink_hash, max_blocks)?; + + // If the high hash is equal to sink it means get_hashes_between didn't skip any hashes, and + // there's space to add the sink anticone, otherwise we cannot add the anticone because + // there's no guarantee that all of the anticone root ancestors will be present. + let sink_anticone = if high_hash == sink_hash { session.get_anticone(sink_hash)? } else { vec![] }; + // Prepend low hash to make it inclusive and append the sink anticone + let block_hashes = once(low_hash).chain(block_hashes).chain(sink_anticone).collect::<Vec<_>>(); + let blocks = if request.include_blocks { + block_hashes + .iter() + .cloned() + .map(|hash| { + let mut block = session.get_block_even_if_header_only(hash)?; + if !request.include_transactions { + block.transactions = Arc::new(vec![]); + } + self.consensus_converter.get_block(session.deref(), &block, request.include_transactions) + }) + .collect::<RpcResult<Vec<_>>>() + } else { + Ok(vec![]) + }?; + Ok(GetBlocksResponse { block_hashes, blocks }) + } + + async fn get_info_call(&self, _request: GetInfoRequest) -> RpcResult<GetInfoResponse> { + let is_nearly_synced = self.consensus_manager.consensus().session().await.is_nearly_synced(); Ok(GetInfoResponse { - p2p_id: "test".to_string(), - mempool_size: 1, - server_version: "0.12.8".to_string(), - is_utxo_indexed: false, - is_synced: false, + p2p_id: self.flow_context.node_id.to_string(), + mempool_size: self.mining_manager.transaction_count(true, false) as u64, + server_version: version().to_string(), + is_utxo_indexed: self.config.utxoindex, + is_synced: self.flow_context.hub().has_peers() && is_nearly_synced, has_notify_command: true, has_message_id: true, }) } - // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - // UNIMPLEMENTED METHODS - - async fn get_current_network_call(&self, _request: GetCurrentNetworkRequest) -> RpcResult<GetCurrentNetworkResponse> { - unimplemented!(); - } - - async fn get_peer_addresses_call(&self, _request: GetPeerAddressesRequest) -> RpcResult<GetPeerAddressesResponse> { - unimplemented!(); - } - - async fn get_selected_tip_hash_call(&self, _request: GetSelectedTipHashRequest) -> RpcResult<GetSelectedTipHashResponse> { - unimplemented!(); - } - - async fn get_mempool_entry_call(&self, _request: GetMempoolEntryRequest) -> RpcResult<GetMempoolEntryResponse> { - unimplemented!(); + async fn get_mempool_entry_call(&self, request: GetMempoolEntryRequest) -> RpcResult<GetMempoolEntryResponse> { + let Some(transaction) = self.mining_manager.get_transaction(&request.transaction_id, !request.filter_transaction_pool, request.include_orphan_pool) else { + return Err(RpcError::TransactionNotFound(request.transaction_id)); + }; + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + Ok(GetMempoolEntryResponse::new(self.consensus_converter.get_mempool_entry(session.deref(), &transaction))) } - async fn get_mempool_entries_call(&self, _request: GetMempoolEntriesRequest) -> RpcResult<GetMempoolEntriesResponse> { - unimplemented!(); + async fn get_mempool_entries_call(&self, request: GetMempoolEntriesRequest) -> RpcResult<GetMempoolEntriesResponse> { + let consensus = self.consensus_manager.consensus(); +
let session = consensus.session().await; + let (transactions, orphans) = + self.mining_manager.get_all_transactions(!request.filter_transaction_pool, request.include_orphan_pool); + let mempool_entries = transactions + .iter() + .chain(orphans.iter()) + .map(|transaction| self.consensus_converter.get_mempool_entry(session.deref(), transaction)) + .collect(); + Ok(GetMempoolEntriesResponse::new(mempool_entries)) } - async fn get_connected_peer_info_call(&self, _request: GetConnectedPeerInfoRequest) -> RpcResult { - unimplemented!(); + async fn get_mempool_entries_by_addresses_call( + &self, + request: GetMempoolEntriesByAddressesRequest, + ) -> RpcResult { + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + let script_public_keys = request.addresses.iter().map(pay_to_address_script).collect(); + let grouped_txs = self.mining_manager.get_transactions_by_addresses( + &script_public_keys, + !request.filter_transaction_pool, + request.include_orphan_pool, + ); + let mempool_entries = grouped_txs + .owners + .iter() + .map(|(script_public_key, owner_transactions)| { + let address = extract_script_pub_key_address(script_public_key, self.config.prefix()) + .expect("script public key is convertible into an address"); + self.consensus_converter.get_mempool_entries_by_address( + session.deref(), + address, + owner_transactions, + &grouped_txs.transactions, + ) + }) + .collect(); + Ok(GetMempoolEntriesByAddressesResponse::new(mempool_entries)) + } + + async fn submit_transaction_call(&self, request: SubmitTransactionRequest) -> RpcResult { + let transaction: Transaction = (&request.transaction).try_into()?; + let transaction_id = transaction.id(); + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + self.flow_context.add_transaction(session.deref(), transaction, Orphan::Allowed).await.map_err(|err| { + let err = RpcError::RejectedTransaction(transaction_id, err.to_string()); + debug!("{err}"); + err + })?; + Ok(SubmitTransactionResponse::new(transaction_id)) + } + + async fn get_current_network_call(&self, _: GetCurrentNetworkRequest) -> RpcResult { + Ok(GetCurrentNetworkResponse::new(self.config.net)) + } + + async fn get_subnetwork_call(&self, _: GetSubnetworkRequest) -> RpcResult { + Err(RpcError::NotImplemented) + } + + async fn get_selected_tip_hash_call(&self, _: GetSelectedTipHashRequest) -> RpcResult { + Ok(GetSelectedTipHashResponse::new(self.consensus_manager.consensus().session().await.get_sink())) + } + + async fn get_sink_blue_score_call(&self, _: GetSinkBlueScoreRequest) -> RpcResult { + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + Ok(GetSinkBlueScoreResponse::new(session.get_ghostdag_data(session.get_sink())?.blue_score)) } - async fn add_peer_call(&self, _request: AddPeerRequest) -> RpcResult { - unimplemented!(); + async fn get_virtual_chain_from_block_call( + &self, + request: GetVirtualChainFromBlockRequest, + ) -> RpcResult { + let consensus = self.consensus_manager.consensus(); + let session = consensus.session().await; + let virtual_chain = session.get_virtual_chain_from_block(request.start_hash)?; + let accepted_transaction_ids = if request.include_accepted_transaction_ids { + self.consensus_converter.get_virtual_chain_accepted_transaction_ids(session.deref(), &virtual_chain)? 
+ } else { + vec![] + }; + Ok(GetVirtualChainFromBlockResponse::new(virtual_chain.removed, virtual_chain.added, accepted_transaction_ids)) } - async fn submit_transaction_call(&self, _request: SubmitTransactionRequest) -> RpcResult { - unimplemented!(); + async fn get_block_count_call(&self, _: GetBlockCountRequest) -> RpcResult { + Ok(self.consensus_manager.consensus().session().await.get_sync_info()) } - async fn get_subnetwork_call(&self, _request: GetSubnetworkRequest) -> RpcResult { - unimplemented!(); - } + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + // UNIMPLEMENTED METHODS - async fn get_virtual_chain_from_block_call( - &self, - _request: GetVirtualChainFromBlockRequest, - ) -> RpcResult { + async fn get_peer_addresses_call(&self, _request: GetPeerAddressesRequest) -> RpcResult { unimplemented!(); } - async fn get_blocks_call(&self, _request: GetBlocksRequest) -> RpcResult { + async fn get_connected_peer_info_call(&self, _request: GetConnectedPeerInfoRequest) -> RpcResult { unimplemented!(); } - async fn get_block_count_call(&self, _request: GetBlockCountRequest) -> RpcResult { + async fn add_peer_call(&self, _request: AddPeerRequest) -> RpcResult { unimplemented!(); } @@ -273,10 +434,6 @@ impl RpcApi for RpcCoreService { unimplemented!(); } - async fn get_sink_blue_score_call(&self, _request: GetSinkBlueScoreRequest) -> RpcResult { - unimplemented!(); - } - async fn ban_call(&self, _request: BanRequest) -> RpcResult { unimplemented!(); } @@ -292,13 +449,6 @@ impl RpcApi for RpcCoreService { unimplemented!(); } - async fn get_mempool_entries_by_addresses_call( - &self, - _request: GetMempoolEntriesByAddressesRequest, - ) -> RpcResult { - unimplemented!(); - } - async fn get_coin_supply_call(&self, _request: GetCoinSupplyRequest) -> RpcResult { unimplemented!(); } @@ -339,38 +489,3 @@ impl RpcApi for RpcCoreService { Ok(()) } } - -// TODO: Remove the following function when consensus is used to fetch data -fn create_dummy_rpc_block() -> RpcBlock { - let sel_parent_hash = Hash::from_str("5963be67f12da63004ce1baceebd7733c4fb601b07e9b0cfb447a3c5f4f3c4f0").unwrap(); - RpcBlock { - header: RpcHeader { - hash: Hash::from_str("8270e63a0295d7257785b9c9b76c9a2efb7fb8d6ac0473a1bff1571c5030e995").unwrap(), - version: 1, - parents_by_level: vec![], - hash_merkle_root: Hash::from_str("4b5a041951c4668ecc190c6961f66e54c1ce10866bef1cf1308e46d66adab270").unwrap(), - accepted_id_merkle_root: Hash::from_str("1a1310d49d20eab15bf62c106714bdc81e946d761701e81fabf7f35e8c47b479").unwrap(), - utxo_commitment: Hash::from_str("e7cdeaa3a8966f3fff04e967ed2481615c76b7240917c5d372ee4ed353a5cc15").unwrap(), - timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64, - bits: 1, - nonce: 1234, - daa_score: 123456, - blue_work: RpcBlueWorkType::from_rpc_hex("1234567890abcdef").unwrap(), - pruning_point: Hash::from_str("7190c08d42a0f7994b183b52e7ef2f99bac0b91ef9023511cadf4da3a2184b16").unwrap(), - blue_score: 12345678901, - }, - transactions: vec![], - verbose_data: Some(RpcBlockVerboseData { - hash: Hash::from_str("8270e63a0295d7257785b9c9b76c9a2efb7fb8d6ac0473a1bff1571c5030e995").unwrap(), - difficulty: 5678.0, - selected_parent_hash: sel_parent_hash, - transaction_ids: vec![], - is_header_only: true, - blue_score: 98765, - children_hashes: vec![], - merge_set_blues_hashes: vec![], - merge_set_reds_hashes: vec![], - is_chain_block: true, - }), - } -} diff --git a/rpc/wrpc/client/src/client.rs b/rpc/wrpc/client/src/client.rs index d036e6fa..1011c15b 100644 --- 
a/rpc/wrpc/client/src/client.rs +++ b/rpc/wrpc/client/src/client.rs @@ -1,5 +1,5 @@ use crate::imports::*; -use kaspa_rpc_core::notify::collector::RpcCoreCollector; +use kaspa_rpc_core::notify::collector::{RpcCoreCollector, RpcCoreConverter}; pub use kaspa_rpc_macros::build_wrpc_client_interface; use std::fmt::Debug; @@ -147,7 +147,8 @@ impl KaspaRpcClient { let inner = Arc::new(Inner::new(encoding, url)?); let notifier = if matches!(notification_mode, NotificationMode::MultiListeners) { let enabled_events = EVENT_TYPE_ARRAY[..].into(); - let collector = Arc::new(RpcCoreCollector::new(inner.notification_channel_receiver())); + let converter = Arc::new(RpcCoreConverter::new()); + let collector = Arc::new(RpcCoreCollector::new(inner.notification_channel_receiver(), converter)); let subscriber = Arc::new(Subscriber::new(enabled_events, inner.clone(), 0)); Some(Arc::new(Notifier::new(enabled_events, vec![collector], vec![subscriber], 3, WRPC_CLIENT))) } else { diff --git a/rpc/wrpc/server/src/collector.rs b/rpc/wrpc/server/src/collector.rs index 1083fe5c..5b86bbd3 100644 --- a/rpc/wrpc/server/src/collector.rs +++ b/rpc/wrpc/server/src/collector.rs @@ -1,4 +1,5 @@ -use kaspa_notify::collector::CollectorFrom; +use kaspa_notify::{collector::CollectorFrom, converter::ConverterFrom}; use kaspa_rpc_core::Notification; -pub type WrpcServiceCollector = CollectorFrom; +pub type WrpcServiceConverter = ConverterFrom; +pub type WrpcServiceCollector = CollectorFrom; diff --git a/rpc/wrpc/server/src/server.rs b/rpc/wrpc/server/src/server.rs index d3dbbe5e..92b96dd9 100644 --- a/rpc/wrpc/server/src/server.rs +++ b/rpc/wrpc/server/src/server.rs @@ -1,4 +1,9 @@ -use crate::{collector::WrpcServiceCollector, connection::Connection, result::Result, service::Options}; +use crate::{ + collector::{WrpcServiceCollector, WrpcServiceConverter}, + connection::Connection, + result::Result, + service::Options, +}; use kaspa_notify::{ events::EVENT_TYPE_ARRAY, listener::ListenerId, @@ -52,7 +57,8 @@ impl Server { // Prepare notification internals let rpc_events = EVENT_TYPE_ARRAY[..].into(); - let collector = Arc::new(WrpcServiceCollector::new(rpc_channel.receiver())); + let converter = Arc::new(WrpcServiceConverter::new()); + let collector = Arc::new(WrpcServiceCollector::new(rpc_channel.receiver(), converter)); let subscriber = Arc::new(Subscriber::new(rpc_events, subscription_manager, rpc_listener_id)); let notifier: Arc> = Arc::new(Notifier::new(rpc_events, vec![collector], vec![subscriber], tasks, WRPC_SERVER)); diff --git a/simpa/src/main.rs b/simpa/src/main.rs index f3909a71..6c747274 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -126,18 +126,18 @@ fn main() { let mut perf_params = PERF_PARAMS; adjust_consensus_params(&args, &mut params); adjust_perf_params(&args, ¶ms, &mut perf_params); - let config = ConfigBuilder::new(params).set_perf_params(perf_params).skip_proof_of_work().build(); + let config = Arc::new(ConfigBuilder::new(params).set_perf_params(perf_params).skip_proof_of_work().build()); // Load an existing consensus or run the simulation let (consensus, _lifetime) = if let Some(input_dir) = args.input_dir { let (lifetime, db) = load_existing_db(input_dir, num_cpus::get()); let (dummy_notification_sender, _) = unbounded(); let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender)); - let consensus = Arc::new(Consensus::new(db, &config, notification_root, Default::default())); + let consensus = Arc::new(Consensus::new(db, config.clone(), notification_root, 
+        let consensus = Arc::new(Consensus::new(db, config.clone(), notification_root, Default::default()));
         (consensus, lifetime)
     } else {
         let until = if args.target_blocks.is_none() { args.sim_time * 1000 } else { u64::MAX }; // milliseconds
-        let mut sim = KaspaNetworkSimulator::new(args.delay, args.bps, args.target_blocks, &config, args.output_dir);
+        let mut sim = KaspaNetworkSimulator::new(args.delay, args.bps, args.target_blocks, config.clone(), args.output_dir);
         let (consensus, handles, lifetime) = sim.init(args.miners, args.tpb).run(until);
         consensus.shutdown(handles);
         (consensus, lifetime)
@@ -147,7 +147,7 @@ fn main() {
     let (_lifetime2, db2) = create_temp_db_with_parallelism(num_cpus::get());
     let (dummy_notification_sender, _) = unbounded();
     let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
-    let consensus2 = Arc::new(Consensus::new(db2, &config, notification_root, Default::default()));
+    let consensus2 = Arc::new(Consensus::new(db2, config.clone(), notification_root, Default::default()));
     let handles2 = consensus2.run_processors();
     validate(&consensus, &consensus2, &config, args.delay, args.bps);
     consensus2.shutdown(handles2);
diff --git a/simpa/src/simulator/network.rs b/simpa/src/simulator/network.rs
index a663387b..f90162e1 100644
--- a/simpa/src/simulator/network.rs
+++ b/simpa/src/simulator/network.rs
@@ -20,22 +20,15 @@ pub struct KaspaNetworkSimulator {
     // Consensus instances
     consensuses: Vec<ConsensusWrapper>,
 
-    config: Config,             // Consensus config
+    config: Arc<Config>,        // Consensus config
     bps: f64,                   // Blocks per second
     target_blocks: Option<u64>, // Target simulation blocks
     output_dir: Option<String>, // Possible permanent output directory
 }
 
 impl KaspaNetworkSimulator {
-    pub fn new(delay: f64, bps: f64, target_blocks: Option<u64>, config: &Config, output_dir: Option<String>) -> Self {
-        Self {
-            simulation: Simulation::new((delay * 1000.0) as u64),
-            consensuses: Vec::new(),
-            bps,
-            config: config.clone(),
-            target_blocks,
-            output_dir,
-        }
+    pub fn new(delay: f64, bps: f64, target_blocks: Option<u64>, config: Arc<Config>, output_dir: Option<String>) -> Self {
+        Self { simulation: Simulation::new((delay * 1000.0) as u64), consensuses: Vec::new(), bps, config, target_blocks, output_dir }
     }
 
     pub fn init(&mut self, num_miners: u64, target_txs_per_block: u64) -> &mut Self {
@@ -49,7 +42,7 @@ impl KaspaNetworkSimulator {
         };
         let (dummy_notification_sender, _) = unbounded();
         let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
-        let consensus = Arc::new(Consensus::new(db, &self.config, notification_root, Default::default()));
+        let consensus = Arc::new(Consensus::new(db, self.config.clone(), notification_root, Default::default()));
         let handles = consensus.run_processors();
         let (sk, pk) = secp.generate_keypair(&mut rng);
         let miner_process = Box::new(Miner::new(
diff --git a/testing/integration/src/integration_tests.rs b/testing/integration/src/integration_tests.rs
index c6234a53..6ed7f68a 100644
--- a/testing/integration/src/integration_tests.rs
+++ b/testing/integration/src/integration_tests.rs
@@ -11,7 +11,7 @@ use kaspa_consensus::model::stores::ghostdag::{GhostdagStoreReader, KType as Gho
 use kaspa_consensus::model::stores::headers::HeaderStoreReader;
 use kaspa_consensus::model::stores::reachability::DbReachabilityStore;
 use kaspa_consensus::model::stores::selected_chain::SelectedChainStoreReader;
-use kaspa_consensus::params::{Params, DEVNET_PARAMS, MAINNET_PARAMS};
+use kaspa_consensus::params::{Params, DEVNET_PARAMS, DIFFICULTY_MAX, DIFFICULTY_MAX_AS_F64, MAINNET_PARAMS};
 use kaspa_consensus::pipeline::ProcessingCounters;
 use kaspa_consensus::processes::reachability::tests::{DagBlock, DagBuilder, StoreValidationExtensions};
 use kaspa_consensus_core::api::ConsensusApi;
@@ -737,6 +737,8 @@ impl KaspadGoParams {
             timestamp_deviation_tolerance: self.TimestampDeviationTolerance,
             target_time_per_block: self.TargetTimePerBlock / 1_000_000,
             max_block_parents: self.MaxBlockParents,
+            max_difficulty: DIFFICULTY_MAX,
+            max_difficulty_f64: DIFFICULTY_MAX_AS_F64,
             difficulty_window_size: self.DifficultyAdjustmentWindowSize,
             mergeset_size_limit: self.MergeSetSizeLimit,
             merge_depth: self.MergeDepth,
@@ -853,6 +855,7 @@ async fn json_test(file_path: &str) {
     if proof_exists {
         config.process_genesis = false;
     }
+    let config = Arc::new(config);
 
     let (notification_send, notification_recv) = unbounded();
     let tc = Arc::new(TestConsensus::create_from_temp_db(&config, notification_send));
@@ -861,7 +864,7 @@ async fn json_test(file_path: &str) {
     let (_utxoindex_db_lifetime, utxoindex_db) = create_temp_db();
     let consensus_manager = Arc::new(ConsensusManager::from_consensus(tc.consensus()));
     let utxoindex = UtxoIndex::new(consensus_manager, utxoindex_db).unwrap();
-    let index_service = Arc::new(IndexService::new(&notify_service.notifier(), Some(utxoindex.clone()), &config));
+    let index_service = Arc::new(IndexService::new(&notify_service.notifier(), Some(utxoindex.clone())));
     let async_runtime = Arc::new(AsyncRuntime::new(2));
     async_runtime.register(notify_service.clone());