Skip to content

Commit

Permalink
make generator and prover work with new futures
Browse files Browse the repository at this point in the history
  • Loading branch information
shamatar committed Nov 27, 2019
1 parent 3b439a9 commit 885e32e
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 17 deletions.
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ crate-type = ["cdylib", "lib", "staticlib"]

[dependencies]
rand = "0.4"
bit-vec = "0.4.4"
bit-vec = "0.6"
futures = "0.1"
cfg-if = "0.1.7"
cfg-if = "0.1"

#pairing = {package = "pairing_ce", path = "../pairing" }
pairing = {package = "pairing_ce", version = "0.19" }
byteorder = "1"

futures-cpupool = {version = "0.1", optional = true}
futures_new = {package = "futures", version = "0.3", default_features = false}
futures_new = {package = "futures", version = "0.3", default_features = false, features = ["executor"]}
num_cpus = "1"
crossbeam = {version = "0.7", optional = true}

Expand All @@ -35,10 +35,11 @@ tiny-keccak = {version = "1.4.2", optional = true}
blake2-rfc = {version = "0.2.18", optional = true}

[features]
default = ["multicore"]
default = []
# default = ["multicore"]
#default = ["multicore", "nightly"]
#default = ["wasm"]
multicore = ["futures-cpupool", "crossbeam", "futures_new/executor", "futures_new/thread-pool"]
multicore = ["futures-cpupool", "crossbeam", "futures_new/thread-pool"]
sonic = ["tiny-keccak", "blake2-rfc"]
gm17 = []
wasm = ["web-sys"]
Expand Down
2 changes: 1 addition & 1 deletion src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::{
SynthesisError
};

use super::worker::Worker;
use super::worker_new::Worker;
pub use super::group::*;

