initial commit
rajasekarv committed Oct 23, 2019
1 parent 6cf314c commit 67a5d0c
Showing 40 changed files with 9,905 additions and 1 deletion.
3,039 changes: 3,039 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions Cargo.toml
@@ -0,0 +1,44 @@
[package]
name = "fast_spark"
version = "0.1.0"
authors = ["raja <[email protected]>"]
edition = "2018"

build = "build.rs"


[dependencies]
objekt = "0.1.2"
objekt-clonable-impl = "0.2.2"
downcast-rs = "1.0.3"
threadpool = "1.7.1"
#derivative = "1.0.2"
serde_traitobject = "0.1.3"
toml = "0.5.0"
serde_closure = "0.1.3"
serde = { version = "1.0.90", features = ["rc"] }
reqwest = "0.9.16"
lazy_static = "1.3.0"
serde_derive = "1.0.90"
fasthash = "0.4.0"
bincode = "1.1.3"
num_cpus = "1.10.1"
uuid = "0.7.4"
env_logger = "0.5"
actix-web = "1.0.0-beta.3"
#actix-files = "0.1.0-beta.1"
rand = "*"
parking_lot = { version = "0.9.0", features = ["serde"] }
capnp = "0.9.5"
simplelog = "0.7.4"
log = "0.4.8"

[build-dependencies]
capnpc = "0.9.5"

[dev-dependencies]
chrono = "0.4"
serde_closure = "0.1.3"
itertools = "0.8.0"
parquet = "0.15.0"

48 changes: 47 additions & 1 deletion README.md
@@ -1,2 +1,48 @@
# fast_spark
A new arguably faster implementation of Apache Spark from scratch in Rust
A new arguably faster implementation of Apache Spark from scratch in Rust. WIP

Just install Cap'n Proto (https://capnproto.org/install.html) and you are good to go. The code is tested only on Linux and requires a nightly toolchain. It is tested against Rust 1.39 only; there are breaking changes in specialization from version to version, so stick with 1.39 for now. So just use
cargo +nightly-2019-09-11 build --release

Refer to make_rdd.rs and the other programs in the examples directory to get the basic idea.

When running in distributed mode, every machine needs a hosts.conf file in the format shown in the config_files folder, and all hosts must be reachable over SSH.
The master port can be configured in hosts.conf; port 10500 must be free on the executors, and ports 5000-6000 are reserved for the shuffle manager. This will be handled internally soon.

Since file readers are not done yet, you have to read files manually for now (for example, read directly from S3, or work around local files by distributing copies of all files to every machine and building an RDD from the list of file names, as in examples/file_read.rs).

Ctrl-C handling and panic handling are not done yet, so if something goes wrong at runtime the executors won't be shut down automatically and you will have to kill the processes manually.

One limitation of the current implementation is that the input and return types of all closures, and all input to make_rdd, must be owned data.
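
For example, the sketch below (a minimal, hypothetical snippet using the same Context and Fn! APIs as the examples in this commit; the exact trait bounds are an assumption) passes owned Strings through make_rdd and returns owned tuples from the closure:

```rust
use fast_spark::*;
#[macro_use]
extern crate serde_closure;

fn main() {
    let sc = Context::new("local");
    // The input to make_rdd is an owned Vec<String>; borrowed &str data would not do.
    let names: Vec<String> = vec!["alice".to_string(), "bob".to_string()];
    let rdd = sc.make_rdd(names, 2);
    // The closure takes an owned String and returns an owned tuple;
    // returning something borrowed from `name` would not work here.
    let pairs = rdd.map(Fn!(|name: String| (name.len(), name)));
    println!("{:?}", pairs.collect());
    sc.drop_executors();
}
```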

## ToDo:

- [ ] Error Handling(Priority)

### RDD
Most of these, except the file reader and writer, are trivial to implement.
- [x] map
- [x] flat_map
- [x] filter
- [ ] step_by
- [ ] take_sample
- [ ] union
- [ ] glom
- [ ] cartesian
- [x] group_by
- [x] reduceby
- [ ] pipe
- [ ] map_partitions
- [ ] for_each
- [x] collect
- [ ] reduce
- [ ] fold
- [ ] aggregate
- [ ] take
- [ ] first
- [ ] save_as_text_file (can only save as a text file in the executors' local file system)

### Config Files
- [ ] Replace hard coded values


9 changes: 9 additions & 0 deletions build.rs
@@ -0,0 +1,9 @@
extern crate capnpc;

fn main() {
    capnpc::CompilerCommand::new()
        .src_prefix("src")
        .file("src/capnp/serialized_data.capnp")
        .run()
        .expect("capnpc compiling issue");
}
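
Elsewhere in the crate the generated bindings are presumably included from the build output; the snippet below is only a sketch of the usual capnpc convention (the module name and OUT_DIR path are assumptions based on .src_prefix("src") and the default serialized_data.capnp to serialized_data_capnp.rs mapping), not the actual wiring in this commit:

```rust
// Hypothetical inclusion of the capnpc output, e.g. in lib.rs (not shown here).
// With .src_prefix("src"), the generated file typically lands under OUT_DIR/capnp/.
pub mod serialized_data_capnp {
    include!(concat!(env!("OUT_DIR"), "/capnp/serialized_data_capnp.rs"));
}
```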
2 changes: 2 additions & 0 deletions config_files/hosts.conf
@@ -0,0 +1,2 @@
master = "localhost:3000"
slaves = ["user@ip", "user@ip", "user@ip", "user@ip"]
37 changes: 37 additions & 0 deletions examples/file_read.rs
@@ -0,0 +1,37 @@
use fast_spark::*;
#[macro_use]
extern crate serde_closure;
use chrono::prelude::*;
use std::fs;
use std::io::{BufRead, BufReader};

fn main() {
    let sc = Context::new("local");
    let files = fs::read_dir("csv_folder")
        .unwrap()
        .into_iter()
        .map(|x| x.unwrap().path().to_str().unwrap().to_owned())
        .collect::<Vec<_>>();
    let len = files.len();
    let files = sc.make_rdd(files, len);
    let lines = files.flat_map(Fn!(|file| {
        let f = fs::File::open(file).expect("unable to open file");
        let f = BufReader::new(f);
        Box::new(f.lines().map(|line| line.unwrap())) as Box<dyn Iterator<Item = String>>
    }));
    let line = lines.map(Fn!(|line: String| {
        let line = line.split(' ').collect::<Vec<_>>();
        let mut time: i64 = line[8].parse::<i64>().unwrap();
        time /= 1000;
        let time = Utc.timestamp(time, 0).hour();
        (
            (line[0].to_string(), line[1].to_string(), time),
            (line[7].parse::<i64>().unwrap(), 1.0),
        )
    }));
    let sum = line.reduce_by_key(Fn!(|((vl, cl), (vr, cr))| (vl + vr, cl + cr)), 1);
    let avg = sum.map(Fn!(|(k, (v, c))| (k, v as f64 / c)));
    let res = avg.collect();
    println!("{:?}", &res[0]);
    sc.drop_executors();
}
29 changes: 29 additions & 0 deletions examples/groupby.rs
@@ -0,0 +1,29 @@
use fast_spark::*;
#[macro_use]
extern crate serde_closure;

fn main() {
    let sc = Context::new("local");
    let vec = vec![
        ("x".to_string(), 1),
        ("x".to_string(), 2),
        ("x".to_string(), 3),
        ("x".to_string(), 4),
        ("x".to_string(), 5),
        ("x".to_string(), 6),
        ("x".to_string(), 7),
        ("y".to_string(), 1),
        ("y".to_string(), 2),
        ("y".to_string(), 3),
        ("y".to_string(), 4),
        ("y".to_string(), 5),
        ("y".to_string(), 6),
        ("y".to_string(), 7),
        ("y".to_string(), 8),
    ];
    let r = sc.make_rdd(vec, 4);
    let g = r.group_by_key(4);
    let res = g.collect();
    println!("res {:?}", res);
    sc.drop_executors();
}
26 changes: 26 additions & 0 deletions examples/join.rs
@@ -0,0 +1,26 @@
use fast_spark::*;
#[macro_use]
extern crate serde_closure;

fn main() {
    let sc = Context::new("local");
    let col1 = vec![
        (1, ("A".to_string(), "B".to_string())),
        (2, ("C".to_string(), "D".to_string())),
        (3, ("E".to_string(), "F".to_string())),
        (4, ("G".to_string(), "H".to_string())),
    ];
    let col1 = sc.parallelize(col1, 4);
    let col2 = vec![
        (1, "A1".to_string()),
        (1, "A2".to_string()),
        (2, "B1".to_string()),
        (2, "B2".to_string()),
        (3, "C1".to_string()),
        (3, "C2".to_string()),
    ];
    let col2 = sc.parallelize(col2, 4);
    let inner_joined_rdd = col2.join(col1.clone(), 4);
    let res = inner_joined_rdd.collect();
    println!("res {:?}", res);
}
14 changes: 14 additions & 0 deletions examples/make_rdd.rs
@@ -0,0 +1,14 @@
use fast_spark::*;
#[macro_use]
extern crate serde_closure;

fn main() {
    // for distributed mode, use Context::new("distributed")
    let sc = Context::new("local");
    let col = sc.make_rdd((0..10).collect::<Vec<_>>(), 32);
    // Fn! makes the closures serializable. It is necessary; use serde_closure version 0.1.3.
    let vec_iter = col.map(Fn!(|i| (0..i).collect::<Vec<_>>()));
    let res = vec_iter.collect();
    println!("{:?}", &res);
    sc.drop_executors();
}
97 changes: 97 additions & 0 deletions examples/parquet_column_read.rs
@@ -0,0 +1,97 @@
#[macro_use]
extern crate serde_closure;
#[macro_use]
extern crate itertools;
use chrono::prelude::*;
use fast_spark::*;
use parquet::column::reader::get_typed_column_reader;
use parquet::data_type::{ByteArrayType, Int32Type, Int64Type};
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs;
use std::fs::File;
use std::path::Path;

fn main() {
    let sc = Context::new("local");
    let files = fs::read_dir("parquet_file_dir")
        .unwrap()
        .into_iter()
        .map(|x| x.unwrap().path().to_str().unwrap().to_owned())
        .collect::<Vec<_>>();
    let len = files.len();
    let files = sc.make_rdd(files, len);
    let read = files.flat_map(Fn!(|file| read(file)));
    let sum = read.reduce_by_key(Fn!(|((vl, cl), (vr, cr))| (vl + vr, cl + cr)), 1);
    let avg = sum.map(Fn!(|(k, (v, c))| (k, v as f64 / c)));
    let res = avg.collect();
    println!("{:?}", &res[0]);
    sc.drop_executors();
}

fn read(file: String) -> Box<dyn Iterator<Item = ((i32, String, i64), (i64, f64))>> {
    let file = File::open(&Path::new(&file)).unwrap();
    let reader = SerializedFileReader::new(file).unwrap();
    let metadata = reader.metadata();
    let batch_size = 500_000 as usize;
    //let reader = Rc::new(RefCell::new(reader));
    let iter = (0..metadata.num_row_groups()).flat_map(move |i| {
        //let reader = reader.borrow_mut();
        let row_group_reader = reader.get_row_group(i).unwrap();
        let mut first_reader =
            get_typed_column_reader::<Int32Type>(row_group_reader.get_column_reader(0).unwrap());
        let mut second_reader = get_typed_column_reader::<ByteArrayType>(
            row_group_reader.get_column_reader(1).unwrap(),
        );
        let mut bytes_reader =
            get_typed_column_reader::<Int64Type>(row_group_reader.get_column_reader(7).unwrap());
        let mut time_reader =
            get_typed_column_reader::<Int64Type>(row_group_reader.get_column_reader(8).unwrap());
        let num_rows = metadata.row_group(i).num_rows() as usize;
        println!("row group rows {}", num_rows);
        let mut chunks = vec![];
        let mut batch_count = 0 as usize;
        while batch_count < num_rows {
            let begin = batch_count;
            let mut end = batch_count + batch_size;
            if end > num_rows {
                end = num_rows as usize;
            }
            chunks.push((begin, end));
            batch_count = end;
        }
        println!("total rows-{} chunks-{:?}", num_rows, chunks);
        chunks.into_iter().flat_map(move |(begin, end)| {
            let end = end as usize;
            let begin = begin as usize;
            let mut first = vec![Default::default(); end - begin];
            let mut second = vec![Default::default(); end - begin];
            let mut time = vec![Default::default(); end - begin];
            let mut bytes = vec![Default::default(); end - begin];
            first_reader
                .read_batch(batch_size, None, None, &mut first)
                .unwrap();
            second_reader
                .read_batch(batch_size, None, None, &mut second)
                .unwrap();
            time_reader
                .read_batch(batch_size, None, None, &mut time)
                .unwrap();
            bytes_reader
                .read_batch(batch_size, None, None, &mut bytes)
                .unwrap();
            let first = first.into_iter();
            let second = second
                .into_iter()
                .map(|x| unsafe { String::from_utf8_unchecked(x.data().to_vec()) });
            let time = time.into_iter().map(|t| {
                let t = t / 1000;
                Utc.timestamp(t, 0).hour() as i64
            });
            let bytes = bytes.into_iter().map(|b| (b, 1.0));
            let key = izip!(first, second, time);
            let value = bytes;
            key.zip(value)
        })
    });
    Box::new(iter)
}
37 changes: 37 additions & 0 deletions src/aggregator.rs
@@ -0,0 +1,37 @@
use super::*;
//use parking_lot::Mutex;
use std::marker::PhantomData;
//use std::sync::Arc;
//use std::sync::Mutex;

// check once whether C has to satisfy the Data trait

// Aggregator for shuffle tasks.
// Presently Mutexes are being used just to avoid the lifetime issues associated with mutable references.
// They carry a significant cost due to the amount of locking required.
// In the future, change everything to plain mutable references.
#[derive(Serialize, Deserialize)]
pub struct Aggregator<K: Data, V: Data, C: Data> {
    #[serde(with = "serde_traitobject")]
    pub create_combiner: Box<dyn serde_traitobject::Fn(V) -> C + Send + Sync>,
    #[serde(with = "serde_traitobject")]
    pub merge_value: Box<dyn serde_traitobject::Fn((C, V)) -> C + Send + Sync>,
    #[serde(with = "serde_traitobject")]
    pub merge_combiners: Box<dyn serde_traitobject::Fn((C, C)) -> C + Send + Sync>,
    _marker: PhantomData<K>,
}

impl<K: Data, V: Data, C: Data> Aggregator<K, V, C> {
    pub fn new(
        create_combiner: Box<dyn serde_traitobject::Fn(V) -> C + Send + Sync>,
        merge_value: Box<dyn serde_traitobject::Fn((C, V)) -> C + Send + Sync>,
        merge_combiners: Box<dyn serde_traitobject::Fn((C, C)) -> C + Send + Sync>,
    ) -> Self {
        Aggregator {
            create_combiner,
            merge_value,
            merge_combiners,
            _marker: PhantomData,
        }
    }
}
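
For orientation, here is a minimal sketch of how an Aggregator might be constructed for a group_by_key-style combine. It assumes Aggregator is re-exported at the crate root and that Box::new(Fn!(..)) closures coerce to the boxed serde_traitobject::Fn types, as the RDD examples above suggest; it is an illustration, not necessarily how the crate builds its aggregators.

```rust
use fast_spark::*;
#[macro_use]
extern crate serde_closure;

fn main() {
    // Hypothetical: an Aggregator<String, i64, Vec<i64>> that collects all
    // values for a key into a Vec, the way a group_by_key shuffle would.
    let agg: Aggregator<String, i64, Vec<i64>> = Aggregator::new(
        // create_combiner: start a combiner from the first value
        Box::new(Fn!(|v: i64| vec![v])),
        // merge_value: fold another value into an existing combiner
        Box::new(Fn!(|(mut c, v): (Vec<i64>, i64)| {
            c.push(v);
            c
        })),
        // merge_combiners: merge two partial combiners from different partitions
        Box::new(Fn!(|(mut c1, mut c2): (Vec<i64>, Vec<i64>)| {
            c1.append(&mut c2);
            c1
        })),
    );
    // The aggregator itself is handed to the shuffle machinery; it is only
    // constructed here for illustration.
    let _ = agg;
}
```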
