diff --git a/Cargo.lock b/Cargo.lock index 6740937e1..c5eaae3f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" dependencies = [ "lazy_static", - "regex 1.3.6", + "regex 1.3.7", ] [[package]] @@ -401,7 +401,7 @@ dependencies = [ "peeking_take_while", "proc-macro2", "quote 1.0.3", - "regex 1.3.6", + "regex 1.3.7", "rustc-hash", "shlex", "which", @@ -1278,7 +1278,7 @@ dependencies = [ "atty", "humantime", "log 0.4.8", - "regex 1.3.6", + "regex 1.3.7", "termcolor", ] @@ -1291,7 +1291,7 @@ dependencies = [ "atty", "humantime", "log 0.4.8", - "regex 1.3.6", + "regex 1.3.7", "termcolor", ] @@ -1979,7 +1979,7 @@ dependencies = [ "bstr", "fnv", "log 0.4.8", - "regex 1.3.6", + "regex 1.3.7", ] [[package]] @@ -2261,9 +2261,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.13.4" +version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed6081100e960d9d74734659ffc9cc91daf1c0fc7aceb8eaa94ee1a3f5046f2e" +checksum = "96816e1d921eca64d208a85aab4f7798455a8e34229ee5a88c935bdee1b78b14" dependencies = [ "bytes 0.5.4", "futures-channel", @@ -2322,7 +2322,7 @@ dependencies = [ "bytes 0.5.4", "ct-logs", "futures-util", - "hyper 0.13.4", + "hyper 0.13.5", "log 0.4.8", "rustls 0.17.0", "rustls-native-certs", @@ -2717,7 +2717,7 @@ dependencies = [ "futures-timer 3.0.2", "globset", "hashbrown 0.7.1", - "hyper 0.13.4", + "hyper 0.13.5", "jsonrpsee-proc-macros", "lazy_static", "log 0.4.8", @@ -2807,7 +2807,7 @@ dependencies = [ "owning_ref", "parity-util-mem", "parking_lot 0.10.2", - "regex 1.3.6", + "regex 1.3.7", "rocksdb", "smallvec 1.3.0", ] @@ -2939,9 +2939,9 @@ dependencies = [ [[package]] name = "libp2p" -version = "0.18.0" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa5aedb713f76577818529be8283e35ec5e8b3ecdccfe0231ba4d860687438ab" +checksum = "32ea742c86405b659c358223a8f0f9f5a9eb27bb6083894c6340959b05269662" dependencies = [ "bytes 0.5.4", "futures 0.3.4", @@ -2961,7 +2961,7 @@ dependencies = [ "libp2p-plaintext 0.18.0", "libp2p-pnet 0.18.0", "libp2p-secio 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "libp2p-tcp 0.18.0", "libp2p-uds 0.18.0", "libp2p-wasm-ext 0.18.0", @@ -3133,7 +3133,7 @@ dependencies = [ "fnv", "futures 0.3.4", "libp2p-core 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "prost", "prost-build", "rand 0.7.3", @@ -3178,7 +3178,7 @@ dependencies = [ "futures 0.3.4", "futures_codec", "libp2p-core 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "log 0.4.8", "lru", "prost", @@ -3214,7 +3214,7 @@ checksum = "a38ca3eb807789e26f41c82ca7cd2b3843c66c5587b8b5f709a2f421f3061414" dependencies = [ "futures 0.3.4", "libp2p-core 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "log 0.4.8", "prost", "prost-build", @@ -3262,7 +3262,7 @@ dependencies = [ "futures 0.3.4", "futures_codec", "libp2p-core 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "log 0.4.8", "multihash", "prost", @@ -3311,7 +3311,7 @@ dependencies = [ "futures 0.3.4", "lazy_static", "libp2p-core 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "log 0.4.8", "net2", "rand 0.7.3", @@ -3417,7 +3417,7 @@ checksum = "f9bfbf87eebb492d040f9899c5c81c9738730465ac5e78d9b7a7d086d0f07230" dependencies = [ "futures 0.3.4", "libp2p-core 0.18.0", - "libp2p-swarm 0.18.0", + "libp2p-swarm 0.18.1", "log 0.4.8", "rand 0.7.3", "void", @@ -3564,9 +3564,9 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.18.0" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622605817885e67b5572189c2507e514b786beb69ed85a120dbb245a7f15383d" +checksum = "44ab289ae44cc691da0a6fe96aefa43f26c86c6c7813998e203f6d80f1860f18" dependencies = [ "futures 0.3.4", "libp2p-core 0.18.0", @@ -5655,9 +5655,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.3.6" +version = "1.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6946991529684867e47d86474e3a6d0c0ab9b82d5821e314b1ede31fa3a4b3" +checksum = "a6020f034922e3194c711b82a627453881bc4682166cabb07134a10c26ba7692" dependencies = [ "aho-corasick 0.7.10", "memchr", @@ -5760,8 +5760,10 @@ dependencies = [ "async-std", "bincode", "derive_more", + "futures 0.3.4", + "futures-timer 3.0.2", "hex", - "libp2p 0.18.0", + "libp2p 0.18.1", "log 0.4.8", "pallet-indices", "parity-scale-codec", @@ -5914,7 +5916,7 @@ dependencies = [ "md-5", "proc-macro2", "quote 1.0.3", - "regex 1.3.6", + "regex 1.3.7", "syn 1.0.17", ] @@ -6107,7 +6109,7 @@ dependencies = [ "derive_more", "futures 0.3.4", "futures-timer 3.0.2", - "libp2p 0.18.0", + "libp2p 0.18.1", "log 0.4.8", "parity-scale-codec", "prost", @@ -6214,7 +6216,7 @@ dependencies = [ "names", "nix 0.17.0", "parity-util-mem", - "regex 1.3.6", + "regex 1.3.7", "rpassword", "sc-client-api", "sc-informant", @@ -6602,7 +6604,7 @@ dependencies = [ "futures-timer 3.0.2", "futures_codec", "hex", - "libp2p 0.18.0", + "libp2p 0.18.1", "linked-hash-map", "linked_hash_set", "log 0.4.8", @@ -6646,7 +6648,7 @@ checksum = "a61f954d0ce6279fe48df2d3d20d3e5589964d501509f85b323cb0fde37b1b90" dependencies = [ "futures 0.3.4", "futures-timer 3.0.2", - "libp2p 0.18.0", + "libp2p 0.18.1", "log 0.4.8", "lru", "sc-network", @@ -6665,7 +6667,7 @@ dependencies = [ "fnv", "futures 0.3.4", "futures-timer 3.0.2", - "hyper 0.13.4", + "hyper 0.13.5", "hyper-rustls 0.20.0", "log 0.4.8", "num_cpus", @@ -6690,7 +6692,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b85bec68c3c1b4495b6eabdf14d1c4140a6d569b61af9c4f721f23eae42ed7" dependencies = [ "futures 0.3.4", - "libp2p 0.18.0", + "libp2p 0.18.1", "log 0.4.8", "serde_json", "sp-utils", @@ -6848,7 +6850,7 @@ dependencies = [ "bytes 0.5.4", "futures 0.3.4", "futures-timer 3.0.2", - "libp2p 0.18.0", + "libp2p 0.18.1", "log 0.4.8", "parking_lot 0.10.2", "pin-project", @@ -7465,7 +7467,7 @@ dependencies = [ "futures 0.3.4", "futures-diagnose", "futures-timer 3.0.2", - "libp2p 0.18.0", + "libp2p 0.18.1", "log 0.4.8", "parity-scale-codec", "parking_lot 0.10.2", @@ -7533,7 +7535,7 @@ dependencies = [ "parking_lot 0.10.2", "primitive-types", "rand 0.7.3", - "regex 1.3.6", + "regex 1.3.7", "schnorrkel", "serde", "sha2", @@ -8091,7 +8093,7 @@ dependencies = [ "async-std", "derive_more", "futures-util", - "hyper 0.13.4", + "hyper 0.13.5", "log 0.4.8", "prometheus", "tokio 0.2.18", @@ -8425,12 +8427,11 @@ dependencies = [ [[package]] name = "time" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "redox_syscall", "winapi 0.3.8", ] @@ -9353,9 +9354,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa515c5163a99cc82bab70fd3bfdd36d827be85de63737b40fcef2ce084a436e" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ "winapi 0.3.8", ] diff --git a/robonomics/protocol/Cargo.toml b/robonomics/protocol/Cargo.toml index 99e98766a..27b981fcb 100644 --- a/robonomics/protocol/Cargo.toml +++ b/robonomics/protocol/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "robonomics-protocol" description = "Robonomics Network protocol on libp2p." -version = "0.3.0" +version = "0.4.0" authors = ["Airalab "] edition = "2018" @@ -13,6 +13,8 @@ sp-core = "2.0.0-alpha.6" sp-runtime = "2.0.0-alpha.6" pallet-indices = "2.0.0-alpha.6" derive_more = "0.99" +futures-timer = "3.0" +futures = "0.3.4" bincode = "1.2" libp2p = "0.18" log = "0.4" diff --git a/robonomics/protocol/src/cli.rs b/robonomics/protocol/src/cli.rs index e77d8431f..837f2efef 100644 --- a/robonomics/protocol/src/cli.rs +++ b/robonomics/protocol/src/cli.rs @@ -17,9 +17,10 @@ /////////////////////////////////////////////////////////////////////////////// //! Robonomics Network console interface. -use async_std::task; use sp_core::{sr25519, crypto::{Pair, Ss58Codec}}; +use futures::{future, StreamExt}; use libp2p::Multiaddr; +use async_std::task; use crate::datalog; use crate::pubsub::*; use crate::error::Result; @@ -27,26 +28,29 @@ use crate::error::Result; /// Command for pubsub router mode. #[derive(Debug, structopt::StructOpt, Clone)] pub struct PubSubCmd { - /// Topic name for subscribe and publish. + /// Subscribe for given topic name and print received messages. #[structopt( long, value_name = "TOPIC_NAME", )] - pub topic: Option, - /// Listen address for incoming PubSub connections, + pub subscribe: Option, + /// Publish stdin lines into given topic name. #[structopt( long, - value_name = "MULTIADDR", + value_name = "TOPIC_NAME", default_value = "/ip4/0.0.0.0/tcp/0", )] pub listen: Multiaddr, - /// Indicates PubSub nodes for first connections + /// Indicates PubSub nodes for first connections. #[structopt( long, value_name = "MULTIADDR", use_delimiter = true, )] pub bootnodes: Vec, + /// Disable Robonomics PubSub peer discovery. + #[structopt(long)] + pub disable_discovery: bool, #[allow(missing_docs)] #[structopt(flatten)] pub shared_params: sc_cli::SharedParams, @@ -68,32 +72,37 @@ impl sc_cli::CliConfiguration for PubSubCmd { impl PubSubCmd { /// Runs the command and node as pubsub router. pub fn run(&self) -> Result<()> { - let mut pubsub = Gossipsub::new()?; + let (pubsub, worker) = Gossipsub::new()?; // Listen address - pubsub.listen(&self.listen)?; + let _ = pubsub.listen(self.listen.clone()); // Connect to bootnodes for addr in &self.bootnodes { - pubsub.connect(addr)?; + let _ = pubsub.connect(addr.clone()); } // Subscribe on topic topic and print received content - match self.topic.clone() { + match self.subscribe.clone() { Some(topic_name) => { - pubsub.subscribe(topic_name, |from, msg| - log::info!( - target: "robonomics-pubsub", - "Received message from {}: {}", - from.to_base58(), - String::from_utf8_lossy(&msg) - ) - ); - }, + task::spawn(pubsub.subscribe(&topic_name).for_each(|msg| { + println!( + "{}: {}", + msg.from.to_base58(), + String::from_utf8_lossy(&msg.data[..]), + ); + future::ready(()) + })); + } _ => (), } - Ok(task::block_on(pubsub.start())) + // Enable peer discovery if not disabled + if !self.disable_discovery { + task::spawn(discovery::start(pubsub.clone())); + } + + task::block_on(worker) } } diff --git a/robonomics/protocol/src/error.rs b/robonomics/protocol/src/error.rs index fd97e8b9d..7f1e6f54d 100644 --- a/robonomics/protocol/src/error.rs +++ b/robonomics/protocol/src/error.rs @@ -20,11 +20,19 @@ use libp2p::core::transport::TransportError; use libp2p::core::connection::ConnectionLimit; use sp_core::crypto::SecretStringError; +use futures::channel::oneshot; +use futures::Future; /// Protocol Result typedef. pub type Result = std::result::Result; -/// Service errors. +/// Oneshot channel result. +type OneshotResult = std::result::Result; + +/// Async version of protocol Result typedef. +pub type FutureResult = Box> + Send + Sync + Unpin>; + +/// Robonomics protocol errors. #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { /// IO error. @@ -40,8 +48,6 @@ pub enum Error { PrivateKeyFailure(SecretStringError), /// Codec error. Codec(bincode::Error), - /// Synchronization error. - SyncError, /// Other error. Other(String), } diff --git a/robonomics/protocol/src/pubsub/mod.rs b/robonomics/protocol/src/pubsub.rs similarity index 50% rename from robonomics/protocol/src/pubsub/mod.rs rename to robonomics/protocol/src/pubsub.rs index daf03d1b5..0ef2a5658 100644 --- a/robonomics/protocol/src/pubsub/mod.rs +++ b/robonomics/protocol/src/pubsub.rs @@ -17,39 +17,55 @@ /////////////////////////////////////////////////////////////////////////////// ///! Robonomics Publisher/Subscriber protocol implements broadcasting layer. -use libp2p::core::connection::ListenerId; use libp2p::{PeerId, Multiaddr}; -use crate::error::Result; +use futures::Stream; +use crate::error::FutureResult; /// PubSub implementation using libp2p Gossipsub. pub mod gossipsub; pub use gossipsub::PubSub as Gossipsub; +/// Simple PubSub based node discovery. +pub mod discovery; + +/// Robonomics PubSub message. +#[derive(PartialEq, Eq, Clone)] +pub struct Message { + pub from: PeerId, + pub data: Vec, +} + /// Robonomics Publisher/Subscriber. pub trait PubSub { - /// Returns node local peer ID. + /// Stream of incoming messages. + type Inbox: Stream + Sized; + + /// Returns local peer ID. fn peer_id(&self) -> PeerId; /// Listen address for incoming connections. - fn listen(&mut self, address: &Multiaddr) -> Result; + /// + /// Returns true when successful bind and false in case of error. + fn listen(&self, address: Multiaddr) -> FutureResult; - /// Returns list of addresses we're listening on. - fn listeners(&self) -> Vec; + /// Returns a list of node addresses. + fn listeners(&self) -> FutureResult>; /// Connect to peer and add it into swarm. - fn connect(&mut self, address: &Multiaddr) -> Result<()>; + /// + /// Returns true when connected and false in case of error. + fn connect(&self, address: Multiaddr) -> FutureResult; - /// Subscribe and set handler for topic with given name. + /// Subscribe for a topic with given name. /// - /// Returns true if the subscription worked. Returns false if we were already subscribed. - fn subscribe(&mut self, topic_name: T, callback: F) -> bool - where T: ToString, F: FnMut(PeerId, Vec) + 'static; + /// Returns stream of incoming messages. + fn subscribe(&self, topic_name: &T) -> Self::Inbox; - /// Unsubscribe and remove handler for given topic name. + /// Unsubscribe for incoming messages from topic. /// - /// Returns true if we were subscribed to this topic. - fn unsubscribe(&mut self, topic_name: T) -> bool; + /// Returns true when success. + fn unsubscribe(&self, topic_name: &T) -> FutureResult; - /// Publish message into the topic. - fn publish>>(&mut self, topic_name: T, message: M); + /// Publish message into the topic by name. + fn publish>>(&self, topic_name: &T, message: M); } diff --git a/robonomics/protocol/src/pubsub/discovery.rs b/robonomics/protocol/src/pubsub/discovery.rs new file mode 100644 index 000000000..00fa91bf0 --- /dev/null +++ b/robonomics/protocol/src/pubsub/discovery.rs @@ -0,0 +1,94 @@ +/////////////////////////////////////////////////////////////////////////////// +// +// Copyright 2018-2020 Airalab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +/////////////////////////////////////////////////////////////////////////////// +///! Robonomics Publisher/Subscriber protocol node discovery extension. + +use futures::{Future, FutureExt, StreamExt, future}; +use serde::{Serialize, Deserialize}; +use std::time::Duration; +use libp2p::Multiaddr; +use std::sync::Arc; +use super::PubSub; + +/// Peer information service message. +#[derive(Serialize, Deserialize, PartialEq, Debug)] +pub struct DiscoveryMessage { + peer_id: String, + listeners: Vec, +} + +/// Peer discovery topic name. +pub const DISCOVERY_TOPIC_NAME: &str = "_robonomics_pubsub_peer_discovery"; + +/// Simple node discovery algorithm. +/// +/// 1. All nodes subscribed for DISCOVERY_TOPIC_NAME. +/// 2. Each node periodically send listened addresses into DISCOVERY_TOPIC_NAME. +/// 3. If node received discovery message then try to connect remove node. +pub fn start(pubsub: Arc) -> impl Future { + future::join( + // Message broadcasting task + discovery(pubsub.clone()), + // Subscribe for discovery topic and read messages + pubsub.clone() + .subscribe(&DISCOVERY_TOPIC_NAME) + .for_each(move |msg| connect(pubsub.clone(), msg)), + ).map(|_| ()) +} + +async fn discovery(pubsub: Arc) { + let minute = Duration::from_secs(60); + + loop { + if let Ok(listeners) = pubsub.listeners().await { + if listeners.len() > 0 { + let message = DiscoveryMessage { + peer_id: pubsub.peer_id().to_base58(), + listeners, + }; + + pubsub.publish(&DISCOVERY_TOPIC_NAME, bincode::serialize(&message).unwrap()); + } + } + + // Sleep for 60 seconds + futures_timer::Delay::new(minute).await; + } +} + +async fn connect(pubsub: Arc, msg: super::Message) { + // Handle only external messages + if msg.from == pubsub.peer_id() { + return; + } + + let decoded: bincode::Result = bincode::deserialize(&msg.data[..]); + match decoded { + Ok(message) => { + for addr in message.listeners { + let _ = pubsub.connect(addr.clone()).await; + } + } + Err(e) => { + log::error!( + target: "robonomics-pubsub", + "Unable to decode discovery message from {}: {}", + msg.from.to_base58(), e + ); + } + } +} diff --git a/robonomics/protocol/src/pubsub/gossipsub.rs b/robonomics/protocol/src/pubsub/gossipsub.rs index b3be24563..33a88b3fa 100644 --- a/robonomics/protocol/src/pubsub/gossipsub.rs +++ b/robonomics/protocol/src/pubsub/gossipsub.rs @@ -22,11 +22,16 @@ /// and high available pubsub swarm. /// -use std::collections::hash_map::{HashMap, DefaultHasher}; -use std::hash::{Hash, Hasher}; -use std::time::Duration; -use std::ops::DerefMut; -use serde::{Serialize, Deserialize}; +use std::{ + collections::hash_map::{HashMap, DefaultHasher}, + hash::{Hash, Hasher}, time::Duration, + sync::Arc, pin::Pin, ops::DerefMut, + task::{Context, Poll}, +}; +use futures::{ + channel::{oneshot, mpsc}, prelude::*, + Future, +}; use libp2p::{Swarm, PeerId, Multiaddr}; use libp2p::core::connection::ListenerId; use libp2p::gossipsub::{ @@ -35,29 +40,29 @@ use libp2p::gossipsub::{ Topic, TopicHash, protocol::MessageId, }; -use crate::error::Result; - -/// Peer information service message. -#[derive(Serialize, Deserialize, PartialEq, Debug)] -pub struct DiscoveryMessage { - peer_id: String, - listeners: Vec, -} - -/// Peer discovery topic name. -pub const DISCOVERY_TOPIC_NAME: &str = "_robonomics_pubsub_peer_discovery"; +use crate::error::{Result, FutureResult}; /// Gossipsub heartbeat interval const HEARTBEAT_SECS: u64 = 10; -/// LibP2P Gossipsub based publisher/subscriber service. -pub struct PubSub { +enum ToWorkerMsg { + Listen(Multiaddr, oneshot::Sender), + Connect(Multiaddr, oneshot::Sender), + Listeners(oneshot::Sender>), + Subscribe(String, mpsc::UnboundedSender), + Unsubscribe(String, oneshot::Sender), + Publish(String, Vec), +} + +struct PubSubWorker { swarm: Swarm, - subs: HashMap) + 'static>>, + inbox: HashMap>, + from_service: mpsc::UnboundedReceiver, + service: Arc, } -impl PubSub { - /// Create new Robonomics PubSub instance +impl PubSubWorker { + /// Create new PubSub Worker instance pub fn new() -> Result { // XXX: temporary random local id. let local_key = crate::id::random(); @@ -79,116 +84,215 @@ impl PubSub { .build(); // Build a gossipsub network behaviour - let mut gossipsub = Gossipsub::new(peer_id.clone(), gossipsub_config); - - // Subscribe to discovery topic - gossipsub.subscribe(Topic::new(DISCOVERY_TOPIC_NAME.to_string())); + let gossipsub = Gossipsub::new(peer_id.clone(), gossipsub_config); // Create a Swarm to manage peers and events - let swarm = Swarm::new(transport, gossipsub, peer_id); + let swarm = Swarm::new(transport, gossipsub, peer_id.clone()); + + // Create worker communication channel + let (to_worker, from_service) = mpsc::unbounded(); + + // Create PubSub service + let service = Arc::new(PubSub { to_worker, peer_id }); + + // Create worker instance with empty subscribers + Ok(PubSubWorker { + swarm, + inbox: HashMap::new(), + from_service, + service, + }) + } + + fn listen(&mut self, address: Multiaddr) -> Result { + let listener = Swarm::listen_on(&mut self.swarm, address.clone())?; + log::debug!( + target: "robonomics-pubsub", + "Listener for address {} created: {:?}", address, listener + ); + Ok(listener) + } + + fn listeners(&self) -> Vec { + let listeners = Swarm::listeners(&self.swarm).cloned().collect(); + log::debug!(target: "robonomics-pubsub", "Listeners: {:?}", listeners); + listeners + } + + fn connect(&mut self, address: Multiaddr) -> bool { + log::debug!(target: "robonomics-pubsub", "Connecting to {}", address); + + Swarm::dial_addr(&mut self.swarm, address).is_ok() + } + + fn subscribe( + &mut self, + topic_name: String, + inbox: mpsc::UnboundedSender, + ) -> bool { + let topic = Topic::new(topic_name.clone()); + let subscribed = self.swarm.deref_mut().subscribe(topic.clone()); + if subscribed { + log::debug!(target: "robonomics-pubsub", "Subscribed to {}", topic_name); + self.inbox.insert(topic.no_hash(), inbox); + } else { + log::warn!(target: "robonomics-pubsub", + "Double subscription to {}, ignore", topic_name); + } + subscribed + } + + fn unsubscribe(&mut self, topic_name: String) -> bool { + let topic = Topic::new(topic_name.clone()); + let unsubscribed = self.swarm.deref_mut().unsubscribe(topic.clone()); + if unsubscribed { + log::debug!(target: "robonomics-pubsub", "Unsubscribed from {}", topic_name); + self.inbox.remove(&topic.sha256_hash()); + } else { + log::warn!(target: "robonomics-pubsub", + "Unable to unsubscribe from {}, ignore", topic_name); + } + unsubscribed + } + + fn publish(&mut self, topic_name: String, message: Vec) { + log::debug!(target: "robonomics-pubsub", "Publish to {}", topic_name); - // Create pubsub instance with empty subscribers - Ok(PubSub { swarm, subs: HashMap::new() }) + let topic = Topic::new(topic_name); + self.swarm.deref_mut().publish(&topic, message); } +} + +impl Future for PubSubWorker { + type Output = Result<()>; - pub async fn start(&mut self) { - let topic = Topic::new(DISCOVERY_TOPIC_NAME.to_string()); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { - let message = DiscoveryMessage { - peer_id: Swarm::local_peer_id(&mut self.swarm).to_base58(), - listeners: Swarm::listeners(&mut self.swarm).cloned().collect(), - }; - - self.swarm.deref_mut().publish(&topic, bincode::serialize(&message).unwrap()); - log::info!( - target: "robonomics-pubsub", - "Discovery message sended: {:?}", message - ); - - match self.swarm.next().await { - GossipsubEvent::Message(peer_id, id, message) => { - log::info!( - target: "robonomics-pubsub", - "Received message with id: {} from peer: {}", id, peer_id.to_base58() - ); - - // Dispatch handlers by topic name hash - for topic in message.topics { - match self.subs.get_mut(&topic) { - None => { - let decoded: bincode::Result - = bincode::deserialize(&message.data[..]); - match decoded { - Ok(message) => for addr in message.listeners { - let _ = Swarm::dial_addr(&mut self.swarm, addr); - }, - Err(e) => log::error!( - target: "robonomics-pubsub", - "Unable to decode message from {}: {}", peer_id.to_base58(), e - ), - } - }, - Some(handler) => handler( - message.source.clone(), - message.data.clone(), - ), + match self.swarm.poll_next_unpin(cx) { + Poll::Ready(Some(gossip_event)) => match gossip_event { + GossipsubEvent::Message(peer_id, id, message) => { + log::debug!( + target: "robonomics-pubsub", + "Received message with id: {} from peer: {}", id, peer_id.to_base58() + ); + + // Dispatch handlers by topic name hash + for topic in &message.topics { + if let Some(inbox) = self.inbox.get_mut(topic) { + let _ = inbox.unbounded_send(super::Message { + from: message.source.clone(), + data: message.data.clone() + }); + } else { + log::warn!( + target: "robonomics-pubsub", + "Topic {} have no associated inbox!", topic + ); + } } } + _ => {} + }, + Poll::Ready(None) | Poll::Pending => break, + } + } + + loop { + match self.from_service.poll_next_unpin(cx) { + Poll::Ready(Some(request)) => match request { + ToWorkerMsg::Listen(addr, result) => { + let _ = result.send(self.listen(addr).is_ok()); + } + ToWorkerMsg::Connect(addr, result) => { + let _ = result.send(self.connect(addr)); + } + ToWorkerMsg::Listeners(result) => { + let _ = result.send(self.listeners()); + } + ToWorkerMsg::Subscribe(topic_name, inbox) => { + let _ = self.subscribe(topic_name, inbox); + } + ToWorkerMsg::Unsubscribe(topic_name, result) => { + let _ = result.send(self.unsubscribe(topic_name)); + } + ToWorkerMsg::Publish(topic_name, message) => { + self.publish(topic_name, message); + } }, - _ => {} + Poll::Ready(None) | Poll::Pending => break, } } + Poll::Pending + } +} + +/// LibP2P Gossipsub based publisher/subscriber service. +/// Note: it's thread safe. +pub struct PubSub { + peer_id: PeerId, + to_worker: mpsc::UnboundedSender, +} + +impl PubSub { + /// Create Gossipsub based PubSub service and worker. + /// + /// Usage: + /// ``` + /// let (pubsub, worker) = PubSub:new()?; + /// task::spawn(worker); + /// ... + /// task::block_on(pubsub.subscribe("test-topic").map(|msg| + /// println!("{}", msg.data) + /// ) + /// ... in different thread + /// pubsub.publish("test-topic", "hello world!".as_bytes()) + /// ``` + pub fn new() -> Result<(Arc, impl Future>)> { + PubSubWorker::new().map(|worker| (worker.service.clone(), worker)) } } impl super::PubSub for PubSub { + type Inbox = Box>; + fn peer_id(&self) -> PeerId { - Swarm::local_peer_id(&self.swarm).clone() + self.peer_id.clone() } - fn listen(&mut self, address: &Multiaddr) -> Result { - let listener = Swarm::listen_on(&mut self.swarm, address.clone())?; - log::debug!( - target: "robonomics-pubsub", - "Listener for address {} created: {:?}", address, listener - ); - Ok(listener) + fn listen(&self, address: Multiaddr) -> FutureResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Listen(address, sender)); + Box::new(receiver) } - fn listeners(&self) -> Vec { - Swarm::listeners(&self.swarm).cloned().collect() + fn listeners(&self) -> FutureResult> { + let (sender, receiver) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Listeners(sender)); + Box::new(receiver) } - fn connect(&mut self, address: &Multiaddr) -> Result<()> { - Swarm::dial_addr(&mut self.swarm, address.clone())?; - log::debug!( - target: "robonomics-pubsub", - "Connecting to {}", address - ); - Ok(()) + fn connect(&self, address: Multiaddr) -> FutureResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Connect(address, sender)); + Box::new(receiver) } - fn subscribe(&mut self, topic_name: T, callback: F) -> bool - where T: ToString, F: FnMut(PeerId, Vec) + 'static - { - log::debug!(target: "robonomics-pubsub", "Subscribed to {}", topic_name.to_string()); - - let topic = Topic::new(topic_name.to_string()); - self.subs.insert(topic.sha256_hash(), Box::new(callback)); - self.swarm.subscribe(topic) + fn subscribe(&self, topic_name: &T) -> Self::Inbox { + let (sender, receiver) = mpsc::unbounded(); + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Subscribe(topic_name.to_string(), sender)); + Box::new(receiver) } - fn unsubscribe(&mut self, topic_name: T) -> bool { - log::debug!(target: "robonomics-pubsub", "Unsubscribed from {}", topic_name.to_string()); - - let topic = Topic::new(topic_name.to_string()); - self.swarm.deref_mut().unsubscribe(topic) + fn unsubscribe(&self, topic_name: &T) -> FutureResult { + let (sender, receiver) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Unsubscribe(topic_name.to_string(), sender)); + Box::new(receiver) } - fn publish>>(&mut self, topic_name: T, message: M) { - log::debug!(target: "robonomics-pubsub", "Publish to {}", topic_name.to_string()); - - let topic = Topic::new(topic_name.to_string()); - self.swarm.deref_mut().publish(&topic, message) + fn publish>>(&self, topic_name: &T, message: M) { + let _ = self.to_worker.unbounded_send(ToWorkerMsg::Publish( + topic_name.to_string(), + message.into(), + )); } } diff --git a/robonomics/protocol/src/runtime/mod.rs b/robonomics/protocol/src/runtime.rs similarity index 96% rename from robonomics/protocol/src/runtime/mod.rs rename to robonomics/protocol/src/runtime.rs index 4c7327fba..e4a47d0e0 100644 --- a/robonomics/protocol/src/runtime/mod.rs +++ b/robonomics/protocol/src/runtime.rs @@ -15,7 +15,7 @@ // limitations under the License. // /////////////////////////////////////////////////////////////////////////////// -//! SubXt compatible Robonomics Network abstration. +//! SubXt compatible Robonomics Network runtime abstration. use sp_runtime::{ traits::{BlakeTwo256, Verify, IdentifyAccount}, generic::Header, diff --git a/robonomics/protocol/src/runtime/pallet_datalog.rs b/robonomics/protocol/src/runtime/pallet_datalog.rs index 080a781a3..479cadfe2 100644 --- a/robonomics/protocol/src/runtime/pallet_datalog.rs +++ b/robonomics/protocol/src/runtime/pallet_datalog.rs @@ -21,22 +21,22 @@ use codec::{EncodeLike, Codec}; use sp_runtime::traits::Member; use substrate_subxt::{system, Call}; -/// The subset of the `pallet_balances::Trait` that a client must implement. +/// The subset of the `pallet_robonomics_datalog::Trait` that a client must implement. pub trait Datalog: system::System { type Record: Codec + EncodeLike + Member; } const MODULE: &str = "Datalog"; -const TRANSFER: &str = "record"; +const RECORD: &str = "record"; -/// Arguments for transferring a balance +/// Arguments for datalog record call. #[derive(codec::Encode)] -pub struct TransferArgs { +pub struct RecordArgs { record: ::Record } pub fn record( record: ::Record, -) -> Call> { - Call::new(MODULE, TRANSFER, TransferArgs { record }) +) -> Call> { + Call::new(MODULE, RECORD, RecordArgs { record }) }