Skip to content

Commit

Permalink
proper signal handling
Browse files Browse the repository at this point in the history
  • Loading branch information
gluk64 committed May 8, 2019
1 parent 095f7ee commit ae6cf05
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 28 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ storage = { path = "src/storage" }
prover = { path = "src/prover" }
server = { path = "src/server" }

signal-hook = "0.1.8"
signal-hook = { version = "0.1.8", features = ["tokio-support"] }
tokio = "0.1.18"
futures = "0.1.25"
31 changes: 19 additions & 12 deletions src/bin/prover.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
extern crate storage;
extern crate prover;
extern crate signal_hook;
extern crate tokio;
extern crate futures;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::sync::{Arc, atomic::{AtomicBool}};
use std::env;

use prover::start_prover;
use prover::run_prover;
use signal_hook::iterator::Signals;
use futures::Stream;

fn main() {

let mut rt = tokio::runtime::Runtime::new().unwrap();

// handle ctrl+c
let stop_signal = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::SIGTERM, Arc::clone(&stop_signal)).expect("Error setting SIGTERM handler");
signal_hook::flag::register(signal_hook::SIGINT, Arc::clone(&stop_signal)).expect("Error setting SIGINT handler");
signal_hook::flag::register(signal_hook::SIGQUIT, Arc::clone(&stop_signal)).expect("Error setting SIGQUIT handler");

let args: Vec<String> = env::args().collect();
start_prover(args.get(1).unwrap_or(&"default".to_string()).clone());
rt.spawn(
Signals::new(&[signal_hook::SIGTERM, signal_hook::SIGINT, signal_hook::SIGQUIT])
.unwrap()
.into_async()
.unwrap()
.map_err(|_|())
.for_each(|_| {println!("termination signal received"); Ok(())})
);

while !stop_signal.load(Ordering::SeqCst) {
thread::sleep(Duration::from_secs(1));
}
run_prover(stop_signal, env::var("POD_NAME").unwrap_or("default".to_string()).clone());

println!("terminate signal received");
rt.shutdown_now();
println!("prover terminated");
}
1 change: 1 addition & 0 deletions src/prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ rust-crypto = "0.2"
rustc-hex = "2.0.1"

signal-hook = "0.1.8"
tokio = "0.1.18"
39 changes: 24 additions & 15 deletions src/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate rustc_hex;
extern crate plasma;
extern crate models;
extern crate storage;
extern crate tokio;

extern crate rand;
extern crate crypto;
Expand All @@ -14,6 +15,10 @@ use std::fmt;
use std::thread;
use std::time::Duration;
use rand::{OsRng};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};

use tokio::prelude::*;
use tokio::timer;

use crypto::sha2::Sha256;
use crypto::digest::Digest;
Expand Down Expand Up @@ -980,6 +985,15 @@ impl BabyProver {
let storage = StorageProcessor::establish_connection().map_err(|e| format!("establish_connection failed: {}", e))?;
let job = storage.fetch_prover_job(worker, PROVER_TIMEOUT).map_err(|e| format!("fetch_prover_job failed: {}", e))?;

// TODO: start interval
let _ping = timer::Interval::new_interval(Duration::from_millis(1000))
.map(|_| {
println!(".");
if let Ok(_storage) = StorageProcessor::establish_connection() {

}
});

if let Some(block_number) = job {
println!("prover {} got a new job for block {}", worker, block_number);

Expand Down Expand Up @@ -1007,23 +1021,18 @@ impl BabyProver {
}
Ok(())
}

fn run(&mut self, worker: &String) {
loop {
if let Err(err) = self.make_proving_attempt(worker) {
eprint!("Error: {}", err);
}
}
}
}

pub fn start_prover(worker: String) {
thread::Builder::new().name(worker.clone()).spawn(move || {
println!("prover worker: {}", worker);
let mut prover = BabyProver::create().unwrap();
println!("prover started");
prover.run(&worker)
}).expect("prover thread must start");
pub fn run_prover(stop_signal: Arc<AtomicBool>, worker: String) {
println!("creating prover, worker: {}", worker);
let mut prover = BabyProver::create().unwrap();
println!("prover started");

while !stop_signal.load(Ordering::SeqCst) {
if let Err(err) = prover.make_proving_attempt(&worker) {
eprint!("Error: {}", err);
}
}
}

// #[test]
Expand Down

0 comments on commit ae6cf05

Please sign in to comment.