Skip to content

Commit

Permalink
remove unused json-rpc streaming interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill authored and bors-libra committed Dec 16, 2021
1 parent bd74186 commit 19f2bad
Show file tree
Hide file tree
Showing 41 changed files with 12 additions and 3,569 deletions.
69 changes: 3 additions & 66 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 0 additions & 32 deletions config/src/config/json_rpc_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub struct JsonRpcConfig {
pub content_length_limit: usize,
pub tls_cert_path: Option<String>,
pub tls_key_path: Option<String>,
#[serde(default)]
pub stream_rpc: StreamConfig,
}

pub const DEFAULT_JSON_RPC_ADDRESS: &str = "127.0.0.1";
Expand All @@ -35,7 +33,6 @@ impl Default for JsonRpcConfig {
content_length_limit: DEFAULT_CONTENT_LENGTH_LIMIT,
tls_cert_path: None,
tls_key_path: None,
stream_rpc: StreamConfig::default(),
}
}
}
Expand All @@ -45,32 +42,3 @@ impl JsonRpcConfig {
self.address.set_port(utils::get_available_port());
}
}

/// This API is experimental and subject to change
/// Documentation is in /json-rpc/src/stream_rpc/README.md
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct StreamConfig {
pub enabled: bool,
pub subscription_fetch_size: u64,
pub send_queue_size: usize,
pub poll_interval_ms: u64,
pub max_poll_interval_ms: u64,
}

pub const DEFAULT_STREAM_RPC_SUBSCRIPTION_FETCH_SIZE: u64 = 100;
pub const DEFAULT_STREAM_RPC_SEND_QUEUE_SIZE: usize = 100;
pub const DEFAULT_STREAM_RPC_POLL_INTERVAL_MS: u64 = 1000;
pub const DEFAULT_STREAM_RPC_MAX_POLL_INTERVAL_MS: u64 = 5000;

impl Default for StreamConfig {
fn default() -> StreamConfig {
StreamConfig {
enabled: false,
subscription_fetch_size: DEFAULT_STREAM_RPC_SUBSCRIPTION_FETCH_SIZE,
send_queue_size: DEFAULT_STREAM_RPC_SEND_QUEUE_SIZE,
poll_interval_ms: DEFAULT_STREAM_RPC_POLL_INTERVAL_MS,
max_poll_interval_ms: DEFAULT_STREAM_RPC_MAX_POLL_INTERVAL_MS,
}
}
}
5 changes: 1 addition & 4 deletions crates/diem-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ publish = ["crates-io"]
edition = "2018"

[features]
default = ["async", "blocking", "faucet", "websocket"]
default = ["async", "blocking", "faucet"]
blocking = ["ureq", "ipnet"]
async = ["reqwest", "tokio"]
faucet = ["reqwest", "reqwest/blocking", "blocking"]
websocket = ["async", "futures", "tokio-tungstenite"]

[dependencies]
anyhow = "1.0.38"
Expand All @@ -31,10 +30,8 @@ move-core-types = { path = "../../language/move-core/types", version = "0.0.3" }
diem-types = { path = "../../types", version = "0.0.3" }

# Optional Dependencies
futures = {version = "0.3.12", optional = true }
reqwest = { version = "0.11.2", features = ["json"], optional = true }
tokio = { version = "1.8.1", features = ["time"], default_features = false, optional = true }
tokio-tungstenite = { version = "0.14", optional = true }
ureq = { version = "1.5.4", features = ["json", "native-tls"], default-features = false, optional = true }
ipnet = { version = "2.3", optional = true }
diem-workspace-hack = { version = "0.1", path = "../diem-workspace-hack" }
Expand Down
129 changes: 1 addition & 128 deletions crates/diem-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
// 'blocking' feature are enabled
#![allow(dead_code)]

use diem_json_rpc_types::{errors::JsonRpcError, stream::response::StreamJsonRpcResponse};

cfg_websocket! {
use tokio_tungstenite::tungstenite;
}
use diem_json_rpc_types::errors::JsonRpcError;

pub type Result<T, E = Error> = ::std::result::Result<T, E>;

Expand Down Expand Up @@ -181,129 +177,6 @@ impl From<serde_json::Error> for Error {
}
}