pub struct EvaluationDomain<E: Engine, G: Group<E>> {
Expand Down
2 changes: 1 addition & 1 deletion src/groth16/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::domain::{
Scalar
};

use crate::worker::{
use crate::worker_new::{
Worker
};

Expand Down
8 changes: 4 additions & 4 deletions src/groth16/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use rand::Rng;

use std::sync::Arc;

use futures::Future;

use crate::pairing::{
Engine,
CurveProjective,
Expand Down Expand Up @@ -41,9 +39,11 @@ use crate::source::{
FullDensity
};

use crate::multiexp::*;
// use crate::multiexp::*;

use crate::multexp_new::*;

use crate::worker::{
use crate::worker_new::{
Worker
};

Expand Down
9 changes: 8 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,25 @@ cfg_if! {
compile_error!("Multicore feature is not yet compatible with wasm target arch");

mod multicore;
mod multicore_new;
mod worker {
pub use crate::multicore::*;
}
mod worker_new {
pub use crate::multicore_new::*;
}
} else {
mod singlecore;
mod singlecore_new;
mod worker {
pub use crate::singlecore::*;
}
mod worker_new {
pub use crate::singlecore_new::*;
}
}
}

mod worker_new;
mod multexp_new;

mod cs;
Expand Down
9 changes: 8 additions & 1 deletion src/multexp_new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::pin::{Pin};
extern crate futures_new;

use self::futures_new::future::{join_all, JoinAll};
use self::futures_new::executor::block_on;

use super::worker_new::{Worker, WorkerFuture};

Expand Down Expand Up @@ -407,6 +408,12 @@ impl<G: CurveProjective> Future for ChunksJoiner<G> {
}
}

impl<G: CurveProjective> ChunksJoiner<G> {
pub fn wait(self) -> <Self as Future>::Output {
block_on(self)
}
}

fn join_chunks<G: CurveProjective>
(chunks: Vec<Result<G, SynthesisError>>, c: u32) -> Result<G, SynthesisError> {
if chunks.len() == 0 {
Expand Down Expand Up @@ -702,7 +709,7 @@ fn test_bench_new_sparse_multiexp() {
let start = std::time::Instant::now();

{
use crate::multicore::Future;
use crate::worker::Future;
let _sparse = multiexp_old(
&pool,
(g.clone(), 0),
Expand Down
23 changes: 20 additions & 3 deletions src/worker_new.rs → src/multicore_new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,27 @@ use std::task::{Context, Poll};
use std::pin::{Pin};
use self::futures_new::future::{lazy};
use self::futures_new::channel::oneshot::{channel, Sender, Receiver};
use self::futures_new::executor::{ThreadPool};
// use self::futures_cpupool::{CpuPool, CpuFuture};
use self::crossbeam::thread::{Scope};
use self::futures_new::executor::block_on;

use self::futures_new::executor::{ThreadPool};

#[derive(Clone)]
pub struct Worker {
cpus: usize,
pool: ThreadPool
}


impl Worker {
// We don't expose this outside the library so that
// all `Worker` instances have the same number of
// CPUs configured.

pub(crate) fn new_with_cpus(cpus: usize) -> Worker {
Worker {
cpus: cpus,
pool: ThreadPool::builder().pool_size(cpus).create().expect("should create a thread pool for futures execution")
pool: ThreadPool::builder().pool_size(cpus).create().expect("should create a thread pool for futures execution"),
}
}

Expand Down Expand Up @@ -111,6 +114,12 @@ impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
}
}

impl<T: Send + 'static, E: Send + 'static> WorkerFuture<T, E> {
pub fn wait(self) -> <Self as Future>::Output {
block_on(self)
}
}

fn log2_floor(num: usize) -> u32 {
assert!(num > 0);

Expand Down Expand Up @@ -146,6 +155,8 @@ fn test_trivial_spawning() {
i = i.wrapping_mul(42);
}

println!("Done calculating long task");

Ok(i)
}

Expand All @@ -154,5 +165,11 @@ fn test_trivial_spawning() {
let fut = worker.compute(|| long_fn());
println!("Done spawning");

println!("Will sleep now");

std::thread::sleep(std::time::Duration::from_millis(10000));

println!("Done sleeping");

let _ = block_on(fut);
}
2 changes: 1 addition & 1 deletion src/singlecore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate futures;

use std::marker::PhantomData;

use self::futures::{Future, IntoFuture, Poll};
pub(crate) use self::futures::{Future, IntoFuture, Poll};
use self::futures::future::{result, FutureResult};

#[derive(Clone)]
Expand Down
148 changes: 148 additions & 0 deletions src/singlecore_new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//! This is a dummy interface to substitute multicore worker
//! in environments like WASM
extern crate futures_new;

use std::marker::PhantomData;

use std::future::{Future};
use std::task::{Context, Poll};
use std::pin::{Pin};

use self::futures_new::channel::oneshot::{channel, Sender, Receiver};
use self::futures_new::executor::block_on;

#[derive(Clone)]
pub struct Worker {
cpus: usize,
}

impl Worker {
// We don't expose this outside the library so that
// all `Worker` instances have the same number of
// CPUs configured.
pub(crate) fn new_with_cpus(_cpus: usize) -> Worker {
Worker {
cpus: 1,
}
}

pub fn new() -> Worker {
Self::new_with_cpus(1)
}

pub fn log_num_cpus(&self) -> u32 {
0u32
}

pub fn compute<F, T, E>(
&self, f: F
) -> WorkerFuture<T, E>
where F: FnOnce() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: Send + 'static
{
let result = f();

let (sender, receiver) = channel();
let _ = sender.send(result);

let worker_future = WorkerFuture {
receiver
};

worker_future
}

pub fn scope<'a, F, R>(
&self,
elements: usize,
f: F
) -> R
where F: FnOnce(&Scope<'a>, usize) -> R
{
let chunk_size = if elements == 0 { 1 } else { elements };

let scope = Scope{
_marker: PhantomData
};

f(&scope, chunk_size)
}
}
#[derive(Clone)]
pub struct Scope<'a> {
_marker: PhantomData<& 'a usize>
}

impl<'a> Scope<'a> {
pub fn spawn<F, R>(
&self,
f: F
) -> R
where F: FnOnce(&Scope<'a>) -> R
{
f(&self)
}
}

pub struct WorkerFuture<T, E> {
receiver: Receiver<Result<T, E>>
}

impl<T: Send + 'static, E: Send + 'static> Future for WorkerFuture<T, E> {
type Output = Result<T, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>
{
let rec = unsafe { self.map_unchecked_mut(|s| &mut s.receiver) };
match rec.poll(cx) {
Poll::Ready(v) => {
if let Ok(v) = v {
return Poll::Ready(v)
} else {
panic!("Worker future can not have canceled sender");
}
},
Poll::Pending => {
return Poll::Pending;
}
}
}
}

impl<T: Send + 'static, E: Send + 'static> WorkerFuture<T, E> {
pub fn wait(self) -> <Self as Future>::Output {
block_on(self)
}
}


#[test]
fn test_trivial_singlecore_spawning() {
use self::futures_new::executor::block_on;

fn long_fn() -> Result<usize, ()> {
let mut i: usize = 1;
println!("Start calculating long task");
for _ in 0..1000000 {
i = i.wrapping_mul(42);
}

println!("Done calculating long task");

Ok(i)
}

let worker = Worker::new();
println!("Spawning");
let fut = worker.compute(|| long_fn());
println!("Done spawning");

println!("Will sleep now");

std::thread::sleep(std::time::Duration::from_millis(10000));

println!("Done sleeping");

let _ = block_on(fut);
}

0 comments on commit 885e32e

Please sign in to comment.