Skip to content

Commit

Permalink
prover/rpc: change proof endpoint to support path to parameter (scrol…
Browse files Browse the repository at this point in the history
…l-tech#594)

* prover/rpc: support generating params w/ custom K on demand

* prover/rpc: change proof endpoint to support path to parameter

* prover/rpc: try to downcast error to String and str

* prover: reduce logging for INFO

* evm_circuit/DummyGadget: fix logging

* prover: rename vars
  • Loading branch information
pinkiebell authored Jul 14, 2022
1 parent 159a073 commit 23005fc
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 64 deletions.
36 changes: 5 additions & 31 deletions prover/src/bin/prover_rpcd.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use env_logger::Env;
use halo2_proofs::pairing::bn256::G1Affine;
use halo2_proofs::poly::commitment::Params;
use hyper::body::Buf;
use hyper::body::HttpBody;
use hyper::header::HeaderValue;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::env::var;
use std::fs::File;
use std::sync::Arc;

use prover::shared_state::SharedState;
use prover::structs::*;
Expand Down Expand Up @@ -150,20 +146,11 @@ async fn handle_method(
match method {
// enqueues a task for computating proof for any given block
"proof" => {
if params.len() != 3 {
return Err("expected [block_num, rpc_url, retry_if_error]".to_string());
}

let block_num = params[0].as_u64().ok_or("block number at params[0]")?;
let rpc_url = params[1].as_str().ok_or("rpc url at params[1]")?;
let retry_if_error = params[2]
.as_bool()
.ok_or("bool retry_if_error at params[2]")?;
let options = params.get(0).ok_or("expected struct ProofRequestOptions")?;
let options: ProofRequestOptions =
serde_json::from_value(options.to_owned()).map_err(|e| e.to_string())?;

match shared_state
.get_or_enqueue(&block_num, rpc_url, &retry_if_error)
.await
{
match shared_state.get_or_enqueue(&options).await {
// No error
None => Ok(serde_json::Value::Null),
Some(result) => {
Expand Down Expand Up @@ -246,22 +233,9 @@ async fn main() {
.build()
.unwrap();
let h2 = rt.spawn(async move {
// lazily load params
let params_path: String = var("PARAMS_PATH")
.expect("PARAMS_PATH env var")
.parse()
.expect("Cannot parse PARAMS_PATH env var");
// load polynomial commitment parameters
let params_fs = File::open(&params_path).expect("couldn't open params");
let params: Arc<Params<G1Affine>> = Arc::new(
Params::read::<_>(&mut std::io::BufReader::new(params_fs))
.expect("Failed to read params"),
);
log::info!("params: initialized");

let ctx = ctx.clone();
loop {
ctx.duty_cycle(params.clone()).await;
ctx.duty_cycle().await;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
});
Expand Down
101 changes: 69 additions & 32 deletions prover/src/shared_state.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use halo2_proofs::pairing::bn256::G1Affine;
use halo2_proofs::poly::commitment::Params;

use std::collections::HashMap;
use std::fs::File;
use std::sync::Arc;

use tokio::sync::Mutex;

use crate::compute_proof::compute_proof;
use crate::structs::Proofs;
use crate::structs::{ProofRequestOptions, Proofs};

#[derive(Debug, Clone)]
pub struct ProofRequest {
pub block_num: u64,
pub rpc_url: String,
pub options: ProofRequestOptions,
pub result: Option<Result<Proofs, String>>,
}

pub struct RwState {
pub tasks: Vec<ProofRequest>,
pub pending_tasks: u32,
pub params_cache: HashMap<String, Arc<Params<G1Affine>>>,
}

#[derive(Clone)]
Expand All @@ -29,6 +33,7 @@ impl SharedState {
rw: Arc::new(Mutex::new(RwState {
tasks: Vec::new(),
pending_tasks: 0,
params_cache: HashMap::new(),
})),
}
}
Expand All @@ -39,42 +44,36 @@ impl SharedState {
/// before.
pub async fn get_or_enqueue(
&self,
block_num: &u64,
rpc_url: &str,
retry_if_error: &bool,
options: &ProofRequestOptions,
) -> Option<Result<Proofs, String>> {
let mut rw = self.rw.lock().await;

// task already pending or completed?
let task = rw
.tasks
.iter_mut()
.find(|e| e.block_num == *block_num && e.rpc_url == *rpc_url);
let task = rw.tasks.iter_mut().find(|e| e.options == *options);

if task.is_some() {
let mut task = task.unwrap();

if task.result.is_some() {
if *retry_if_error && task.result.as_ref().unwrap().is_err() {
log::info!("retrying: {:#?}", task);
if options.retry && task.result.as_ref().unwrap().is_err() {
log::debug!("retrying: {:#?}", task);
// will be a candidate in `duty_cycle` again
task.result = None;
} else {
log::info!("completed: {:#?}", task);
log::debug!("completed: {:#?}", task);
return task.result.clone();
}
} else {
log::info!("pending: {:#?}", task);
log::debug!("pending: {:#?}", task);
return None;
}
} else {
// enqueue the task
let task = ProofRequest {
block_num: *block_num,
rpc_url: rpc_url.to_string(),
options: options.clone(),
result: None,
};
log::info!("enqueue: {:#?}", task);
log::debug!("enqueue: {:#?}", task);
rw.tasks.push(task);
}

Expand All @@ -85,7 +84,7 @@ impl SharedState {
/// - records if a task completed
/// - starting a new task
/// Blocks until completion but releases the lock of `self.rw` in between.
pub async fn duty_cycle(&self, params: Arc<Params<G1Affine>>) {
pub async fn duty_cycle(&self) {
let mut rw = self.rw.lock().await;

if rw.pending_tasks > 0 {
Expand All @@ -103,7 +102,7 @@ impl SharedState {

// needs to be cloned because of long running tasks and
// the possibility that the task gets removed in the meantime
let pending_task = pending_task.unwrap().clone();
let mut pending_task = pending_task.unwrap().clone();
{
rw.pending_tasks += 1;
log::info!("compute_proof: {:#?}", pending_task);
Expand All @@ -115,13 +114,16 @@ impl SharedState {
// This could be avoided by spawning a subprocess for the proof computation
// instead.

let _pending_task = pending_task.clone();
let pending_task_copy = pending_task.clone();
let self_copy = self.clone();
let task_result: Result<Result<Proofs, String>, tokio::task::JoinError> =
tokio::spawn(async move {
// lazily load the file and cache it
let param = self_copy.load_param(&pending_task_copy.options.param).await;
let res = compute_proof(
params.as_ref(),
&_pending_task.block_num,
&_pending_task.rpc_url,
param.as_ref(),
&pending_task_copy.options.block,
&pending_task_copy.options.rpc,
)
.await;

Expand All @@ -136,7 +138,20 @@ impl SharedState {

// convert the JoinError to string - if applicable
let task_result: Result<Proofs, String> = match task_result {
Err(err) => Err(err.to_string()),
Err(err) => match err.is_panic() {
true => {
let panic = err.into_panic();

if let Some(msg) = panic.downcast_ref::<&str>() {
Err(msg.to_string())
} else if let Some(msg) = panic.downcast_ref::<String>() {
Err(msg.to_string())
} else {
Err("unknown panic".to_string())
}
}
false => Err(err.to_string()),
},
Ok(val) => val,
};

Expand All @@ -147,22 +162,44 @@ impl SharedState {
let mut rw = self.rw.lock().await;
rw.pending_tasks -= 1;

let task = rw.tasks.iter_mut().find(|e| {
e.block_num == pending_task.block_num && e.rpc_url == pending_task.rpc_url
});
let task = rw
.tasks
.iter_mut()
.find(|e| e.options == pending_task.options);
if let Some(task) = task {
// found our task, update result
task.result = Some(task_result);
} else {
// task was already removed in the meantime, insert it again
rw.tasks.push(ProofRequest {
block_num: pending_task.block_num,
rpc_url: pending_task.rpc_url,
result: Some(task_result),
});
pending_task.result = Some(task_result);
rw.tasks.push(pending_task);
}
}
}

async fn load_param(&self, params_path: &str) -> Arc<Params<G1Affine>> {
let mut rw = self.rw.lock().await;

if !rw.params_cache.contains_key(params_path) {
// drop, potentially long running
drop(rw);

// load polynomial commitment parameters
let params_fs = File::open(params_path).expect("couldn't open params");
let params: Arc<Params<G1Affine>> = Arc::new(
Params::read::<_>(&mut std::io::BufReader::new(params_fs))
.expect("Failed to read params"),
);

// acquire lock and update
rw = self.rw.lock().await;
rw.params_cache.insert(params_path.to_string(), params);

log::info!("params: initialized {}", params_path);
}

rw.params_cache.get(params_path).unwrap().clone()
}
}

impl Default for SharedState {
Expand Down
18 changes: 18 additions & 0 deletions prover/src/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,21 @@ pub struct JsonRpcRequest<T: serde::Serialize> {
pub method: String,
pub params: T,
}

#[derive(Debug, Default, Clone, serde::Deserialize)]
pub struct ProofRequestOptions {
/// the block number
pub block: u64,
/// the rpc url
pub rpc: String,
/// retry proof computation if error
pub retry: bool,
/// parameter file to use
pub param: String,
}

impl PartialEq for ProofRequestOptions {
fn eq(&self, other: &Self) -> bool {
self.block == other.block && self.rpc == other.rpc && self.param == other.param
}
}
2 changes: 1 addition & 1 deletion zkevm-circuits/src/evm_circuit/witness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ impl From<&circuit_input_builder::ExecStep> for ExecutionState {

macro_rules! dummy {
($name:expr) => {{
log::warn!("$name is implemented with DummyGadget");
log::warn!("{:?} is implemented with DummyGadget", $name);
$name
}};
}
Expand Down

0 comments on commit 23005fc

Please sign in to comment.