Commit 9c8a0d3: automatically dropping executors done(#26)

rajasekarv committed Nov 4, 2019
1 parent 80b60ad · commit 9c8a0d3
Showing 18 changed files with 52 additions and 53 deletions.
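In short: `Context::new` now returns `Arc<Context>`, the `Clone` derive on `Context` is removed, and executor teardown moves from the public `sc.drop_executors()` call (deleted from every example and test below) into an `impl Drop for Context`. A minimal sketch of that RAII pattern, with a hypothetical `Driver` type and shutdown message standing in for the crate's actual signalling protocol:

```rust
use std::io::Write;
use std::net::TcpStream;

// Hypothetical driver-side handle: remembers where executors were started
// and signals them to shut down when it goes out of scope.
struct Driver {
    address_map: Vec<(String, u16)>,
}

impl Drop for Driver {
    fn drop(&mut self) {
        for (address, port) in &self.address_map {
            // Best-effort shutdown signal; errors during teardown are ignored.
            if let Ok(mut stream) = TcpStream::connect((address.as_str(), *port)) {
                let _ = stream.write_all(b"shutdown");
            }
        }
    }
}

fn main() {
    let driver = Driver {
        address_map: vec![("127.0.0.1".to_string(), 10500)],
    };
    // ... run jobs ...
    // No explicit cleanup call: Drop runs when `driver` leaves scope.
    drop(driver);
}
```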
README.md: 3 changes (2 additions, 1 deletion)

@@ -25,7 +25,8 @@ You can specify the local IP address using the environmental variable `SPARK_LOCAL_IP`
 
 ## ToDo:
 
-- [ ] Error Handling(Priority)
+- [x] Error Handling(Priority)
+- [ ] Fault tolerance
 
 ### RDD
 Most of these except file reader and writer are trivial to implement

config_files/hosts.conf: 2 changes (1 addition, 1 deletion)

@@ -1,2 +1,2 @@
-master = "0.0.0.0:3000"
+master = "ip:3000"
 slaves = ["user@ip", "user@ip", "user@ip", "user@ip"]
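For context, a hypothetical filled-in hosts.conf (all addresses are placeholders): the master entry should be an address the workers can reach rather than a bind-all 0.0.0.0, and each slave entry is an SSH target that the driver uses to deploy and start the executor binary (see the comment on `Context::new` in src/context.rs below).

```toml
# Hypothetical values; substitute your own hosts.
master = "192.168.0.10:3000"
slaves = ["worker@192.168.0.11", "worker@192.168.0.12"]
```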
examples/file_read.rs: 1 change (0 additions, 1 deletion)

@@ -33,6 +33,5 @@ fn main() -> Result<()> {
     let avg = sum.map(Fn!(|(k, (v, c))| (k, v as f64 / c)));
     let res = avg.collect();
     println!("{:?}", &res[0]);
-    sc.drop_executors();
     Ok(())
 }

examples/groupby.rs: 1 change (0 additions, 1 deletion)

@@ -24,6 +24,5 @@ fn main() -> Result<()> {
     let g = r.group_by_key(4);
     let res = g.collect();
     println!("res {:?}", res);
-    sc.drop_executors();
     Ok(())
 }

examples/make_rdd.rs: 1 change (0 additions, 1 deletion)

@@ -11,6 +11,5 @@ fn main() -> Result<()> {
     let vec_iter = col.map(Fn!(|i| (0..i).collect::<Vec<_>>()));
     let res = vec_iter.collect();
     println!("{:?}", res);
-    sc.drop_executors();
     Ok(())
 }

examples/parquet_column_read.rs: 1 change (0 additions, 1 deletion)

@@ -25,7 +25,6 @@ fn main() -> Result<()> {
     let avg = sum.map(Fn!(|(k, (v, c))| (k, v as f64 / c)));
     let res = avg.collect();
     println!("{:?}", &res[0]);
-    sc.drop_executors();
     Ok(())
 }

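All four examples lose the same trailing line. A sketch of what a complete example looks like after this commit, based on examples/make_rdd.rs (the `use` line, the `"local"` mode string, and the slice count are assumptions; this diff only shows each file's tail):

```rust
use native_spark::*;

fn main() -> Result<()> {
    let sc = Context::new("local")?;
    let col = sc.make_rdd((0..10).collect::<Vec<_>>(), 4);
    let vec_iter = col.map(Fn!(|i| (0..i).collect::<Vec<_>>()));
    let res = vec_iter.collect();
    println!("{:?}", res);
    // No sc.drop_executors() here: teardown happens when `sc` is dropped.
    Ok(())
}
```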
src/co_grouped_rdd.rs: 2 changes (1 addition, 1 deletion)

@@ -131,7 +131,7 @@ impl<K: Data + Eq + Hash> RddBase for CoGroupedRdd<K> {
     fn get_rdd_id(&self) -> usize {
         self.vals.id
     }
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.vals.context.clone()
     }
     fn get_dependencies(&self) -> &[Dependency] {

src/context.rs: 41 changes (25 additions, 16 deletions)

@@ -47,7 +47,7 @@ impl Default for Schedulers {
 
 impl Schedulers {
     pub fn run_job<T: Data, U: Data, F, RT>(
-        &mut self,
+        &self,
         func: Arc<F>,
         final_rdd: Arc<RT>,
         partitions: Vec<usize>,

@@ -65,7 +65,7 @@
     }
 }
 
-#[derive(Clone, Default)]
+#[derive(Default)]
 pub struct Context {
     next_rdd_id: Arc<AtomicUsize>,
     next_shuffle_id: Arc<AtomicUsize>,

@@ -74,9 +74,16 @@ pub struct Context {
     distributed_master: bool,
 }
 
+impl Drop for Context{
+    fn drop(&mut self) {
+        //TODO clean up temp files
+        self.drop_executors();
+    }
+}
+
 impl Context {
     // Sends the binary to all nodes present in hosts.conf and starts them
-    pub fn new(mode: &str) -> Result<Self> {
+    pub fn new(mode: &str) -> Result<Arc<Self>> {
         let next_rdd_id = Arc::new(AtomicUsize::new(0));
         let next_shuffle_id = Arc::new(AtomicUsize::new(0));
         use Schedulers::*;

@@ -161,7 +168,7 @@ impl Context {
                 })?;
                 port += 5000;
             }
-            Ok(Context {
+            Ok(Arc::new(Context {
                 next_rdd_id,
                 next_shuffle_id,
                 scheduler: Distributed(DistributedScheduler::new(

@@ -173,7 +180,7 @@
                 )),
                 address_map,
                 distributed_master: true,
-            })
+            }))
             //TODO handle if master is in another node than from where the program is executed
             // ::std::process::exit(0);
         }

@@ -183,27 +190,27 @@
             let uuid = Uuid::new_v4().to_string();
             initialize_loggers(format!("/tmp/master-{}", uuid));
             let scheduler = Local(LocalScheduler::new(num_cpus::get(), 20, true));
-            Ok(Context {
+            Ok(Arc::new(Context {
                 next_rdd_id,
                 next_shuffle_id,
                 scheduler,
                 address_map: Vec::new(),
                 distributed_master: false,
-            })
+            }))
         }
         _ => {
             let scheduler = Local(LocalScheduler::new(num_cpus::get(), 20, true));
-            Ok(Context {
+            Ok(Arc::new(Context {
                 next_rdd_id,
                 next_shuffle_id,
                 scheduler,
                 address_map: Vec::new(),
                 distributed_master: false,
-            })
+            }))
         }
     }
 }
-    pub fn drop_executors(self) {
+    fn drop_executors(&self) {
         info!("inside context drop in master {}", self.distributed_master);
 
         for (address, port) in self.address_map.clone() {

@@ -234,17 +241,19 @@ impl Context {
 
     // currently it accepts only vector.
     // TODO change this to accept any iterator
-    pub fn make_rdd<T: Data>(&self, seq: Vec<T>, num_slices: usize) -> ParallelCollection<T> {
+    // &Arc<Self> is an unstable feature. used here just to keep the user end context usage same as before.
+    // Can be removed if sc.clone() API seems ok.
+    pub fn make_rdd<T: Data>(self: &Arc<Self>, seq: Vec<T>, num_slices: usize) -> ParallelCollection<T> {
         //let num_slices = seq.len() / num_slices;
         self.parallelize(seq, num_slices)
     }
 
-    pub fn parallelize<T: Data>(&self, seq: Vec<T>, num_slices: usize) -> ParallelCollection<T> {
+    pub fn parallelize<T: Data>(self: &Arc<Self>, seq: Vec<T>, num_slices: usize) -> ParallelCollection<T> {
         ParallelCollection::new(self.clone(), seq, num_slices)
     }
 
     /// Load files from the local host and turn them into a parallel collection.
-    pub fn read_files<F, C, R, D: Data>(&mut self, config: C, func: F) -> impl Rdd<D>
+    pub fn read_files<F, C, R, D: Data>(self: &Arc<Self>, config: C, func: F) -> impl Rdd<D>
     where
         F: SerFunc(R) -> D,
         C: ReaderConfiguration<R>,

@@ -255,7 +264,7 @@ impl Context {
         parallel_readers.map(func)
     }
 
-    pub fn run_job<T: Data, U: Data, RT, F>(&mut self, rdd: Arc<RT>, func: F) -> Vec<U>
+    pub fn run_job<T: Data, U: Data, RT, F>(&self, rdd: Arc<RT>, func: F) -> Vec<U>
     where
         F: SerFunc(Box<dyn Iterator<Item = T>>) -> U,
         RT: Rdd<T> + 'static,

@@ -271,7 +280,7 @@ impl Context {
     }
 
     pub fn run_job_with_partitions<T: Data, U: Data, RT, F, P>(
-        &mut self,
+        &self,
         rdd: Arc<RT>,
         func: F,
         partitions: P,

@@ -286,7 +295,7 @@ impl Context {
             .run_job(Arc::new(cl), rdd, partitions.into_iter().collect(), false)
     }
 
-    pub fn run_job_with_context<T: Data, U: Data, RT, F>(&mut self, rdd: Arc<RT>, func: F) -> Vec<U>
+    pub fn run_job_with_context<T: Data, U: Data, RT, F>(&self, rdd: Arc<RT>, func: F) -> Vec<U>
     where
         F: SerFunc((TasKContext, Box<dyn Iterator<Item = T>>)) -> U,
         RT: Rdd<T> + 'static,

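The through-line of these context.rs changes: dropping the `Clone` derive means there is exactly one `Context`, so its `Drop` (which kills the executors) cannot fire once per clone; sharing now goes through `Arc<Context>` instead, and RDD builders take `self: &Arc<Self>` so the public API still reads `sc.make_rdd(...)`. A minimal sketch of that receiver pattern with simplified types (it needs the nightly `arbitrary_self_types` feature that this commit enables in src/lib.rs below, as that feature behaved on the nightly this crate targeted):

```rust
#![feature(arbitrary_self_types)] // nightly-only, enabled by this commit in lib.rs

use std::sync::Arc;

#[derive(Default)]
struct Context {
    // driver-side state (schedulers, id counters) lives here
}

struct ParallelCollection {
    context: Arc<Context>, // each RDD keeps the context alive
    data: Vec<i32>,
}

impl Context {
    fn new() -> Arc<Self> {
        Arc::new(Context::default())
    }

    // `self: &Arc<Self>` lets callers write `sc.make_rdd(...)` while the
    // method hands an Arc clone to the RDD it builds.
    fn make_rdd(self: &Arc<Self>, data: Vec<i32>) -> ParallelCollection {
        ParallelCollection {
            context: Arc::clone(self),
            data,
        }
    }
}

fn main() {
    let sc = Context::new();
    let rdd = sc.make_rdd(vec![1, 2, 3]);
    assert_eq!(rdd.data.len(), 3);
    // The RDD holds the second strong reference to the shared context.
    assert_eq!(Arc::strong_count(&sc), 2);
}
```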
src/distributed_scheduler.rs: 8 changes (4 additions, 4 deletions)

@@ -305,7 +305,7 @@ impl DistributedScheduler {
     }
 
     pub fn run_job<T: Data, U: Data, F, RT>(
-        &mut self,
+        &self,
         func: Arc<F>,
         final_rdd: Arc<RT>,
         partitions: Vec<usize>,

@@ -767,14 +767,14 @@ impl DistributedScheduler {
         Vec::new()
     }
 
-    fn wait_for_event(&mut self, run_id: usize, timeout: u64) -> Option<CompletionEvent> {
+    fn wait_for_event(&self, run_id: usize, timeout: u64) -> Option<CompletionEvent> {
         let end = Instant::now() + Duration::from_millis(timeout);
         while self.event_queues.lock().get(&run_id).unwrap().is_empty() {
             if Instant::now() > end {
                 return None;
-            } else{
-                thread::sleep(end - Instant::now());
             }
+
+            thread::sleep(Duration::from_millis(250));
         }
         self.event_queues
             .lock()

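Both schedulers (src/distributed_scheduler.rs above and src/local_scheduler.rs below) get the same fix: the old code slept for the entire remaining timeout as soon as the queue was empty, so a job completing mid-wait was only noticed at the deadline; the new loop polls every 250 ms. The shape of the fix as a self-contained sketch, with a generic condition standing in for the scheduler's event queue:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Poll `ready` every 250 ms until it returns true or `timeout_ms` elapses.
// Returns true if the condition was observed before the deadline.
fn wait_until(ready: impl Fn() -> bool, timeout_ms: u64) -> bool {
    let end = Instant::now() + Duration::from_millis(timeout_ms);
    while !ready() {
        if Instant::now() > end {
            return false; // deadline passed while still waiting
        }
        thread::sleep(Duration::from_millis(250));
    }
    true
}

fn main() {
    let start = Instant::now();
    // Example condition: becomes ready after one second.
    let done = wait_until(|| start.elapsed() > Duration::from_secs(1), 5_000);
    assert!(done);
}
```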
src/lib.rs: 1 change (1 addition, 0 deletions)

@@ -4,6 +4,7 @@
     fn_traits,
     specialization,
     unboxed_closures,
+    arbitrary_self_types,
     unsize
 )]
 #![allow(

src/local_scheduler.rs: 8 changes (4 additions, 4 deletions)

@@ -281,7 +281,7 @@ impl LocalScheduler {
     }
 
     pub fn run_job<T: Data, U: Data, F, RT>(
-        &mut self,
+        &self,
         func: Arc<F>,
         final_rdd: Arc<RT>,
         partitions: Vec<usize>,

@@ -751,14 +751,14 @@ impl LocalScheduler {
         Vec::new()
     }
 
-    fn wait_for_event(&mut self, run_id: usize, timeout: u64) -> Option<CompletionEvent> {
+    fn wait_for_event(&self, run_id: usize, timeout: u64) -> Option<CompletionEvent> {
         let end = Instant::now() + Duration::from_millis(timeout);
         while self.event_queues.lock().get(&run_id).unwrap().is_empty() {
            if Instant::now() > end {
                 return None;
-            } else{
-                thread::sleep(end - Instant::now());
             }
+
+            thread::sleep(Duration::from_millis(250));
         }
         self.event_queues
             .lock()

src/pair_rdd.rs: 4 changes (2 additions, 2 deletions)

@@ -276,7 +276,7 @@ where
     fn get_rdd_id(&self) -> usize {
         self.vals.id
     }
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.vals.context.clone()
     }
     fn get_dependencies(&self) -> &[Dependency] {

@@ -383,7 +383,7 @@ where
     fn get_rdd_id(&self) -> usize {
         self.vals.id
     }
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.vals.context.clone()
     }
     fn get_dependencies(&self) -> &[Dependency] {

src/parallel_collection.rs: 8 changes (4 additions, 4 deletions)

@@ -60,7 +60,7 @@ impl<T: Data> ParallelCollectionSplit<T> {
 pub struct ParallelCollectionVals<T> {
     vals: Arc<RddVals>,
     #[serde(skip_serializing, skip_deserializing)]
-    context: Context,
+    context: Arc<Context>,
     // data: Option<Vec<T>>,
     splits_: Vec<Arc<Vec<T>>>,
     num_slices: usize,

@@ -80,7 +80,7 @@ impl<T: Data> Clone for ParallelCollection<T> {
 }
 
 impl<T: Data> ParallelCollection<T> {
-    pub fn new(context: Context, data: Vec<T>, num_slices: usize) -> Self {
+    pub fn new(context: Arc<Context>, data: Vec<T>, num_slices: usize) -> Self {
         ParallelCollection {
             rdd_vals: Arc::new(ParallelCollectionVals {
                 vals: Arc::new(RddVals::new(context.clone())),

@@ -91,7 +91,7 @@ impl<T: Data> ParallelCollection<T> {
         }
     }
 
-    pub fn from_chunkable<C>(context: Context, data: C) -> Self
+    pub fn from_chunkable<C>(context: Arc<Context>, data: C) -> Self
     where
         C: Chunkable<T>,
     {

@@ -160,7 +160,7 @@ impl<T: Data> RddBase for ParallelCollection<T> {
     fn get_rdd_id(&self) -> usize {
         self.rdd_vals.vals.id
     }
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.rdd_vals.vals.context.clone()
     }
     fn get_dependencies(&self) -> &[Dependency] {

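Note the `#[serde(skip_serializing, skip_deserializing)]` on the context field, here and in `RddVals` in src/rdd.rs below: RDDs are serialized and shipped to executors, but the driver-side `Context` is not serializable, so the field is skipped on the wire and rebuilt via `Default` on the receiving side. That is also why `Context` keeps its `Default` derive even though `Clone` was dropped. A small self-contained illustration of the mechanism (assumes serde with the derive feature plus serde_json in Cargo.toml; field names are hypothetical):

```rust
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Default)]
struct Context; // stands in for the non-serializable driver context

#[derive(Serialize, Deserialize)]
struct RddVals {
    id: usize,
    // Skipped on serialization; serde fills it with Arc::<Context>::default()
    // when deserializing on the executor side.
    #[serde(skip_serializing, skip_deserializing)]
    context: Arc<Context>,
}

fn main() {
    let vals = RddVals { id: 7, context: Arc::new(Context) };
    let wire = serde_json::to_string(&vals).unwrap();
    assert_eq!(wire, r#"{"id":7}"#); // context never hits the wire
    let back: RddVals = serde_json::from_str(&wire).unwrap();
    assert_eq!(back.id, 7); // context is a fresh Default, not the driver's
}
```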
src/rdd.rs: 12 changes (6 additions, 6 deletions)

@@ -39,16 +39,16 @@ pub struct RddVals {
     pub dependencies: Vec<Dependency>,
     should_cache: bool,
     #[serde(skip_serializing, skip_deserializing)]
-    pub context: Context,
+    pub context: Arc<Context>,
 }
 
 impl RddVals {
-    pub fn new(sc: Context) -> Self {
+    pub fn new(sc: Arc<Context>) -> Self {
         RddVals {
             id: sc.new_rdd_id(),
             dependencies: Vec::new(),
             should_cache: false,
-            context: sc.clone(),
+            context: sc,
         }
     }
 

@@ -64,7 +64,7 @@ impl RddVals {
 // Another separate Rdd containing generic methods like map, etc.,
 pub trait RddBase: Send + Sync + Serialize + Deserialize {
     fn get_rdd_id(&self) -> usize;
-    fn get_context(&self) -> Context;
+    fn get_context(&self) -> Arc<Context>;
     fn get_dependencies(&self) -> &[Dependency];
     fn preferred_locations(&self, split: Box<dyn Split>) -> Vec<Ipv4Addr> {
         Vec::new()

@@ -390,7 +390,7 @@ where
         self.vals.id
     }
 
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.vals.context.clone()
     }
 

@@ -521,7 +521,7 @@ where
         self.vals.id
     }
 
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.vals.context.clone()
     }
 

src/shuffle_fetcher.rs: 2 changes (1 addition, 1 deletion)

@@ -15,7 +15,7 @@ pub struct ShuffleFetcher;
 impl ShuffleFetcher {
     pub fn fetch<K: Data, V: Data>(
         &self,
-        sc: Context,
+        sc: Arc<Context>,
         shuffle_id: usize,
         reduce_id: usize,
         mut func: impl FnMut((K, V)) -> (),

src/shuffled_rdd.rs: 2 changes (1 addition, 1 deletion)

@@ -93,7 +93,7 @@ where
     fn get_rdd_id(&self) -> usize {
         self.vals.id
     }
-    fn get_context(&self) -> Context {
+    fn get_context(&self) -> Arc<Context> {
         self.vals.context.clone()
     }
     fn get_dependencies(&self) -> &[Dependency] {

tests/test_pair_rdd.rs: 2 changes (0 additions, 2 deletions)

@@ -26,7 +26,6 @@ fn test_group_by() {
     let mut res = g.collect();
     res.sort();
     println!("res {:?}", res);
-    sc.drop_executors();
 
     let expected = vec![
         ("x".to_string(), vec![1, 2, 3, 4, 5, 6, 7]),

@@ -58,7 +57,6 @@ fn test_join() {
     let mut res = inner_joined_rdd.collect();
     println!("res {:?}", res);
     res.sort();
-    sc.drop_executors();
 
     let expected = vec![
         (1, "A1", "A", "B"),