cfg_websocket! {

pub type StreamResult<T, E = StreamError> = ::std::result::Result<T, E>;

#[derive(Debug)]
pub enum StreamKind {
HttpStatus(u16),
Request,
Decode,
Encode,
ConnectionClosed,
MessageTooLarge,
HttpError,
TlsError,
IdAlreadyUsed,
IdNotFound(Option<StreamJsonRpcResponse>),
QueueFullError,
JsonRpcError,
SubscriptionOkTimeout
}

#[derive(Debug)]
pub struct StreamError {
inner: Box<StreamInner>,
}

#[derive(Debug)]
struct StreamInner {
kind: StreamKind,
source: Option<BoxError>,
json_rpc_error: Option<JsonRpcError>,
}

impl StreamError {
pub fn json_rpc_error(&self) -> Option<&JsonRpcError> {
self.inner.json_rpc_error.as_ref()
}

pub(crate) fn from_tungstenite_error(e: tungstenite::Error) -> Self {
match e {
tungstenite::Error::ConnectionClosed => Self::connection_closed(None::<Self>),
tungstenite::Error::AlreadyClosed => Self::connection_closed(None::<Self>),
tungstenite::Error::Io(e) => Self::connection_closed(Some(e)),
tungstenite::Error::Tls(e) => Self::new(StreamKind::TlsError, Some(e)),
tungstenite::Error::Capacity(e) => Self::new(StreamKind::MessageTooLarge, Some(e)),
tungstenite::Error::Protocol(e) => Self::connection_closed(Some(e)),
tungstenite::Error::SendQueueFull(_) => {
Self::new(StreamKind::QueueFullError, None::<Self>)
}
tungstenite::Error::Utf8 => Self::encode(e),
tungstenite::Error::Url(e) => Self::new(StreamKind::Request, Some(e)),
tungstenite::Error::Http(e) => {
Self::new(StreamKind::HttpStatus(e.status().as_u16()), None::<Self>)
}
tungstenite::Error::HttpFormat(e) => Self::new(StreamKind::HttpError, Some(e)),
}
}

pub(crate) fn decode<E: Into<BoxError>>(e: E) -> Self {
Self::new(StreamKind::Decode, Some(e))
}

pub(crate) fn encode<E: Into<BoxError>>(e: E) -> Self {
Self::new(StreamKind::Encode, Some(e))
}

pub(crate) fn from_http_error(e: tungstenite::http::Error) -> Self {
Self::new(StreamKind::HttpError, Some(e))
}

pub(crate) fn connection_closed<E: Into<BoxError>>(e: Option<E>) -> Self {
Self::new(StreamKind::ConnectionClosed, e)
}

pub(crate) fn subscription_id_already_used<E: Into<BoxError>>(e: Option<E>) -> Self {
Self::new(StreamKind::IdAlreadyUsed, e)
}

pub(crate) fn subscription_ok_timeout() -> Self {
Self::new(StreamKind::SubscriptionOkTimeout, None::<Self>)
}

pub(crate) fn subscription_json_rpc_error(error: JsonRpcError) -> Self{
Self {
inner: Box::new(StreamInner {
kind: StreamKind::JsonRpcError,
source: None,
json_rpc_error: Some(error),
}),
}
}

fn new<E: Into<BoxError>>(kind: StreamKind, source: Option<E>) -> Self {
Self {
inner: Box::new(StreamInner {
kind,
source: source.map(Into::into),
json_rpc_error: None,
}),
}
}
}

impl From<serde_json::Error> for StreamError {
fn from(e: serde_json::Error) -> Self {
Self::decode(e)
}
}

impl std::fmt::Display for StreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl std::error::Error for StreamError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.inner.source.as_ref().map(|e| &**e as _)
}
}

}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum WaitForTransactionError {
Expand Down
6 changes: 0 additions & 6 deletions crates/diem-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ cfg_async_or_blocking! {
pub use move_deserialize::Event;
}

// This API is experimental and subject to change
cfg_websocket! {
pub use error::{StreamError, StreamResult};
pub mod stream;
}

mod state;
pub use state::State;

Expand Down
10 changes: 0 additions & 10 deletions crates/diem-client/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,3 @@ macro_rules! cfg_faucet {
)*
}
}

macro_rules! cfg_websocket {
($($item:item)*) => {
$(
#[cfg(feature = "websocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
$item
)*
}
}
10 changes: 0 additions & 10 deletions crates/diem-client/src/stream/mod.rs

This file was deleted.

Loading

0 comments on commit 19f2bad

Please sign in to comment.