Skip to content

Commit

Permalink
Merge #1258
Browse files Browse the repository at this point in the history
1258: Move out `ConnectionPool` from the `TokenDBCache` r=popzxc a=alekseysidorov

We should carefully check that this changes do not lead to deadlocks.

Co-authored-by: Aleksei Sidorov <[email protected]>
Co-authored-by: Aleksey Sidorov <[email protected]>
  • Loading branch information
3 people authored Dec 24, 2020
2 parents b35bf03 + d3b9d94 commit b684dae
Show file tree
Hide file tree
Showing 15 changed files with 171 additions and 103 deletions.
10 changes: 5 additions & 5 deletions core/bin/zksync_api/src/api_server/event_notify/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ pub struct NotifierState {

impl NotifierState {
pub fn new(cache_capacity: usize, db_pool: ConnectionPool) -> Self {
let tokens_cache = TokenDBCache::new(db_pool.clone());

Self {
cache_of_executed_priority_operations: LruCache::new(cache_capacity),
cache_of_transaction_receipts: LruCache::new(cache_capacity),
cache_of_blocks_info: LruCache::new(cache_capacity),
tokens_cache,
tokens_cache: TokenDBCache::new(),
db_pool,
}
}
Expand Down Expand Up @@ -158,7 +156,9 @@ impl NotifierState {
action: ActionType,
) -> anyhow::Result<(AccountId, ResponseAccountState)> {
let start = Instant::now();

let mut storage = self.db_pool.access_storage().await?;

let account_state = storage
.chain()
.account_schema()
Expand All @@ -177,7 +177,7 @@ impl NotifierState {
}
.map(|(_, a)| a)
{
ResponseAccountState::try_restore(account, &self.tokens_cache).await?
ResponseAccountState::try_restore(&mut storage, &self.tokens_cache, account).await?
} else {
ResponseAccountState::default()
};
Expand Down Expand Up @@ -212,7 +212,7 @@ impl NotifierState {
};

let account = if let Some(account) = stored_account {
ResponseAccountState::try_restore(account, &self.tokens_cache)
ResponseAccountState::try_restore(&mut storage, &self.tokens_cache, account)
.await
.ok()
} else {
Expand Down
20 changes: 11 additions & 9 deletions core/bin/zksync_api/src/api_server/rest/v1/accounts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use actix_web::{

// Workspace uses
use zksync_config::ConfigurationOptions;
use zksync_storage::{QueryResult, StorageProcessor};
use zksync_storage::{ConnectionPool, QueryResult, StorageProcessor};
use zksync_types::{AccountId, Address, BlockNumber, TokenId};

// Local uses
Expand Down Expand Up @@ -45,26 +45,29 @@ fn parse_account_query(query: String) -> Result<AccountQuery, ApiError> {
/// Shared data between `api/v1/accounts` endpoints.
#[derive(Clone)]
struct ApiAccountsData {
pool: ConnectionPool,
tokens: TokenDBCache,
core_api_client: CoreApiClient,
confirmations_for_eth_event: BlockNumber,
}

impl ApiAccountsData {
fn new(
pool: ConnectionPool,
tokens: TokenDBCache,
core_api_client: CoreApiClient,
confirmations_for_eth_event: BlockNumber,
) -> Self {
Self {
pool,
tokens,
core_api_client,
confirmations_for_eth_event,
}
}

async fn access_storage(&self) -> QueryResult<StorageProcessor<'_>> {
self.tokens.pool.access_storage().await.map_err(From::from)
self.pool.access_storage().await.map_err(From::from)
}

async fn find_account_address(&self, query: String) -> Result<Address, ApiError> {
Expand Down Expand Up @@ -122,20 +125,16 @@ impl ApiAccountsData {
.account_state_by_id(account_id)
.await?;

// Drop storage access to avoid deadlocks.
// TODO Rewrite `TokensDBCache` logic to make such errors impossible. ZKS-169
drop(storage);

let (account_id, account) = if let Some(state) = account_state.committed {
state
} else {
// This account has not been committed.
return Ok(None);
};

let committed = AccountState::from_storage(&account, &self.tokens).await?;
let committed = AccountState::from_storage(&mut storage, &self.tokens, &account).await?;
let verified = match account_state.verified {
Some(state) => AccountState::from_storage(&state.1, &self.tokens).await?,
Some(state) => AccountState::from_storage(&mut storage, &self.tokens, &state.1).await?,
None => AccountState::default(),
};

Expand All @@ -146,9 +145,10 @@ impl ApiAccountsData {
.await?;

DepositingBalances::from_pending_ops(
&mut storage,
&self.tokens,
ongoing_ops,
self.confirmations_for_eth_event,
&self.tokens,
)
.await?
};
Expand Down Expand Up @@ -289,10 +289,12 @@ async fn account_pending_receipts(

pub fn api_scope(
env_options: &ConfigurationOptions,
pool: ConnectionPool,
tokens: TokenDBCache,
core_api_client: CoreApiClient,
) -> Scope {
let data = ApiAccountsData::new(
pool,
tokens,
core_api_client,
env_options.confirmations_for_eth_event as BlockNumber,
Expand Down
3 changes: 2 additions & 1 deletion core/bin/zksync_api/src/api_server/rest/v1/accounts/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl TestServer {
let (api_client, api_server) = cfg.start_server(move |cfg| {
api_scope(
&cfg.env_options,
TokenDBCache::new(cfg.pool.clone()),
cfg.pool.clone(),
TokenDBCache::new(),
core_client.clone(),
)
});
Expand Down
12 changes: 7 additions & 5 deletions core/bin/zksync_api/src/api_server/rest/v1/accounts/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use zksync_storage::{
records::{AccountOpReceiptResponse, AccountTxReceiptResponse},
SearchDirection as StorageSearchDirection,
},
QueryResult, MAX_BLOCK_NUMBER,
QueryResult, StorageProcessor, MAX_BLOCK_NUMBER,
};
use zksync_types::{
tx::TxHash, Account, AccountId, Address, BlockNumber, Nonce, PriorityOp, PubKeyHash, H256,
Expand Down Expand Up @@ -222,13 +222,14 @@ impl FromStr for SearchDirection {

impl AccountState {
pub(crate) async fn from_storage(
account: &Account,
storage: &mut StorageProcessor<'_>,
tokens: &TokenDBCache,
account: &Account,
) -> QueryResult<Self> {
let mut balances = BTreeMap::new();
for (token_id, balance) in account.get_nonzero_balances() {
let token_symbol = tokens
.token_symbol(token_id)
.token_symbol(storage, token_id)
.await?
.ok_or_else(|| unable_to_find_token(token_id))?;

Expand All @@ -254,9 +255,10 @@ impl From<SearchDirection> for StorageSearchDirection {

impl DepositingBalances {
pub(crate) async fn from_pending_ops(
storage: &mut StorageProcessor<'_>,
tokens: &TokenDBCache,
ongoing_ops: Vec<PriorityOp>,
confirmations_for_eth_event: BlockNumber,
tokens: &TokenDBCache,
) -> QueryResult<Self> {
let mut balances = BTreeMap::new();

Expand All @@ -270,7 +272,7 @@ impl DepositingBalances {
};

let token_symbol = tokens
.token_symbol(token_id)
.token_symbol(storage, token_id)
.await?
.ok_or_else(|| unable_to_find_token(token_id))?;

Expand Down
2 changes: 2 additions & 0 deletions core/bin/zksync_api/src/api_server/rest/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub(crate) fn api_scope(
web::scope("/api/v1")
.service(accounts::api_scope(
&env_options,
tx_sender.pool.clone(),
tx_sender.tokens.clone(),
tx_sender.core_api_client.clone(),
))
Expand All @@ -58,6 +59,7 @@ pub(crate) fn api_scope(
.service(operations::api_scope(tx_sender.pool.clone()))
.service(search::api_scope(tx_sender.pool.clone()))
.service(tokens::api_scope(
tx_sender.pool.clone(),
tx_sender.tokens,
tx_sender.ticker_requests,
))
Expand Down
33 changes: 24 additions & 9 deletions core/bin/zksync_api/src/api_server/rest/v1/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use zksync_storage::QueryResult;
use zksync_storage::{ConnectionPool, QueryResult};
use zksync_types::{Token, TokenLike};

// Local uses
Expand All @@ -33,15 +33,24 @@ use crate::{
struct ApiTokensData {
fee_ticker: mpsc::Sender<TickerRequest>,
tokens: TokenDBCache,
pool: ConnectionPool,
}

impl ApiTokensData {
fn new(tokens: TokenDBCache, fee_ticker: mpsc::Sender<TickerRequest>) -> Self {
Self { tokens, fee_ticker }
fn new(
pool: ConnectionPool,
tokens: TokenDBCache,
fee_ticker: mpsc::Sender<TickerRequest>,
) -> Self {
Self {
pool,
tokens,
fee_ticker,
}
}

async fn tokens(&self) -> QueryResult<Vec<Token>> {
let mut storage = self.tokens.pool.access_storage().await?;
let mut storage = self.pool.access_storage().await?;

let tokens = storage.tokens_schema().load_tokens().await?;

Expand All @@ -53,7 +62,9 @@ impl ApiTokensData {
}

async fn token(&self, token_like: TokenLike) -> QueryResult<Option<Token>> {
self.tokens.get_token(token_like).await
let mut storage = self.pool.access_storage().await?;

self.tokens.get_token(&mut storage, token_like).await
}

async fn token_price_usd(&self, token: TokenLike) -> QueryResult<Option<BigDecimal>> {
Expand Down Expand Up @@ -163,8 +174,12 @@ async fn token_price(
Ok(Json(price))
}

pub fn api_scope(tokens_db: TokenDBCache, fee_ticker: mpsc::Sender<TickerRequest>) -> Scope {
let data = ApiTokensData::new(tokens_db, fee_ticker);
pub fn api_scope(
pool: ConnectionPool,
tokens_db: TokenDBCache,
fee_ticker: mpsc::Sender<TickerRequest>,
) -> Scope {
let data = ApiTokensData::new(pool, tokens_db, fee_ticker);

web::scope("tokens")
.data(data)
Expand Down Expand Up @@ -234,7 +249,7 @@ mod tests {
let fee_ticker = dummy_fee_ticker(&prices);

let (client, server) = cfg.start_server(move |cfg| {
api_scope(TokenDBCache::new(cfg.pool.clone()), fee_ticker.clone())
api_scope(cfg.pool.clone(), TokenDBCache::new(), fee_ticker.clone())
});

// Fee requests
Expand Down Expand Up @@ -325,7 +340,7 @@ mod tests {

let fee_ticker = dummy_fee_ticker(&[]);
let (client, server) = cfg.start_server(move |cfg| {
api_scope(TokenDBCache::new(cfg.pool.clone()), fee_ticker.clone())
api_scope(cfg.pool.clone(), TokenDBCache::new(), fee_ticker.clone())
});

// Get Golem token as GNT.
Expand Down
16 changes: 12 additions & 4 deletions core/bin/zksync_api/src/api_server/rpc_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,21 @@ impl RpcApp {

if let Some((account_id, committed_state)) = account_info.committed {
result.account_id = Some(account_id);
result.committed =
ResponseAccountState::try_restore(committed_state, &self.tx_sender.tokens).await?;
result.committed = ResponseAccountState::try_restore(
&mut storage,
&self.tx_sender.tokens,
committed_state,
)
.await?;
};

if let Some((_, verified_state)) = account_info.verified {
result.verified =
ResponseAccountState::try_restore(verified_state, &self.tx_sender.tokens).await?;
result.verified = ResponseAccountState::try_restore(
&mut storage,
&self.tx_sender.tokens,
verified_state,
)
.await?;
};

metrics::histogram!("api.rpc.get_account_state", start.elapsed());
Expand Down
9 changes: 6 additions & 3 deletions core/bin/zksync_api/src/api_server/rpc_server/rpc_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ impl RpcApp {
let account_state = self.get_account_state(address).await?;

let depositing_ops = self.get_ongoing_deposits_impl(address).await?;
let depositing =
DepositingAccountBalances::from_pending_ops(depositing_ops, &self.tx_sender.tokens)
.await?;
let depositing = DepositingAccountBalances::from_pending_ops(
&mut self.access_storage().await?,
&self.tx_sender.tokens,
depositing_ops,
)
.await?;

log::trace!(
"account_info: address {}, total request processing {}ms",
Expand Down
14 changes: 10 additions & 4 deletions core/bin/zksync_api/src/api_server/rpc_server/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use jsonrpc_core::{Error, Result};
use num::{BigUint, ToPrimitive};
use serde::{Deserialize, Serialize};
use zksync_storage::StorageProcessor;
// Workspace uses
use zksync_types::{
tx::TxEthSignature, Account, AccountId, Address, Nonce, PriorityOp, PubKeyHash,
Expand All @@ -28,8 +29,12 @@ pub struct ResponseAccountState {
}

impl ResponseAccountState {
pub async fn try_restore(account: Account, tokens: &TokenDBCache) -> Result<Self> {
let inner = AccountState::from_storage(&account, tokens)
pub async fn try_restore(
storage: &mut StorageProcessor<'_>,
tokens: &TokenDBCache,
account: Account,
) -> Result<Self> {
let inner = AccountState::from_storage(storage, tokens, &account)
.await
.map_err(|_| Error::internal_error())?;

Expand Down Expand Up @@ -68,8 +73,9 @@ pub struct DepositingAccountBalances {

impl DepositingAccountBalances {
pub async fn from_pending_ops(
pending_ops: OngoingDepositsResp,
storage: &mut StorageProcessor<'_>,
tokens: &TokenDBCache,
pending_ops: OngoingDepositsResp,
) -> Result<Self> {
let mut balances = HashMap::new();

Expand All @@ -78,7 +84,7 @@ impl DepositingAccountBalances {
"ETH".to_string()
} else {
tokens
.get_token(op.token_id)
.get_token(storage, op.token_id)
.await
.map_err(|_| Error::internal_error())?
.ok_or_else(Error::internal_error)?
Expand Down
12 changes: 9 additions & 3 deletions core/bin/zksync_api/src/api_server/tx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ impl TxSender {

Self {
core_api_client,
pool: connection_pool.clone(),
pool: connection_pool,
sign_verify_requests: sign_verify_request_sender,
ticker_requests: ticker_request_sender,
tokens: TokenDBCache::new(connection_pool),
tokens: TokenDBCache::new(),

enforce_pubkey_change_fee,
forced_exit_minimum_account_age,
Expand Down Expand Up @@ -415,8 +415,14 @@ impl TxSender {
}

async fn token_info_from_id(&self, token_id: TokenId) -> Result<Token, SubmitError> {
let mut storage = self
.pool
.access_storage()
.await
.map_err(SubmitError::internal)?;

self.tokens
.get_token(token_id)
.get_token(&mut storage, token_id)
.await
.map_err(SubmitError::internal)?
// TODO Make error more clean
Expand Down
Loading

0 comments on commit b684dae

Please sign in to comment.