Skip to content

Commit

Permalink
chore: cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
0b01 committed Jan 3, 2018
1 parent de75e8c commit ba6cec7
Show file tree
Hide file tree
Showing 40 changed files with 1,448 additions and 1,054 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ test.dtf
*.pyc
db

test/zrx/
75 changes: 44 additions & 31 deletions src/bin/cli/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ impl fmt::Display for TectonicError {
}
}

struct CxnStream{stream: TcpStream}
struct CxnStream {
stream: TcpStream,
}

impl CxnStream {
fn cmd(&mut self, command: &str) -> Result<String, TectonicError> {
Expand All @@ -54,9 +56,7 @@ impl CxnStream {

if command.starts_with("GET") && !command.contains("AS JSON") && success {
match dtf::read_one_batch(&mut self.stream) {
Ok(vecs) => {
Ok(format!("[{}]\n", dtf::update_vec_to_json(&vecs)))
},
Ok(vecs) => Ok(format!("[{}]\n", dtf::update_vec_to_json(&vecs))),
Err(_) => Err(TectonicError::DecodeError),
}
} else {
Expand All @@ -79,16 +79,16 @@ pub struct Cxn {
}

impl Cxn {
pub fn new(host: &str, port : &str) -> Result<Cxn, TectonicError> {
pub fn new(host: &str, port: &str) -> Result<Cxn, TectonicError> {
let addr = format!("{}:{}", host, port);

let stream = match TcpStream::connect(&addr) {
Ok(stm) => stm,
Err(_) => return Err(TectonicError::ConnectionError)
Err(_) => return Err(TectonicError::ConnectionError),
};

Ok(Cxn {
stream: Arc::new(RwLock::new(CxnStream{stream: stream})),
stream: Arc::new(RwLock::new(CxnStream { stream: stream })),
subscription: None,
})
}
Expand All @@ -102,15 +102,13 @@ impl Cxn {
let tx = Arc::new(Mutex::new(tx));
let rx = Arc::new(Mutex::new(rx));

thread::spawn(move || {
loop {
let res = streamcopy.write().unwrap().cmd("\n").unwrap();
println!("{}", res);
if res == "NONE\n" {
thread::sleep(time::Duration::from_millis(1));
} else {
let _ = tx.lock().unwrap().send(res);
}
thread::spawn(move || loop {
let res = streamcopy.write().unwrap().cmd("\n").unwrap();
println!("{}", res);
if res == "NONE\n" {
thread::sleep(time::Duration::from_millis(1));
} else {
let _ = tx.lock().unwrap().send(res);
}
});

Expand All @@ -119,7 +117,7 @@ impl Cxn {
Ok(())
}

pub fn cmd(&mut self, command : &str) -> Result<String, TectonicError> {
pub fn cmd(&mut self, command: &str) -> Result<String, TectonicError> {
self.stream.write().unwrap().cmd(command)
}

Expand All @@ -131,7 +129,7 @@ impl Cxn {
}
}

pub struct CxnPool{
pub struct CxnPool {
cxns: Vec<Cxn>,
host: String,
port: String,
Expand All @@ -141,7 +139,7 @@ pub struct CxnPool{

use std::collections::VecDeque;
impl CxnPool {
pub fn new(n: usize, host: &str, port : &str) -> Result<Self, TectonicError> {
pub fn new(n: usize, host: &str, port: &str) -> Result<Self, TectonicError> {
let mut v = vec![];
let mut q = VecDeque::new();

Expand All @@ -151,7 +149,7 @@ impl CxnPool {
q.push_back(i);
}

Ok(CxnPool{
Ok(CxnPool {
cxns: v,
host: host.to_owned(),
port: host.to_owned(),
Expand Down Expand Up @@ -191,7 +189,9 @@ impl CxnPool {

pub fn insert(&mut self, cmd: &InsertCommand) -> Result<(), TectonicError> {

for i in self.insert_retry_queue.pop() { let _ = self.insert(&i)?; }
for i in self.insert_retry_queue.pop() {
let _ = self.insert(&i)?;
}

let n = self.available_workers.pop_front();
let n = match n {
Expand All @@ -213,9 +213,8 @@ impl CxnPool {
self.insert_retry_queue.push(cmd.clone());
self.cxns[n] = Cxn::new(&self.host, &self.port)?;
return Err(TectonicError::ConnectionError);
},
Err(TectonicError::ServerError(msg)) => {
},
}
Err(TectonicError::ServerError(msg)) => {}
_ => (),
}
}
Expand All @@ -239,18 +238,32 @@ impl InsertCommand {
pub fn into_string(self) -> Vec<String> {
match self {
InsertCommand::Add(dbname, up) => {
let is_trade = if up.is_trade {"t"} else {"f"};
let is_bid = if up.is_bid {"t"} else {"f"};
let s = format!("ADD {}, {}, {}, {}, {}, {}; INTO {}\n",
up.ts, up.seq, is_trade, is_bid, up.price, up.size, dbname
let is_trade = if up.is_trade { "t" } else { "f" };
let is_bid = if up.is_bid { "t" } else { "f" };
let s = format!(
"ADD {}, {}, {}, {}, {}, {}; INTO {}\n",
up.ts,
up.seq,
is_trade,
is_bid,
up.price,
up.size,
dbname
);
vec![s]
},
}
InsertCommand::BulkAdd(dbname, ups) => {
let mut cmds = vec![format!("BULKADD INTO {}\n", dbname)];
for up in ups {
cmds.push(format!("{}, {}, {}, {}, {}, {};\n",
up.ts, up.seq, up.is_trade, up.is_bid, up.price, up.size));
cmds.push(format!(
"{}, {}, {}, {}, {}, {};\n",
up.ts,
up.seq,
up.is_trade,
up.is_bid,
up.price,
up.size
));
}

cmds.push("DDAKLUB\n".to_owned());
Expand Down
84 changes: 48 additions & 36 deletions src/bin/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,54 @@ mod db;

fn main() {
let matches = App::new("tectonic-cli")
.version("0.0.1")
.author("Ricky Han <[email protected]>")
.about("command line client for tectonic financial datastore")
.arg(Arg::with_name("host")
.short("h")
.long("host")
.value_name("HOST")
.help("Sets the host to connect to (default 0.0.0.0)")
.takes_value(true))
.arg(Arg::with_name("port")
.short("p")
.long("port")
.value_name("PORT")
.help("Sets the port to connect to (default 9001)")
.takes_value(true))
.arg(Arg::with_name("s")
.short("s")
.long("subscription")
.value_name("DBNAME")
.help("subscribe to the datastore")
.takes_value(true))
.arg(Arg::with_name("b")
.short("b")
.value_name("ITERATION")
.multiple(false)
.help("Benchmark network latency")
.takes_value(true))
.get_matches();
.version("0.0.1")
.author("Ricky Han <[email protected]>")
.about("command line client for tectonic financial datastore")
.arg(
Arg::with_name("host")
.short("h")
.long("host")
.value_name("HOST")
.help("Sets the host to connect to (default 0.0.0.0)")
.takes_value(true),
)
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.value_name("PORT")
.help("Sets the port to connect to (default 9001)")
.takes_value(true),
)
.arg(
Arg::with_name("s")
.short("s")
.long("subscription")
.value_name("DBNAME")
.help("subscribe to the datastore")
.takes_value(true),
)
.arg(
Arg::with_name("b")
.short("b")
.value_name("ITERATION")
.multiple(false)
.help("Benchmark network latency")
.takes_value(true),
)
.get_matches();

let host = matches.value_of("host").unwrap_or("0.0.0.0");
let port = matches.value_of("port").unwrap_or("9001");

let mut cxn = db::Cxn::new(host, port).unwrap();

if matches.is_present("b") {
let times = matches.value_of("b") .unwrap_or("10")
.parse::<usize>()
.unwrap_or(10) + 1;
let times = matches
.value_of("b")
.unwrap_or("10")
.parse::<usize>()
.unwrap_or(10) + 1;
benchmark(&mut cxn, times);
} else if matches.is_present("s") {
let dbname = matches.value_of("s").unwrap_or("");
Expand All @@ -66,13 +76,15 @@ fn benchmark(cxn: &mut db::Cxn, times: usize) {
let mut acc = vec![];
let _create = cxn.cmd("CREATE bnc_gas_btc\n");
for _ in 1..times {
let _res = cxn.cmd("ADD 1513922718770, 0, t, f, 0.001939, 22.85; INTO bnc_gas_btc\n");
let _res = cxn.cmd(
"ADD 1513922718770, 0, t, f, 0.001939, 22.85; INTO bnc_gas_btc\n",
);
acc.push(t.elapsed().unwrap().subsec_nanos());
// println!("res: {:?}, latency: {:?}", res, t.elapsed());
t = time::SystemTime::now();
}

let avg_ns = acc.iter().fold(0, |s,i|s+i) as f32 / acc.len() as f32;
let avg_ns = acc.iter().fold(0, |s, i| s + i) as f32 / acc.len() as f32;
println!("AVG ns/insert: {}", avg_ns);
println!("AVG inserts/s: {}", 1. / (avg_ns / 1_000_000_000.));
}
Expand All @@ -88,13 +100,13 @@ fn handle_query(cxn: &mut db::Cxn) {
match cxn.cmd(&cmd) {
Err(db::TectonicError::DecodeError) => {
panic!("Decode Error");
},
}
Err(db::TectonicError::ConnectionError) => {
panic!("Connection Error");
},
}
Err(db::TectonicError::ServerError(msg)) => {
print!("{}", msg);
},
}
Ok(msg) => {
print!("{}", msg);
}
Expand Down
55 changes: 32 additions & 23 deletions src/bin/dtfcat/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,29 @@ use libtectonic::dtf;
use clap::{Arg, App};

fn main() {
let matches = App::new("dtfcat")
.version("1.0.0")
.author("Ricky Han <[email protected]>")
.about("command line client for tectonic financial datastore")
.arg(Arg::with_name("input")
.short("i")
.long("input")
.value_name("INPUT")
.help("file to read")
.required(true)
.takes_value(true))
.arg(Arg::with_name("metadata")
.short("m")
.long("metadata")
.help("read only the metadata"))
.arg(Arg::with_name("csv")
.short("c")
.long("csv")
.help("output csv (default is JSON)"))
.get_matches();
let matches = App::new("dtfcat")
.version("1.0.0")
.author("Ricky Han <[email protected]>")
.about("command line client for tectonic financial datastore")
.arg(
Arg::with_name("input")
.short("i")
.long("input")
.value_name("INPUT")
.help("file to read")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("metadata")
.short("m")
.long("metadata")
.help("read only the metadata"),
)
.arg(Arg::with_name("csv").short("c").long("csv").help(
"output csv (default is JSON)",
))
.get_matches();

let input = matches.value_of("input").unwrap();
let metadata = matches.is_present("metadata");
Expand All @@ -34,11 +37,17 @@ fn main() {
if metadata {
println!("{}", dtf::read_meta(input).unwrap());
return;
} else if csv{
println!("{}", dtf::update_vec_to_csv(&dtf::decode(input, None).unwrap()));
} else if csv {
println!(
"{}",
dtf::update_vec_to_csv(&dtf::decode(input, None).unwrap())
);
return;
} else {
println!("[{}]", dtf::update_vec_to_json(&dtf::decode(input, None).unwrap()));
println!(
"[{}]",
dtf::update_vec_to_json(&dtf::decode(input, None).unwrap())
);
return;
}
}
Loading

0 comments on commit ba6cec7

Please sign in to comment.