[faucet] Improving observability and throughput (MystenLabs#7091)
A number of changes to improve the quality of metrics we receive from
the faucet, and also reduce bottlenecking at the core of the service, to
achieve a higher transaction throughput:

### Fixes to `SimpleFaucet::prepare_gas_coin`

This function had a confusing control-flow that hid a few bugs:

- The consumer lock was held for its entire body, when it was only
needed at the beginning (to receive a Coin ID). Fixed by factoring out
the part of the function that needs to hold that consumer lock into
`pop_gas_coin`.
- If the consumer lock timed out, the function produced an error that
was never used. Fixed by deleting this unused error message.
- The rest of the body is contained within an unconditional loop which
always runs exactly once. Fixed by removing the loop and simplifying the
control-flow.
- The control-flow path that discards a coin because it has insufficient
balance did not update the `total_discarded_coins` metric. Fixed by
moving the logic for updating this metric to `transfer_gases` which is
responsible for handling the return from `prepare_gas_coin` -- this is
also the function that is responsible for recycling coins that should be
kept.
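
To illustrate the lock-scoping fix, here is a minimal synchronous sketch using only the standard library. It is not the faucet's code: `Pool`, `Outcome`, `pop_coin`, `prepare_coin`, `balance_of` and `demo_pool` are hypothetical names, `std::sync::Mutex` and `std::sync::mpsc` stand in for the async tokio primitives, and the 50ms timeout is arbitrary. The only point it tries to make is that the guard is released before the (potentially slow) validation step runs.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::Mutex;
use std::time::Duration;

// Arbitrary timeout chosen for the sketch (assumption, not the faucet's value).
const RECV_TIMEOUT: Duration = Duration::from_millis(50);

// Hypothetical, simplified analogue of `GasCoinResponse`.
#[derive(Debug, PartialEq)]
enum Outcome {
    Valid(u64),
    InsufficientBalance(u64),
    NoCoinAvailable,
}

// Hypothetical stand-in for the faucet's queue of candidate coin IDs.
struct Pool {
    consumer: Mutex<Receiver<u64>>,
}

impl Pool {
    /// Holds the consumer lock only for as long as it takes to pop one ID.
    fn pop_coin(&self) -> Option<u64> {
        let consumer = self.consumer.lock().ok()?;
        let coin = consumer.recv_timeout(RECV_TIMEOUT).ok();
        drop(consumer); // release the lock before any further work
        coin
    }

    /// Validates the popped coin without holding the lock.
    fn prepare_coin(&self, balance_of: impl Fn(u64) -> u64, needed: u64) -> Outcome {
        let Some(id) = self.pop_coin() else {
            return Outcome::NoCoinAvailable;
        };
        // The lock is free here, so other callers can pop concurrently.
        if balance_of(id) >= needed {
            Outcome::Valid(id)
        } else {
            Outcome::InsufficientBalance(id)
        }
    }
}

/// Pops three times from a queue holding two coins.
fn demo_pool() -> Vec<Outcome> {
    let (producer, consumer) = mpsc::channel();
    let pool = Pool { consumer: Mutex::new(consumer) };
    producer.send(1).unwrap();
    producer.send(2).unwrap();
    // Made-up balances: coin 1 is rich enough, coin 2 is not.
    let balance_of = |id: u64| if id == 1 { 5_000u64 } else { 10u64 };
    vec![
        pool.prepare_coin(balance_of, 2_000),
        pool.prepare_coin(balance_of, 2_000),
        pool.prepare_coin(balance_of, 2_000), // queue is empty: times out
    ]
}
```

Calling `demo_pool()` in this sketch yields one valid coin, one coin with insufficient balance, and one timeout, mirroring three of the `GasCoinResponse` variants.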

### Standardise format for Coin IDs in log messages

- Output them as a `coin_id` label that can be parsed and queried
against, rather than inline in the message.

### Improvements to RequestMetrics middleware

- Detect and count disconnects, printing their elapsed time in the logs.
- Treat non-successful (non-2xx) HTTP responses as failed requests.
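
The disconnect detection relies on a drop guard: if the guard is dropped before an outcome was recorded, the request future was presumably cancelled. Below is a rough standard-library sketch of that idea. `Counters`, `Guard`, `finish` and `demo_guard` are hypothetical names, atomics replace the Prometheus counters, and an `Option<Instant>` replaces the `HistogramTimer`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

// Hypothetical stand-in for the Prometheus-backed `RequestMetrics`.
#[derive(Default)]
struct Counters {
    succeeded: AtomicU64,
    failed: AtomicU64,
    disconnected: AtomicU64,
    in_flight: AtomicU64,
}

// Hypothetical analogue of `MetricsGuard`; `started` is `Some` until an
// outcome has been recorded.
struct Guard {
    started: Option<Instant>,
    counters: Arc<Counters>,
}

impl Guard {
    fn new(counters: Arc<Counters>) -> Self {
        counters.in_flight.fetch_add(1, Ordering::Relaxed);
        Guard { started: Some(Instant::now()), counters }
    }

    fn succeeded(mut self) {
        self.started = None; // outcome recorded, so drop will not count a disconnect
        self.counters.succeeded.fetch_add(1, Ordering::Relaxed);
    }

    fn failed(mut self) {
        self.started = None;
        self.counters.failed.fetch_add(1, Ordering::Relaxed);
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.counters.in_flight.fetch_sub(1, Ordering::Relaxed);
        // Still "running" at drop time: no outcome was recorded.
        if self.started.take().is_some() {
            self.counters.disconnected.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Classifies a finished request: only 2xx status codes count as success.
fn finish(guard: Guard, status: u16) {
    if (200..300).contains(&status) {
        guard.succeeded()
    } else {
        guard.failed()
    }
}

/// Returns (succeeded, failed, disconnected, in_flight) after three requests.
fn demo_guard() -> (u64, u64, u64, u64) {
    let counters = Arc::new(Counters::default());
    finish(Guard::new(counters.clone()), 200);
    finish(Guard::new(counters.clone()), 429);
    drop(Guard::new(counters.clone())); // simulates a cancelled request
    (
        counters.succeeded.load(Ordering::Relaxed),
        counters.failed.load(Ordering::Relaxed),
        counters.disconnected.load(Ordering::Relaxed),
        counters.in_flight.load(Ordering::Relaxed),
    )
}
```

Because the in-flight decrement lives in `Drop`, it runs on every path, including cancellation, which is why no separate scope guard is needed in this sketch.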

## Test Plan:

Unit tests:
```
sui$ cargo nextest run
```

Test the various new features on a local instance of the faucet, by:

```
 # Starting validators
sui$ cargo build
$ rm -rf ~/.sui                        \
  && ~/sui/target/debug/sui genesis -f \
  && ~/sui/target/debug/sui start

 # Starting faucet (new terminal session)
$ ~/sui/target/debug/sui-faucet \
  --request-buffer-size 2       \
  --max-request-per-second 1

 # Sending a faucet request (excuse fish shell syntax)
$ set -l ADDRESS (yq -r .active_address ~/.sui/sui_config/client.yaml | sed 's/0x//' | string upper)
  curl --location --request POST 'http://127.0.0.1:5003/gas' \
      --header 'Content-Type: application/json'              \
      --data-raw "{ \"FixedAmountRequest\": { \"recipient\": \"$ADDRESS\" } }"
```

- Test detecting request failure by replacing the body of
`SimpleFaucet::pop_gas_coin` with `None`
- Test load shedding by running multiple faucet requests in `while`
loops (three is sufficient)
- Test disconnection metrics by adding a sleep command within
`SimpleFaucet::prepare_gas_coin` and interrupting the `curl` command
before it finishes.
amnn authored Jan 3, 2023
1 parent c824243 commit e0a5e0b
Showing 4 changed files with 142 additions and 134 deletions.
2 changes: 1 addition & 1 deletion crates/sui-faucet/src/errors.rs
@@ -19,7 +19,7 @@ pub enum FaucetError {
#[error("Gas coin `{0}` is not valid and has been removed from gas coin pool")]
InvalidGasCoin(String),

#[error("No gas coin available in the gas coin pool")]
#[error("Timed out waiting for a coin from the gas coin pool")]
NoGasCoinAvailable,

#[error("Wallet Error: `{0}`")]
181 changes: 77 additions & 104 deletions crates/sui-faucet/src/faucet/simple_faucet.rs
@@ -50,8 +50,8 @@ enum GasCoinResponse {

const DEFAULT_GAS_BUDGET: u64 = 1000;
const PAY_SUI_GAS: u64 = 1000;
const WAIT_FOR_GAS_TIMEOUT_SEC: u64 = 5;
const WAIT_FOR_LOCK_TIMEOUT_SEC: u64 = 10;
const LOCK_TIMEOUT: Duration = Duration::from_secs(10);
const RECV_TIMEOUT: Duration = Duration::from_secs(5);

impl SimpleFaucet {
pub async fn new(
@@ -96,104 +96,70 @@ impl SimpleFaucet {
})
}

/// Take the consumer lock and pull a Coin ID from the queue, without checking whether it is
/// valid or not.
async fn pop_gas_coin(&self, uuid: Uuid) -> Option<ObjectID> {
// If the gas candidate queue is exhausted, the request will be suspended indefinitely until
// a producer puts in more candidate gas objects. At the same time, other requests will be
// blocked by the lock acquisition as well.
let Ok(mut consumer) = tokio::time::timeout(LOCK_TIMEOUT, self.consumer.lock()).await else {
error!(?uuid, "Timeout when getting consumer lock");
return None;
};

info!(?uuid, "Got consumer lock, pulling coins.");
let Ok(coin) = tokio::time::timeout(RECV_TIMEOUT, consumer.recv()).await else {
error!(?uuid, "Timeout when getting gas coin from the queue");
return None;
};

let Some(coin) = coin else {
unreachable!("channel is closed");
};

Some(coin)
}

/// Pulls a coin from the queue and makes sure it is fit for use (belongs to the faucet, has
/// sufficient balance).
async fn prepare_gas_coin(&self, total_amount: u64, uuid: Uuid) -> GasCoinResponse {
// If the gas candidate queue is exhausted, the request will be
// suspended indefinitely until a producer puts in more candidate
// gas objects. At the same time, other requests will be blocked by the
// lock acquisition as well.
let consumer_res = match tokio::time::timeout(
Duration::from_secs(WAIT_FOR_LOCK_TIMEOUT_SEC),
self.consumer.lock(),
)
.await
{
Ok(guard) => Ok(guard),
Err(_) => {
error!(?uuid, "Timeout when getting consumer lock");
Err(anyhow::anyhow!("Too many concurrent requests, try later!"))
}
let Some(coin_id) = self.pop_gas_coin(uuid).await else {
warn!("Failed getting gas coin, try later!");
return GasCoinResponse::NoGasCoinAvailable;
};
if let Ok(mut consumer) = consumer_res {
info!(?uuid, "Got consumer lock, pulling coins.");
loop {
match tokio::time::timeout(
Duration::from_secs(WAIT_FOR_GAS_TIMEOUT_SEC),
consumer.recv(),
)
.await
{
Ok(Some(coin)) => {
info!(?uuid, "Pulling coin from pool {:?}", coin);
self.metrics.total_available_coins.dec();
match self.get_gas_coin(coin).await {
Ok(Some(gas_coin))
if gas_coin.value() >= total_amount + PAY_SUI_GAS =>
{
info!(
?uuid,
"Planning to use coin from pool {:?}, current balance: {}",
coin,
gas_coin.value()
);
return GasCoinResponse::ValidGasCoin(coin);
}
Ok(Some(_)) => {
warn!(
?uuid,
"Coin {:?} does not have sufficient balance, removing from pool", coin
);
return GasCoinResponse::GasCoinWithInsufficientBalance(coin);
}
Ok(None) => {
// Invalid gas, meaning that the coin does not exist,
// or does not belong to the current active address.
self.metrics.total_discarded_coins.inc();
warn!(?uuid, "Coin {:?} is not valid, removing from pool", coin);
return GasCoinResponse::InvalidGasCoin(coin);
}
Err(e) => {
error!(
?uuid,
"Cannot read gas coin object {:?} from Fullnode with err: {:?}",
coin,
e
);
return GasCoinResponse::UnknownGasCoin(coin);
}
}
}
Ok(None) => {
unreachable!("channel is closed");
}
Err(_) => {
error!(?uuid, "Timeout when getting gas coin from the queue");
break;
}
}

match self.get_gas_coin(coin_id).await {
Ok(Some(gas_coin)) if gas_coin.value() >= total_amount + PAY_SUI_GAS => {
info!(?uuid, ?coin_id, "balance: {}", gas_coin.value());
GasCoinResponse::ValidGasCoin(coin_id)
}

Ok(Some(_)) => GasCoinResponse::GasCoinWithInsufficientBalance(coin_id),

Ok(None) => GasCoinResponse::InvalidGasCoin(coin_id),

Err(e) => {
error!(?uuid, ?coin_id, "Fullnode read error: {e:?}");
GasCoinResponse::UnknownGasCoin(coin_id)
}
}
warn!("Failed getting gas coin, try later!");
GasCoinResponse::NoGasCoinAvailable
}

/// Check if the gas coin is still valid. A valid gas coin is
/// 1. existent presently
/// 2. is a GasCoin
/// 3. still belongs to facuet account
/// Check if the gas coin is still valid. A valid gas coin
/// 1. Exists presently
/// 2. Belongs to the faucet account
/// 3. is a GasCoin
/// If the coin is valid, return Ok(Some(GasCoin))
/// If the coin invalid, return Ok(None)
/// If the fullnode returns an unexpected error, returns Err(e)
async fn get_gas_coin(&self, coin_id: ObjectID) -> anyhow::Result<Option<GasCoin>> {
let client = self.wallet.get_client().await?;
let gas_obj = client.read_api().get_parsed_object(coin_id).await?;
Ok(match gas_obj {
SuiObjectRead::NotExists(_) | SuiObjectRead::Deleted(_) => None,
SuiObjectRead::Exists(obj) => match &obj.owner {
Owner::AddressOwner(owner_addr) => {
if owner_addr == &self.active_address {
GasCoin::try_from(&obj).ok()
} else {
None
}
Owner::AddressOwner(owner_addr) if owner_addr == &self.active_address => {
GasCoin::try_from(&obj).ok()
}
_ => None,
},
@@ -212,50 +178,57 @@ impl SimpleFaucet {
let gas_coin_response = self.prepare_gas_coin(total_amount, uuid).await;

match gas_coin_response {
GasCoinResponse::ValidGasCoin(gas_coin) => {
GasCoinResponse::ValidGasCoin(coin_id) => {
let result = self
.execute_pay_sui_txn_with_retrials(
gas_coin,
coin_id,
self.active_address,
recipient,
amounts,
DEFAULT_GAS_BUDGET,
uuid,
)
.await;
self.recycle_gas_coin(gas_coin, uuid).await;
self.recycle_gas_coin(coin_id, uuid).await;
self.check_and_map_transfer_gas_result(result, number_of_coins, &recipient)
.await
}
GasCoinResponse::UnknownGasCoin(gas_coin) => {
self.recycle_gas_coin(gas_coin, uuid).await;

GasCoinResponse::UnknownGasCoin(coin_id) => {
self.recycle_gas_coin(coin_id, uuid).await;
Err(FaucetError::FullnodeReadingError)
}
GasCoinResponse::GasCoinWithInsufficientBalance(gas_coin) => Err(
FaucetError::GasCoinWithInsufficientBalance(gas_coin.to_hex_literal()),
),
GasCoinResponse::InvalidGasCoin(gas_coin) => {
Err(FaucetError::InvalidGasCoin(gas_coin.to_hex_literal()))

GasCoinResponse::GasCoinWithInsufficientBalance(coin_id) => {
warn!(?uuid, ?coin_id, "Insufficient balance, removing from pool");
self.metrics.total_discarded_coins.inc();
Err(FaucetError::GasCoinWithInsufficientBalance(
coin_id.to_hex_literal(),
))
}

GasCoinResponse::InvalidGasCoin(coin_id) => {
// The coin does not exist, or does not belong to the current active address.
warn!(?uuid, ?coin_id, "Invalid, removing from pool");
self.metrics.total_discarded_coins.inc();
Err(FaucetError::InvalidGasCoin(coin_id.to_hex_literal()))
}

GasCoinResponse::NoGasCoinAvailable => Err(FaucetError::NoGasCoinAvailable),
}
}

async fn recycle_gas_coin(&self, gas_coin: ObjectID, uuid: Uuid) {
async fn recycle_gas_coin(&self, coin_id: ObjectID, uuid: Uuid) {
// Once transactions are done, in despite of success or failure,
// we put back the coins. The producer should never wait indefinitely,
// in that the channel is initialized with big enough capacity.
let producer = self.producer.lock().await;
info!(
?uuid,
"Got producer lock and recycling gas coin {:?}.", gas_coin
);
info!(?uuid, ?coin_id, "Got producer lock and recycling coin");
producer
.try_send(gas_coin)
.try_send(coin_id)
.expect("unexpected - queue is large enough to hold all coins");
self.metrics.total_available_coins.inc();
info!(?uuid, "Recycled coin {:?}", gas_coin);
drop(producer);
info!(?uuid, ?coin_id, "Recycled coin");
}

async fn execute_pay_sui_txn_with_retrials(
8 changes: 8 additions & 0 deletions crates/sui-faucet/src/metrics.rs
@@ -16,6 +16,7 @@ pub struct RequestMetrics {
pub(crate) total_requests_succeeded: IntCounter,
pub(crate) total_requests_shed: IntCounter,
pub(crate) total_requests_failed: IntCounter,
pub(crate) total_requests_disconnected: IntCounter,
pub(crate) current_requests_in_flight: IntGauge,
pub(crate) process_latency: Histogram,
}
@@ -59,6 +60,13 @@ impl RequestMetrics {
registry,
)
.unwrap(),
total_requests_disconnected: register_int_counter_with_registry!(
"total_requests_disconnected",
"Total number of requests where the client disconnected before the service \
returned a response",
registry,
)
.unwrap(),
current_requests_in_flight: register_int_gauge_with_registry!(
"current_requests_in_flight",
"Current number of requests being processed in Faucet",
85 changes: 56 additions & 29 deletions crates/sui-faucet/src/metrics_layer.rs
@@ -8,8 +8,8 @@ use std::{
};

use futures::Future;
use prometheus::Registry;
use tower::{BoxError, Layer, Service, ServiceExt};
use prometheus::{HistogramTimer, Registry};
use tower::{load_shed::error::Overloaded, BoxError, Layer, Service, ServiceExt};
use tracing::info;

use crate::metrics::RequestMetrics;
@@ -30,6 +30,11 @@ pub struct RequestMetricsFuture<Res> {
future: Pin<Box<dyn Future<Output = Result<Res, BoxError>> + Send>>,
}

struct MetricsGuard {
timer: Option<HistogramTimer>,
metrics: Arc<RequestMetrics>,
}

impl RequestMetricsLayer {
pub fn new(registry: &Registry) -> Self {
Self {
@@ -48,9 +53,9 @@ impl<Inner> Layer<Inner> for RequestMetricsLayer {
}
}

impl<Inner, Req> Service<Req> for RequestMetricsService<Inner>
impl<Inner, Req, Body> Service<Req> for RequestMetricsService<Inner>
where
Inner: Service<Req, Error = BoxError> + Clone + Send + 'static,
Inner: Service<Req, Response = http::Response<Body>, Error = BoxError> + Clone + Send + 'static,
Inner::Future: Send,
Req: Send + 'static,
{
@@ -63,36 +68,16 @@ where
}

fn call(&mut self, req: Req) -> Self::Future {
let metrics = self.metrics.clone();
let metrics = MetricsGuard::new(self.metrics.clone());
let inner = self.inner.clone();

let future = Box::pin(async move {
metrics.total_requests_received.inc();
metrics.current_requests_in_flight.inc();
let _metrics_guard = scopeguard::guard(metrics.clone(), |metrics| {
metrics.current_requests_in_flight.dec();
});

let timer = metrics.process_latency.start_timer();
let resp = inner.oneshot(req).await;

let elapsed = timer.stop_and_record();

match &resp {
Result::Ok(_) => {
metrics.total_requests_succeeded.inc();
info!("Request succeeded in {:.2}s", elapsed);
}

Result::Err(err) => {
if err.is::<tower::load_shed::error::Overloaded>() {
metrics.total_requests_shed.inc();
info!("Request shed in {:.2}s", elapsed);
} else {
metrics.total_requests_failed.inc();
info!("Request failed in {:.2}s", elapsed);
}
}
Result::Ok(resp) if !resp.status().is_success() => metrics.failed(),
Result::Ok(_) => metrics.succeeded(),
Result::Err(err) if err.is::<Overloaded>() => metrics.shed(),
Result::Err(_) => metrics.failed(),
}

resp
@@ -108,3 +93,45 @@ impl<Res> Future for RequestMetricsFuture<Res> {
Future::poll(self.future.as_mut(), ctx)
}
}

impl MetricsGuard {
fn new(metrics: Arc<RequestMetrics>) -> Self {
metrics.total_requests_received.inc();
metrics.current_requests_in_flight.inc();
MetricsGuard {
timer: Some(metrics.process_latency.start_timer()),
metrics,
}
}

fn succeeded(mut self) {
let elapsed = self.timer.take().unwrap().stop_and_record();
self.metrics.total_requests_succeeded.inc();
info!("Request succeeded in {:.2}s", elapsed);
}

fn failed(mut self) {
let elapsed = self.timer.take().unwrap().stop_and_record();
self.metrics.total_requests_failed.inc();
info!("Request failed in {:.2}s", elapsed);
}

fn shed(mut self) {
let elapsed = self.timer.take().unwrap().stop_and_record();
self.metrics.total_requests_shed.inc();
info!("Request shed in {:.2}s", elapsed);
}
}

impl Drop for MetricsGuard {
fn drop(&mut self) {
self.metrics.current_requests_in_flight.dec();

// Request was still in flight when the guard was dropped, implying the client disconnected.
if let Some(timer) = self.timer.take() {
let elapsed = timer.stop_and_record();
self.metrics.total_requests_disconnected.inc();
info!("Request disconnected in {:.2}s", elapsed);
}
}
}
