Skip to content
This repository has been archived by the owner on Feb 15, 2021. It is now read-only.

Commit

Permalink
Retry only if prover is not terminating
Browse files Browse the repository at this point in the history
  • Loading branch information
jazzandrock committed Feb 17, 2020
1 parent 18f468b commit edace7e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 54 deletions.
73 changes: 46 additions & 27 deletions core/prover/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Built-in deps
use std::str::FromStr;
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
use std::time;
// External deps
use backoff;
use backoff::Operation;
use crypto_exports::franklin_crypto::bellman::groth16;
use failure::bail;
use failure::format_err;
use log::info;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -36,7 +38,7 @@ pub struct PublishReq {
pub proof: models::EncodedProof,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ApiClient {
register_url: String,
block_to_prove_url: String,
Expand All @@ -46,10 +48,11 @@ pub struct ApiClient {
stopped_url: String,
worker: String,
req_server_timeout: time::Duration,
is_terminating_bool: Option<Arc<AtomicBool>>,
}

impl ApiClient {
pub fn new(base_url: &str, worker: &str) -> Self {
pub fn new(base_url: &str, worker: &str, is_terminating_bool: Option<Arc<AtomicBool>>) -> Self {
let config_opts = ConfigurationOptions::from_env();
if worker == "" {
panic!("worker name cannot be empty")
Expand All @@ -63,19 +66,46 @@ impl ApiClient {
stopped_url: format!("{}/stopped", base_url),
worker: worker.to_string(),
req_server_timeout: config_opts.req_server_timeout,
is_terminating_bool,
}
}

fn is_terminating(&self) -> bool {
self.is_terminating_bool
.as_ref()
.map(|b| b.load(Ordering::SeqCst))
.unwrap_or(false)
}

fn with_retries<T>(
&self,
op: &dyn Fn() -> Result<T, failure::Error>,
) -> Result<T, failure::Error> {
let mut with_checking = || -> Result<T, backoff::Error<failure::Error>> {
if self.is_terminating() {
op().map_err(backoff::Error::Permanent)
} else {
op().map_err(backoff::Error::Transient)
}
};

with_checking
.retry(&mut Self::get_backoff())
.map_err(|e| match e {
backoff::Error::Permanent(e) | backoff::Error::Transient(e) => e,
})
}

fn get_backoff() -> backoff::ExponentialBackoff {
let mut backoff = backoff::ExponentialBackoff::default();
backoff.initial_interval = time::Duration::from_secs(2);
backoff.multiplier = 2.0;
backoff.initial_interval = time::Duration::from_secs(6);
backoff.multiplier = 1.2;
// backoff.max_elapsed_time = Some(time::Duration::from_secs(30));
backoff
}

pub fn register_prover(&self) -> Result<i32, failure::Error> {
let mut op = || -> Result<i32, backoff::Error<failure::Error>> {
let op = || -> Result<i32, failure::Error> {
info!("Registering prover...");
let client = self.get_client()?;
let res = client
Expand All @@ -94,8 +124,7 @@ impl ApiClient {
.map_err(|e| format_err!("failed to parse register prover id: {}", e))?)
};

op.retry(&mut Self::get_backoff())
.map_err(|e| format_err!("Timeout: {}", e))
Ok(self.with_retries(&op)?)
}

pub fn prover_stopped(&self, prover_run_id: i32) -> Result<(), failure::Error> {
Expand All @@ -118,7 +147,7 @@ impl ApiClient {

impl crate::ApiClient for ApiClient {
fn block_to_prove(&self) -> Result<Option<(i64, i32)>, failure::Error> {
let mut op = || -> Result<Option<(i64, i32)>, backoff::Error<failure::Error>> {
let op = || -> Result<Option<(i64, i32)>, failure::Error> {
let client = self.get_client()?;
let mut res = client
.get(&self.block_to_prove_url)
Expand All @@ -138,12 +167,11 @@ impl crate::ApiClient for ApiClient {
Ok(None)
};

op.retry(&mut Self::get_backoff())
.map_err(|e| format_err!("Timeout: {}", e))
Ok(self.with_retries(&op)?)
}

fn working_on(&self, job_id: i32) -> Result<(), failure::Error> {
let mut op = || -> Result<(), backoff::Error<failure::Error>> {
let op = || -> Result<(), failure::Error> {
let client = self.get_client()?;
let res = client
.post(&self.working_on_url)
Expand All @@ -153,22 +181,18 @@ impl crate::ApiClient for ApiClient {
.send()
.map_err(|e| format_err!("failed to send working on request: {}", e))?;
if res.status() != reqwest::StatusCode::OK {
Err(backoff::Error::Transient(format_err!(
"working on request failed with status: {}",
res.status()
)))
bail!("working on request failed with status: {}", res.status())
} else {
Ok(())
}
};

op.retry(&mut Self::get_backoff())
.map_err(|e| format_err!("Timeout: {}", e))
Ok(self.with_retries(&op)?)
}

fn prover_data(&self, block: i64) -> Result<ProverData, failure::Error> {
let client = self.get_client()?;
let mut op = || -> Result<ProverData, backoff::Error<failure::Error>> {
let op = || -> Result<ProverData, failure::Error> {
let mut res = client
.get(&self.prover_data_url)
.json(&block)
Expand All @@ -182,16 +206,15 @@ impl crate::ApiClient for ApiClient {
Ok(res.ok_or_else(|| format_err!("couldn't get ProverData"))?)
};

op.retry(&mut Self::get_backoff())
.map_err(|e| format_err!("Timeout: {}", e))
Ok(self.with_retries(&op)?)
}

fn publish(
&self,
block: i64,
proof: groth16::Proof<models::node::Engine>,
) -> Result<(), failure::Error> {
let mut op = || -> Result<(), backoff::Error<failure::Error>> {
let op = || -> Result<(), failure::Error> {
let encoded = encode_proof(&proof);

let client = self.get_client()?;
Expand All @@ -204,16 +227,12 @@ impl crate::ApiClient for ApiClient {
.send()
.map_err(|e| format_err!("failed to send publish request: {}", e))?;
if res.status() != reqwest::StatusCode::OK {
Err(backoff::Error::Transient(format_err!(
"publish request failed with status: {}",
res.status()
)))
bail!("publish request failed with status: {}", res.status());
} else {
Ok(())
}
};

op.retry(&mut Self::get_backoff())
.map_err(|e| format_err!("Timeout: {}", e))
Ok(self.with_retries(&op)?)
}
}
46 changes: 23 additions & 23 deletions core/prover/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ fn main() {

// Create client
let api_url = env::var("PROVER_SERVER_URL").expect("PROVER_SERVER_URL is missing");
let api_client = client::ApiClient::new(&api_url, &worker_name);
let api_client = client::ApiClient::new(&api_url, &worker_name, Some(stop_signal.clone()));
// Create prover
let jubjub_params = AltJubjubBn256::new();
let circuit_params = read_from_key_dir(key_dir);
let heartbeat_interval = time::Duration::from_secs(PROVER_HEARTBEAT_INTERVAL);
let worker = BabyProver::new(
circuit_params,
jubjub_params,
api_client,
api_client.clone(),
heartbeat_interval,
stop_signal,
);
// Register prover
let prover_id = client::ApiClient::new(&api_url, &worker_name)
let prover_id = api_client
.register_prover()
.expect("failed to register prover");
// Start prover
Expand All @@ -55,30 +55,30 @@ fn main() {
});

// Handle termination requests.
let prover_id_copy = prover_id;
let api_url_copy = api_url.clone();
let worker_name_copy = worker_name.clone();
thread::spawn(move || {
let signals = Signals::new(&[
signal_hook::SIGTERM,
signal_hook::SIGINT,
signal_hook::SIGQUIT,
])
.expect("Signals::new() failed");
for _ in signals.forever() {
info!(
"Termination signal received. Prover will finish the job and shut down gracefully"
);
client::ApiClient::new(&api_url_copy, &worker_name_copy)
.prover_stopped(prover_id_copy)
.expect("failed to send prover stop request");
}
});
{
let prover_id = prover_id;
let api_client = api_client.clone();
thread::spawn(move || {
let signals = Signals::new(&[
signal_hook::SIGTERM,
signal_hook::SIGINT,
signal_hook::SIGQUIT,
])
.expect("Signals::new() failed");
for _ in signals.forever() {
info!("Termination signal received. Prover will finish the job and shut down gracefully");
match api_client.prover_stopped(prover_id) {
Ok(_) => {}
Err(e) => error!("failed to send prover stop request: {}", e),
}
}
});
}

// Handle prover exit errors.
let err = exit_err_rx.recv();
error!("prover exited with error: {:?}", err);
client::ApiClient::new(&api_url, &worker_name)
api_client
.prover_stopped(prover_id)
.expect("failed to send prover stop request");
}
Expand Down
6 changes: 3 additions & 3 deletions core/server/tests/prover_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ fn access_storage() -> storage::StorageProcessor {
#[test]
#[should_panic]
fn client_with_empty_worker_name_panics() {
client::ApiClient::new("", "");
client::ApiClient::new("", "", None);
}

#[test]
#[cfg_attr(not(feature = "db_test"), ignore)]
fn api_client_register_start_and_stop_of_prover() {
let addr = spawn_server(time::Duration::from_secs(1), time::Duration::from_secs(1));
let client = client::ApiClient::new(&format!("http://{}", &addr), "foo");
let client = client::ApiClient::new(&format!("http://{}", &addr), "foo", None);
let id = client.register_prover().expect("failed to register");
let storage = access_storage();
storage
Expand All @@ -61,7 +61,7 @@ fn api_client_simple_simulation() {

let addr = spawn_server(prover_timeout, rounds_interval);

let client = client::ApiClient::new(&format!("http://{}", &addr), "foo");
let client = client::ApiClient::new(&format!("http://{}", &addr), "foo", None);

// call block_to_prove and check its none
let to_prove = client
Expand Down
2 changes: 1 addition & 1 deletion etc/env/dev.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ TX_BATCH_SIZE=50

SERVER_API_HOST=localhost

REQ_SERVER_TIMEOUT=300
REQ_SERVER_TIMEOUT=10

REST_API_ADDR=http://localhost:3000
HTTP_RPC_API_ADDR=http://localhost:3030
Expand Down

0 comments on commit edace7e

Please sign in to comment.