Skip to content

Commit

Permalink
feat(api): add a config flag for disabling filter api (matter-labs#1078)
Browse files Browse the repository at this point in the history
## What ❔

Added a config flag to disable all filter-related methods. If turned on,
they report that the method is not implemented.

## Why ❔

If a node is behind a load balancer then there is no way for client to
reliably hit the same node where he/she created the filter. Hence
supporting this is a very awkward UX. Making this a flag since external
nodes might want to turn this on if they only have a single node mapped
to a static address.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
itegulov authored Feb 22, 2024
1 parent a879621 commit b486d7e
Show file tree
Hide file tree
Showing 17 changed files with 160 additions and 42 deletions.
1 change: 1 addition & 0 deletions checks-config/era.dic
Original file line number Diff line number Diff line change
Expand Up @@ -902,3 +902,4 @@ reimplementation
composability
md5
shivini
balancer
10 changes: 10 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ pub struct OptionalENConfig {
latest_values_cache_size_mb: usize,
/// Enabled JSON RPC API namespaces.
api_namespaces: Option<Vec<Namespace>>,
/// Whether to support methods installing filters and querying filter changes.
///
/// When to set this value to `true`:
/// Filters are local to the specific node they were created at. Meaning if
/// there are multiple nodes behind a load balancer the client cannot reliably
/// query the previously created filter as the request might get routed to a
/// different node.
#[serde(default)]
pub filters_disabled: bool,

// Gas estimation config
/// The factor by which to scale the gasLimit
Expand Down Expand Up @@ -567,6 +576,7 @@ impl From<ExternalNodeConfig> for InternalApiConfig {
l2_testnet_paymaster_addr: config.remote.l2_testnet_paymaster_addr,
req_entities_limit: config.optional.req_entities_limit,
fee_history_limit: config.optional.fee_history_limit,
filters_disabled: config.optional.filters_disabled,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/bin/external_node/src/config/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fn parsing_optional_config_from_empty_env() {
#[test]
fn parsing_optional_config_from_env() {
let env_vars = [
("EN_FILTERS_DISABLED", "true"),
("EN_FILTERS_LIMIT", "5000"),
("EN_SUBSCRIPTIONS_LIMIT", "20000"),
("EN_FEE_HISTORY_LIMIT", "1000"),
Expand All @@ -50,6 +51,7 @@ fn parsing_optional_config_from_env() {
.map(|(name, value)| (name.to_owned(), value.to_owned()));

let config: OptionalENConfig = envy::prefixed("EN_").from_iter(env_vars).unwrap();
assert!(config.filters_disabled);
assert_eq!(config.filters_limit, 5_000);
assert_eq!(config.subscriptions_limit, 20_000);
assert_eq!(config.fee_history_limit, 1_000);
Expand Down
10 changes: 10 additions & 0 deletions core/lib/config/src/configs/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ pub struct Web3JsonRpcConfig {
pub ws_url: String,
/// Max possible limit of entities to be requested once.
pub req_entities_limit: Option<u32>,
/// Whether to support methods installing filters and querying filter changes.
///
/// When to set this value to `true`:
/// Filters are local to the specific node they were created at. Meaning if
/// there are multiple nodes behind a load balancer the client cannot reliably
/// query the previously created filter as the request might get routed to a
/// different node.
#[serde(default)]
pub filters_disabled: bool,
/// Max possible limit of filters to be in the state at once.
pub filters_limit: Option<u32>,
/// Max possible limit of subscriptions to be in the state at once.
Expand Down Expand Up @@ -100,6 +109,7 @@ impl Web3JsonRpcConfig {
ws_port: 3051,
ws_url: "ws://localhost:3051".into(),
req_entities_limit: Some(10000),
filters_disabled: false,
filters_limit: Some(10000),
subscriptions_limit: Some(10000),
pubsub_polling_interval: Some(200),
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ impl RandomConfig for configs::api::Web3JsonRpcConfig {
ws_port: g.gen(),
ws_url: g.gen(),
req_entities_limit: g.gen(),
filters_disabled: g.gen(),
filters_limit: g.gen(),
subscriptions_limit: g.gen(),
pubsub_polling_interval: g.gen(),
Expand Down
2 changes: 2 additions & 0 deletions core/lib/env_config/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ mod tests {
ws_port: 3051,
ws_url: "ws://127.0.0.1:3051".into(),
req_entities_limit: Some(10000),
filters_disabled: false,
filters_limit: Some(10000),
subscriptions_limit: Some(10000),
pubsub_polling_interval: Some(200),
Expand Down Expand Up @@ -111,6 +112,7 @@ mod tests {
API_WEB3_JSON_RPC_WS_PORT="3051"
API_WEB3_JSON_RPC_WS_URL="ws://127.0.0.1:3051"
API_WEB3_JSON_RPC_REQ_ENTITIES_LIMIT=10000
API_WEB3_JSON_RPC_FILTERS_DISABLED=false
API_WEB3_JSON_RPC_FILTERS_LIMIT=10000
API_WEB3_JSON_RPC_SUBSCRIPTIONS_LIMIT=10000
API_WEB3_JSON_RPC_PUBSUB_POLLING_INTERVAL=200
Expand Down
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl ProtoRepr for proto::Web3JsonRpc {
.context("ws_port")?,
ws_url: required(&self.ws_url).context("ws_url")?.clone(),
req_entities_limit: self.req_entities_limit,
filters_disabled: self.filters_disabled.unwrap_or(false),
filters_limit: self.filters_limit,
subscriptions_limit: self.subscriptions_limit,
pubsub_polling_interval: self.pubsub_polling_interval,
Expand Down Expand Up @@ -128,6 +129,7 @@ impl ProtoRepr for proto::Web3JsonRpc {
ws_port: Some(this.ws_port.into()),
ws_url: Some(this.ws_url.clone()),
req_entities_limit: this.req_entities_limit,
filters_disabled: Some(this.filters_disabled),
filters_limit: this.filters_limit,
subscriptions_limit: this.subscriptions_limit,
pubsub_polling_interval: this.pubsub_polling_interval,
Expand Down
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ message Web3JsonRpc {
optional uint64 max_response_body_size_mb = 24; // optional; MB
optional uint32 websocket_requests_per_minute_limit = 25; // optional
optional string tree_api_url = 26; // optional
optional bool filters_disabled = 27; // optional
}

message ContractVerificationApi {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ impl EthNamespaceServer for EthNamespace {
}

async fn uninstall_filter(&self, idx: U256) -> RpcResult<bool> {
Ok(self.uninstall_filter_impl(idx).await)
self.uninstall_filter_impl(idx)
.await
.map_err(into_jsrpc_error)
}

async fn new_pending_transaction_filter(&self) -> RpcResult<U256> {
Ok(self.new_pending_transaction_filter_impl().await)
self.new_pending_transaction_filter_impl()
.await
.map_err(into_jsrpc_error)
}

async fn get_logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
Expand Down
18 changes: 16 additions & 2 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,16 @@ impl FullApiParams {
let start_info = BlockStartInfo::new(&mut storage).await?;
drop(storage);

let installed_filters = if self.config.filters_disabled {
None
} else {
Some(Arc::new(Mutex::new(Filters::new(
self.optional.filters_limit,
))))
};

Ok(RpcState {
installed_filters: Arc::new(Mutex::new(Filters::new(self.optional.filters_limit))),
installed_filters,
connection_pool: self.pool,
tx_sender: self.tx_sender,
sync_state: self.optional.sync_state,
Expand Down Expand Up @@ -349,7 +357,13 @@ impl FullApiParams {
self,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<ApiServerHandles> {
if self.optional.filters_limit.is_none() {
if self.config.filters_disabled {
if self.optional.filters_limit.is_some() {
tracing::warn!(
"Filters limit is not supported when filters are disabled, ignoring"
);
}
} else if self.optional.filters_limit.is_none() {
tracing::warn!("Filters limit is not set - unlimited filters are allowed");
}

Expand Down
61 changes: 37 additions & 24 deletions core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ impl EthNamespace {
const METHOD_NAME: &str = "get_filter_logs";

let method_latency = API_METRICS.start_call(METHOD_NAME);
// We clone the filter to not hold the filter lock for an extended period of time.
let maybe_filter = self
let installed_filters = self
.state
.installed_filters
.lock()
.await
.get_and_update_stats(idx);
.as_ref()
.ok_or(Web3Error::NotImplemented)?;
// We clone the filter to not hold the filter lock for an extended period of time.
let maybe_filter = installed_filters.lock().await.get_and_update_stats(idx);

let Some(TypedFilter::Events(filter, _)) = maybe_filter else {
return Err(Web3Error::FilterNotFound);
Expand Down Expand Up @@ -583,6 +583,11 @@ impl EthNamespace {
const METHOD_NAME: &str = "new_block_filter";

let method_latency = API_METRICS.start_call(METHOD_NAME);
let installed_filters = self
.state
.installed_filters
.as_ref()
.ok_or(Web3Error::NotImplemented)?;
let mut storage = self
.state
.connection_pool
Expand All @@ -598,9 +603,7 @@ impl EthNamespace {
let next_block_number = last_block_number + 1;
drop(storage);

let idx = self
.state
.installed_filters
let idx = installed_filters
.lock()
.await
.add(TypedFilter::Blocks(next_block_number));
Expand All @@ -613,6 +616,11 @@ impl EthNamespace {
const METHOD_NAME: &str = "new_filter";

let method_latency = API_METRICS.start_call(METHOD_NAME);
let installed_filters = self
.state
.installed_filters
.as_ref()
.ok_or(Web3Error::NotImplemented)?;
if let Some(topics) = filter.topics.as_ref() {
if topics.len() > EVENT_TOPIC_NUMBER_LIMIT {
return Err(Web3Error::TooManyTopics);
Expand All @@ -621,9 +629,7 @@ impl EthNamespace {

self.state.resolve_filter_block_hash(&mut filter).await?;
let from_block = self.state.get_filter_from_block(&filter).await?;
let idx = self
.state
.installed_filters
let idx = installed_filters
.lock()
.await
.add(TypedFilter::Events(filter, from_block));
Expand All @@ -632,47 +638,49 @@ impl EthNamespace {
}

#[tracing::instrument(skip(self))]
pub async fn new_pending_transaction_filter_impl(&self) -> U256 {
pub async fn new_pending_transaction_filter_impl(&self) -> Result<U256, Web3Error> {
const METHOD_NAME: &str = "new_pending_transaction_filter";

let method_latency = API_METRICS.start_call(METHOD_NAME);
let idx = self
let installed_filters = self
.state
.installed_filters
.as_ref()
.ok_or(Web3Error::NotImplemented)?;
let idx = installed_filters
.lock()
.await
.add(TypedFilter::PendingTransactions(
chrono::Utc::now().naive_utc(),
));
method_latency.observe();
idx
Ok(idx)
}

#[tracing::instrument(skip(self))]
pub async fn get_filter_changes_impl(&self, idx: U256) -> Result<FilterChanges, Web3Error> {
const METHOD_NAME: &str = "get_filter_changes";

let method_latency = API_METRICS.start_call(METHOD_NAME);
let mut filter = self
let installed_filters = self
.state
.installed_filters
.as_ref()
.ok_or(Web3Error::NotImplemented)?;
let mut filter = installed_filters
.lock()
.await
.get_and_update_stats(idx)
.ok_or(Web3Error::FilterNotFound)?;

let result = match self.filter_changes(&mut filter).await {
Ok(changes) => {
self.state
.installed_filters
.lock()
.await
.update(idx, filter);
installed_filters.lock().await.update(idx, filter);
Ok(changes)
}
Err(Web3Error::LogsLimitExceeded(..)) => {
// The filter was not being polled for a long time, so we remove it.
self.state.installed_filters.lock().await.remove(idx);
installed_filters.lock().await.remove(idx);
Err(Web3Error::FilterNotFound)
}
Err(err) => Err(err),
Expand All @@ -682,13 +690,18 @@ impl EthNamespace {
}

#[tracing::instrument(skip(self))]
pub async fn uninstall_filter_impl(&self, idx: U256) -> bool {
pub async fn uninstall_filter_impl(&self, idx: U256) -> Result<bool, Web3Error> {
const METHOD_NAME: &str = "uninstall_filter";

let method_latency = API_METRICS.start_call(METHOD_NAME);
let removed = self.state.installed_filters.lock().await.remove(idx);
let installed_filters = self
.state
.installed_filters
.as_ref()
.ok_or(Web3Error::NotImplemented)?;
let removed = installed_filters.lock().await.remove(idx);
method_latency.observe();
removed
Ok(removed)
}

#[tracing::instrument(skip(self))]
Expand Down
4 changes: 3 additions & 1 deletion core/lib/zksync_core/src/api_server/web3/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct InternalApiConfig {
pub l2_testnet_paymaster_addr: Option<Address>,
pub req_entities_limit: usize,
pub fee_history_limit: u64,
pub filters_disabled: bool,
}

impl InternalApiConfig {
Expand All @@ -112,6 +113,7 @@ impl InternalApiConfig {
l2_testnet_paymaster_addr: contracts_config.l2_testnet_paymaster_addr,
req_entities_limit: web3_config.req_entities_limit(),
fee_history_limit: web3_config.fee_history_limit(),
filters_disabled: web3_config.filters_disabled,
}
}
}
Expand Down Expand Up @@ -194,7 +196,7 @@ impl SealedMiniblockNumber {
/// Holder for the data required for the API to be functional.
#[derive(Debug, Clone)]
pub struct RpcState {
pub(crate) installed_filters: Arc<Mutex<Filters>>,
pub(crate) installed_filters: Option<Arc<Mutex<Filters>>>,
pub connection_pool: ConnectionPool,
pub tree_api: Option<TreeApiHttpClient>,
pub tx_sender: TxSender,
Expand Down
40 changes: 40 additions & 0 deletions core/lib/zksync_core/src/api_server/web3/tests/filters.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Tests for filter-related methods in the `eth` namespace.
use std::fmt::Debug;

use jsonrpsee::{core::client::Error, types::error::ErrorCode};
use zksync_web3_decl::{jsonrpsee::core::ClientError as RpcError, types::FilterChanges};

use super::*;
Expand Down Expand Up @@ -263,3 +266,40 @@ impl HttpTest for LogFilterChangesWithBlockBoundariesTest {
async fn log_filter_changes_with_block_boundaries() {
test_http_server(LogFilterChangesWithBlockBoundariesTest).await;
}

fn assert_not_implemented<T: Debug>(result: Result<T, Error>) {
assert_matches!(result, Err(Error::Call(e)) => {
assert_eq!(e.code(), ErrorCode::InternalError.code());
assert_eq!(e.message(), "Not implemented");
});
}

#[derive(Debug)]
struct DisableFiltersTest;

#[async_trait]
impl HttpTest for DisableFiltersTest {
async fn test(&self, client: &HttpClient, _pool: &ConnectionPool) -> anyhow::Result<()> {
let filter = Filter {
from_block: Some(api::BlockNumber::Number(2.into())),
..Filter::default()
};
assert_not_implemented(client.new_filter(filter).await);
assert_not_implemented(client.new_block_filter().await);
assert_not_implemented(client.uninstall_filter(1.into()).await);
assert_not_implemented(client.new_pending_transaction_filter().await);
assert_not_implemented(client.get_filter_logs(1.into()).await);
assert_not_implemented(client.get_filter_changes(1.into()).await);

Ok(())
}

fn filters_disabled(&self) -> bool {
true
}
}

#[tokio::test]
async fn disable_filters() {
test_http_server(DisableFiltersTest).await;
}
Loading

0 comments on commit b486d7e

Please sign in to comment.