Skip to content

Commit

Permalink
added explicit import statements with proper scope and did rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
rajasekarv committed Jan 24, 2020
1 parent ab50b00 commit 16eec95
Show file tree
Hide file tree
Showing 39 changed files with 878 additions and 764 deletions.
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
reorder_imports = true
6 changes: 2 additions & 4 deletions src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::serializable_traits::Data;
use serde_derive::{Deserialize, Serialize};
use std::marker::PhantomData;

use super::*;

// check once where C have to satisfy Data trait

// Aggregator for shuffle tasks.
#[derive(Serialize, Deserialize)]
pub struct Aggregator<K: Data, V: Data, C: Data> {
Expand Down
2 changes: 1 addition & 1 deletion src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use parking_lot::Mutex;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down
10 changes: 6 additions & 4 deletions src/cache_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use super::*;
use crate::cache::{BoundedMemoryCache, CachePutResponse, KeySpace};
use crate::env;
//use crate::cache::CachePutResponse::CachePutSuccess;
use crate::rdd::Rdd;
use crate::serializable_traits::Data;
use crate::serialized_data_capnp::serialized_data;
use crate::split::Split;
use capnp::serialize_packed;
use parking_lot::RwLock;
use serde_derive::{Deserialize, Serialize};
use std::collections::LinkedList;
use std::collections::{HashMap, HashSet};
//use std::io::BufReader;
//use std::iter::FromIterator;
use std::net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
Expand Down
14 changes: 13 additions & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,25 @@ use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::serialized_data_capnp::serialized_data;
use capnp::serialize_packed;
use log::{error, info};
use simplelog::*;
use toml;
use uuid::Uuid;

use super::*;
use crate::distributed_scheduler::DistributedScheduler;
use crate::error::{Error, Result};
use crate::executor::Executor;
use crate::io::ReaderConfiguration;
use crate::local_scheduler::LocalScheduler;
use crate::parallel_collection::ParallelCollection;
use crate::rdd::union_rdd::UnionRdd;
use crate::rdd::{Rdd, RddBase};
use crate::scheduler::NativeScheduler;
use crate::serializable_traits::{Data, SerFunc};
use crate::task::TaskContext;
use crate::{env, hosts};

// there is a problem with this approach since T needs to satisfy PartialEq, Eq for Range
// No such restrictions are needed for Vec
Expand Down
3 changes: 2 additions & 1 deletion src/dag_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::scheduler::Scheduler;
use crate::task::TaskBase;
use std::any::Any;
use std::collections::HashMap;
use std::error::Error;
Expand Down
10 changes: 8 additions & 2 deletions src/dependency.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use crate::aggregator::Aggregator;
use crate::env;
use crate::partitioner::Partitioner;
use crate::rdd::RddBase;
use crate::serializable_traits::Data;
use log::info;
use serde_derive::{Deserialize, Serialize};
use serde_traitobject::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

use super::*;

// Revise if enum is good choice. Considering enum since down casting one trait object to another trait object is difficult.
#[derive(Clone, Serialize, Deserialize)]
pub enum Dependency {
Expand Down
19 changes: 17 additions & 2 deletions src/distributed_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;
use crate::scheduler::Scheduler;
use crate::scheduler::{NativeScheduler, Scheduler};

use std::any::Any;
use std::collections::{
Expand All @@ -16,7 +15,23 @@ use std::thread;
use std::time;
use std::time::{Duration, Instant};

use crate::dag_scheduler::{CompletionEvent, FetchFailedVals, TastEndReason};
use crate::dependency::{Dependency, ShuffleDependencyTrait};
use crate::env;
use crate::error::{Error, Result};
use crate::job::{Job, JobTracker};
use crate::local_scheduler::LocalScheduler;
use crate::map_output_tracker::MapOutputTracker;
use crate::rdd::{Rdd, RddBase};
use crate::result_task::ResultTask;
use crate::scheduler::*;
use crate::serializable_traits::{Data, SerFunc};
use crate::serialized_data_capnp::serialized_data;
use crate::shuffle_map_task::ShuffleMapTask;
use crate::stage::Stage;
use crate::task::{TaskBase, TaskContext, TaskOption, TaskResult};
use capnp::serialize_packed;
use log::info;
use parking_lot::Mutex;
use threadpool::ThreadPool;

Expand Down
9 changes: 7 additions & 2 deletions src/env.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use super::*;

use std::collections::HashMap;
use std::fs::File;
use std::fs::OpenOptions;
Expand Down Expand Up @@ -56,6 +54,13 @@ mod config_vars {
pub(super) const LOG_LEVEL: &str = "NS_LOG_LEVEL";
}

use crate::cache::BoundedMemoryCache;
use crate::cache_tracker::CacheTracker;
use crate::error::Error;
use crate::hosts::Hosts;
use crate::map_output_tracker::MapOutputTracker;
use crate::shuffle_fetcher::ShuffleFetcher;
use crate::shuffle_manager::ShuffleManager;
use config_vars::*;

#[derive(Clone, Copy)]
Expand Down
4 changes: 3 additions & 1 deletion src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;
use crate::serialized_data_capnp::serialized_data;
use crate::task::TaskOption;
use capnp::serialize_packed;
use log::info;
use std::net::TcpListener;
use std::thread;
use std::time::Instant;
Expand Down
5 changes: 3 additions & 2 deletions src/hosts.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::*;

use std::net::SocketAddr;
use std::path::Path;

use crate::error::{Error, Result};
use once_cell::sync::OnceCell;
use serde_derive::{Deserialize, Serialize};

static HOSTS: OnceCell<Hosts> = OnceCell::new();

Expand Down Expand Up @@ -41,6 +41,7 @@ impl Hosts {
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;

#[test]
fn test_missing_hosts_file() {
Expand Down
14 changes: 11 additions & 3 deletions src/io/local_file_reader.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use std::fs;
use std::io::BufReader;
use std::io::{BufReader, Read};
use std::net::Ipv4Addr;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::context::Context;
use crate::dependency::Dependency;
use crate::error::{Error, Result};
use crate::io::ReaderConfiguration;
use crate::rdd::{Rdd, RddBase};
use crate::serializable_traits::AnyData;
use crate::split::Split;
use log::debug;
use log::info;
use rand::prelude::*;

use super::*;
use serde_derive::{Deserialize, Serialize};
use serde_traitobject::Arc as SerArc;

pub struct LocalFsReaderConfig {
filter_ext: Option<std::ffi::OsString>,
Expand Down
4 changes: 3 additions & 1 deletion src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use std::sync::Arc;
use downcast_rs::Downcast;
use serde_traitobject::Arc as SerArc;

use crate::context::Context;
use crate::rdd::Rdd;

mod local_file_reader;
use crate::*;
pub use local_file_reader::{LocalFsReader, LocalFsReaderConfig, LocalFsReaderSplit};

pub trait ReaderConfiguration {
Expand Down
7 changes: 5 additions & 2 deletions src/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::*;

use std::any::Any;
use std::cell::RefCell;
use std::clone::Clone;
Expand All @@ -17,6 +15,11 @@ use std::thread;
use std::time;
use std::time::{Duration, Instant};

use crate::rdd::Rdd;
use crate::scheduler::NativeScheduler;
use crate::serializable_traits::{Data, SerFunc};
use crate::stage::Stage;
use crate::task::{TaskBase, TaskContext};
use threadpool::ThreadPool;

#[derive(Clone, Debug)]
Expand Down
80 changes: 19 additions & 61 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,87 +29,45 @@ pub mod serialized_data_capnp {
include!(concat!(env!("OUT_DIR"), "/capnp/serialized_data_capnp.rs"));
}

use std::io::prelude::*;

pub mod context;
pub use context::*;

pub use context::Context;
mod executor;
use executor::*;

pub mod partitioner;
pub use partitioner::*;

pub mod rdd;
pub use rdd::*;

pub mod io;

pub use io::*;
mod dependency;
use dependency::*;

mod split;
use split::*;

pub use dependency::*;
pub mod split;
pub use split::*;
mod parallel_collection;
use parallel_collection::*;

mod cache_tracker;
use cache_tracker::*;

pub use parallel_collection::*;
mod cache;
use cache::*;

mod cache_tracker;
mod shuffle_fetcher;
use shuffle_fetcher::*;

mod shuffle_manager;
use shuffle_manager::*;

mod shuffle_map_task;
use shuffle_map_task::*;

pub mod shuffle_map_task;
pub use shuffle_map_task::*;
#[macro_use]
mod scheduler;
use scheduler::*;

pub mod aggregator;
mod dag_scheduler;
use dag_scheduler::*;

mod task;
use task::*;

mod local_scheduler;
use local_scheduler::*;

mod distributed_scheduler;
use distributed_scheduler::*;

mod local_scheduler;
mod stage;
use stage::*;

mod aggregator;
use aggregator::*;

mod task;
pub use aggregator::*;
mod env;
mod job;
mod map_output_tracker;
use map_output_tracker::*;

mod result_task;
use result_task::*;

mod job;
use job::*;

mod serializable_traits;
use serializable_traits::{AnyData, Data, Func, SerFunc};

mod env;
pub mod serializable_traits;
pub use env::DeploymentMode;

pub mod error;
pub use error::{Error, Result};

pub use error::*;
mod hosts;
use hosts::Hosts;

mod utils;
pub mod utils;
pub use utils::*;
16 changes: 14 additions & 2 deletions src/local_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::*;
use crate::job::JobTracker;
use crate::job::{Job, JobTracker};
use crate::scheduler::NativeScheduler;

use std::any::Any;
Expand All @@ -18,6 +17,19 @@ use std::thread;
use std::time;
use std::time::{Duration, Instant};

use crate::dag_scheduler::{CompletionEvent, TastEndReason};
use crate::dependency::ShuffleDependencyTrait;
use crate::env;
use crate::error::{Error, Result};
use crate::map_output_tracker::MapOutputTracker;
use crate::rdd::{Rdd, RddBase};
use crate::result_task::ResultTask;
use crate::serializable_traits::{Data, SerFunc};
use crate::serialized_data_capnp::serialized_data;
use crate::shuffle_map_task::ShuffleMapTask;
use crate::stage::Stage;
use crate::task::{TaskBase, TaskContext, TaskOption, TaskResult};
use log::info;
use parking_lot::Mutex;
use threadpool::ThreadPool;

Expand Down
5 changes: 3 additions & 2 deletions src/map_output_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;

use crate::serialized_data_capnp::serialized_data;
use log::info;
use serde_derive::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
Expand Down
11 changes: 8 additions & 3 deletions src/parallel_collection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use super::*;

use crate::context::Context;
use crate::dependency::Dependency;
use crate::error::Result;
use crate::rdd::{Rdd, RddBase, RddVals};
use crate::serializable_traits::{AnyData, Data};
use crate::split::Split;
use log::info;
use serde_derive::{Deserialize, Serialize};
use std::hash::Hash;
use std::sync::Arc;

// This module implements parallel collection RDD for dividing the input collection for parallel processing

/// A collection of objects which can be sliced into partitions with a partitioning function.
Expand Down
4 changes: 3 additions & 1 deletion src/partitioner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::*;
use crate::serializable_traits::Data;
use downcast_rs::Downcast;
use fasthash::MetroHasher;
use serde_derive::{Deserialize, Serialize};
use serde_traitobject::{Deserialize, Serialize};
use std::any::Any;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
Expand Down
Loading

0 comments on commit 16eec95

Please sign in to comment.