diff --git a/Cargo.lock b/Cargo.lock index f62da94..5980f58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,8 +4,10 @@ version = "0.1.0" dependencies = [ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "stderrlog 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -73,6 +75,16 @@ name = "cfg-if" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "chrono" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-integer 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cloudabi" version = "0.0.3" @@ -302,6 +314,19 @@ name = "nodrop" version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "num-integer" +version = "0.1.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num-traits" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "num_cpus" version = "1.10.0" @@ -574,6 +599,17 @@ name = "stable_deref_trait" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "stderrlog" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "termcolor 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "syn" version = "0.15.30" @@ -597,6 +633,32 @@ dependencies = [ "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "termcolor" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "wincolor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thread_local" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "time" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.54 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio" version = "0.1.18" @@ -823,6 +885,14 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "wincolor" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" @@ -842,6 +912,7 @@ dependencies = [ "checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" "checksum cc 1.0.35 (registry+https://github.com/rust-lang/crates.io-index)" = "5e5f3fee5eeb60324c2781f1e41286bdee933850fff9b3c672587fed5ec58c83" "checksum cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "11d43355396e872eefb45ce6342e4374ed7bc2b3a502d1b28e36d6e23c05d1f4" +"checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum crossbeam-deque 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b18cd2e169ad86297e6bc0ad9aa679aee9daa4f19e8163860faf7c164e4f5a71" "checksum crossbeam-epoch 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "04c9e3102cc2d69cd681412141b390abd55a362afc1540965dad0ad4d34280b4" @@ -872,6 +943,8 @@ dependencies = [ "checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "2f9667ddcc6cc8a43afc9b7917599d7216aa09c463919ea32c59ed6cac8bc945" +"checksum num-integer 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "e83d528d2677f0518c570baf2b7abdcf0cd2d248860b68507bdcb3e91d4c0cea" +"checksum num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0b3a5d7cc97d6d30d8b9bc8fa19bf45349ffe46241e8816f50f62f6d6aaabee1" "checksum num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1a23f0ed30a54abaa0c7e83b1d2d87ada7c3c23078d1d87815af3e3b6385fbba" "checksum owning_ref 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13" "checksum parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab41b4aed082705d1056416ae4468b6ea99d52599ecf3169b00088d43113e337" @@ -904,8 +977,12 @@ dependencies = [ "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum smallvec 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c4488ae950c49d403731982257768f48fada354a5203fe81f9bb6f43ca9002be" "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" +"checksum stderrlog 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "61dc66b7ae72b65636dbf36326f9638fb3ba27871bb737a62e2c309b87d91b70" "checksum syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)" = "66c8865bf5a7cbb662d8b011950060b3c8743dca141b054bf7195b20d314d8e2" "checksum tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b86c784c88d98c801132806dadd3819ed29d8600836c4088e855cdf3e178ed8a" +"checksum termcolor 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "adc4587ead41bf016f11af03e55a624c06568b5a19db4e90fde573d805074f83" +"checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" +"checksum time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "db8dcfca086c1143c9270ac42a2bbd8a7ee477b78ac8e45b19abfb0cbede4b6f" "checksum tokio 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "65641e515a437b308ab131a82ce3042ff9795bef5d6c5a9be4eb24195c417fd9" "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" "checksum tokio-current-thread 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d16217cad7f1b840c5a97dfb3c43b0c871fef423a6e8d2118c604e843662a443" @@ -928,4 +1005,5 @@ dependencies = [ "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +"checksum wincolor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eeb06499a3a4d44302791052df005d5232b927ed1a9658146d842165c4de7767" "checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" diff --git a/Cargo.toml b/Cargo.toml index 22412a4..3f2e6fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,9 @@ build = "build.rs" [dependencies] bytes = "0.4" linked-hash-map = "0.5" +log = "0.4" prost = "0.5" +stderrlog = "0.4" tokio = "0.1" [build-dependencies] diff --git a/src/arbiter.proto b/src/arbiter.proto index 97333e2..32c1045 100644 --- a/src/arbiter.proto +++ b/src/arbiter.proto @@ -2,18 +2,29 @@ syntax = "proto3"; package arbiter; +message IncomingMessage { + oneof message { + StartTransaction start_transaction = 1; + ResourcesAccessed resources_accessed = 2; + } +} + message StartTransaction { - uint64 txn_id = 1; + uint64 tid = 1; } -message ResourceRead { - uint64 txn_id = 1; - string resource_id = 2; +message ResourcesAccessed { + uint64 tid = 1; + repeated string read_rids = 2; + repeated string written_rids = 3; } -message IncomingMessage { +message OutgoingMessage { oneof message { - StartTransaction start_transaction = 1; - ResourceRead resource_read = 2; + InvalidRequest invalid_request = 1; } +} + +message InvalidRequest { + string description = 1; } \ No newline at end of file diff --git a/src/arbiter.rs b/src/arbiter.rs index bcddac1..81e39be 100644 --- a/src/arbiter.rs +++ b/src/arbiter.rs @@ -3,21 +3,20 @@ use std::collections::{HashSet, VecDeque}; use std::fmt::Debug; use std::hash::Hash; -pub type TID = u64; +type CID = u64; -pub struct Arbiter { - next_id: TID, - next_commit_id: TID, - completed_txns: VecDeque<(TID, HashSet)>, +pub struct Arbiter { + next_commit_id: CID, + completed_txns: VecDeque<(CID, HashSet)>, pending_commits: HashSet, txns: LinkedHashMap> } -impl Arbiter - where RID: Eq + Hash + Clone + Debug { +impl Arbiter + where TID: Eq + Hash + Clone + Debug, + RID: Eq + Hash + Clone + Debug { pub fn new() -> Self { Arbiter { - next_id: 0, next_commit_id: 0, completed_txns: VecDeque::new(), pending_commits: HashSet::new(), @@ -25,39 +24,38 @@ impl Arbiter } } - pub fn start_transaction(&mut self) -> TID { - let id = self.next_id; - self.next_id = self.next_id + 1; - self.txns.insert(id.clone(), TransactionState::new(id, self.next_commit_id)); - id + pub fn start_transaction(&mut self, id: TID) { + self.txns.insert(id, TransactionState::new(self.next_commit_id)); } - pub fn transaction_progress_many(&mut self, id: TID, read: HashSet, written: HashSet) -> Result { - self.with_transaction(id, |txn| txn.transaction_progress_many(read, written))?; + pub fn transaction_progress_many(&mut self, id: &TID, read: R, written: W) -> Result, TID> + where R: IntoIterator, + W: IntoIterator { + self.with_transaction(id, |txn| txn.transaction_progress_many(id, read, written))?; Ok(self.advance_txns()) } - pub fn transaction_progress_read(&mut self, id: TID, read: RID) -> Result { - self.with_transaction(id, |txn| txn.transaction_progress_read(read))?; + pub fn transaction_progress_read(&mut self, id: &TID, read: RID) -> Result, TID> { + self.with_transaction(id, |txn| txn.transaction_progress_read(id, read))?; Ok(self.advance_txns()) } - pub fn transaction_progress_write(&mut self, id: TID, written: RID) -> Result { - self.with_transaction(id, |txn| txn.transaction_progress_write(written))?; + pub fn transaction_progress_write(&mut self, id: &TID, written: RID) -> Result, TID> { + self.with_transaction(id, |txn| txn.transaction_progress_write(id, written))?; Ok(self.advance_txns()) } - pub fn start_commit(&mut self, id: TID) -> Result { - self.with_transaction(id, |txn| txn.wait_commit())?; + pub fn start_commit(&mut self, id: &TID) -> Result, TID> { + self.with_transaction(id, |txn| txn.wait_commit(id))?; Ok(self.advance_txns()) } - pub fn commit_completed(&mut self, id: TID) -> Result { - let txn = self.txns.remove(&id).ok_or_else(|| ArbiterError::UnknownTransaction(id))?; + pub fn commit_completed(&mut self, id: &TID) -> Result, TID> { + let txn = self.txns.remove(&id).ok_or_else(|| Error::UnknownTransaction(id.clone()))?; if txn.state_type != TransactionStateType::Committing { // TODO: This changes the order - self.txns.insert(id, txn); - return Err(ArbiterError::InvalidTransactionState(id)); + self.txns.insert(id.clone(), txn); + return Err(Error::InvalidTransactionState(id.clone())); } let commit_id = self.next_commit_id; @@ -70,7 +68,7 @@ impl Arbiter Ok(self.advance_txns()) } - fn advance_txns(&mut self) -> TransactionUpdate { + fn advance_txns(&mut self) -> TransactionUpdate { let mut result = TransactionUpdate::no_change(); result.merge(self.advance_impossible_writes()); result.merge(self.advance_pending_commits()); @@ -80,15 +78,15 @@ impl Arbiter result } - fn advance_impossible_writes(&mut self) -> TransactionUpdate { + fn advance_impossible_writes(&mut self) -> TransactionUpdate { let mut result = TransactionUpdate::no_change(); - for (_, txn) in self.txns.iter_mut().rev() { + for (id, txn) in self.txns.iter_mut().rev() { match txn.state_type { TransactionStateType::InProgress | TransactionStateType::WaitCommit => { for (seq, committed_writes) in self.completed_txns.iter() { if *seq >= txn.created_commit && txn.is_commit_prevented(&committed_writes) { - result.mark_failed(txn.id); + result.mark_failed(id.clone()); } } } @@ -103,14 +101,14 @@ impl Arbiter result } - fn advance_pending_commits(&mut self) -> TransactionUpdate { + fn advance_pending_commits(&mut self) -> TransactionUpdate { let mut result = TransactionUpdate::no_change(); - for (_, txn) in self.txns.iter_mut() { + for (id, txn) in self.txns.iter_mut() { match txn.state_type { TransactionStateType::WaitCommit => { - if txn.try_commit(&self.pending_commits).unwrap() { + if txn.try_commit(id, &self.pending_commits).unwrap() { self.pending_commits.extend(txn.written.iter().cloned()); - result.mark_can_commit(txn.id); + result.mark_can_commit(id.clone()); } } _ => {} @@ -130,15 +128,14 @@ impl Arbiter } } - fn with_transaction(&mut self, id: TID, f: F) -> Result - where F: FnOnce(&mut TransactionState) -> Result { - self.txns.get_mut(&id).map_or(Err(ArbiterError::UnknownTransaction(id)), f) + fn with_transaction(&mut self, id: &TID, f: F) -> Result + where F: FnOnce(&mut TransactionState) -> Result { + self.txns.get_mut(id).map_or(Err(Error::UnknownTransaction(id.clone())), f) } } struct TransactionState { - id: TID, - created_commit: TID, + created_commit: CID, state_type: TransactionStateType, read: HashSet, written: HashSet @@ -146,9 +143,8 @@ struct TransactionState { impl TransactionState where RID: Eq + Hash + Clone { - fn new(id: TID, created_commit: TID) -> TransactionState { + fn new(created_commit: CID) -> TransactionState { TransactionState { - id, created_commit, state_type: TransactionStateType::InProgress, read: HashSet::new(), @@ -156,7 +152,7 @@ impl TransactionState } } - fn transaction_progress_many (&mut self, read: R, written: W) -> Result<()> + fn transaction_progress_many (&mut self, id: &TID, read: R, written: W) -> Result<(), TID> where R: IntoIterator, W: IntoIterator { match self.state_type { @@ -164,40 +160,40 @@ impl TransactionState self.read.extend(read.into_iter()); self.written.extend(written.into_iter()); }, - _ => return Err(ArbiterError::InvalidTransactionState(self.id)) + _ => return Err(Error::InvalidTransactionState(id.clone())) } Ok(()) } - fn transaction_progress_read(&mut self, read: RID) -> Result<()> { + fn transaction_progress_read(&mut self, id: &TID, read: RID) -> Result<(), TID> { match self.state_type { TransactionStateType::InProgress => { self.read.insert(read); }, - _ => return Err(ArbiterError::InvalidTransactionState(self.id)) + _ => return Err(Error::InvalidTransactionState(id.clone())) } Ok(()) } - fn transaction_progress_write(&mut self, written: RID) -> Result<()> { + fn transaction_progress_write(&mut self, id: &TID, written: RID) -> Result<(), TID> { match self.state_type { TransactionStateType::InProgress => { self.written.insert(written); }, - _ => return Err(ArbiterError::InvalidTransactionState(self.id)) + _ => return Err(Error::InvalidTransactionState(id.clone())) } Ok(()) } - fn wait_commit(&mut self) -> Result<()> { + fn wait_commit(&mut self, id: &TID) -> Result<(), TID> { match self.state_type { TransactionStateType::InProgress => self.state_type = TransactionStateType::WaitCommit, - _ => return Err(ArbiterError::InvalidTransactionState(self.id)) + _ => return Err(Error::InvalidTransactionState(id.clone())) } Ok(()) } - fn try_commit(&mut self, pending_writes: &HashSet) -> Result { + fn try_commit(&mut self, id: &TID, pending_writes: &HashSet) -> Result { match self.state_type { TransactionStateType::WaitCommit => { let can_commit = self.written.is_disjoint(pending_writes); @@ -206,14 +202,14 @@ impl TransactionState } Ok(can_commit) }, - _ => return Err(ArbiterError::InvalidTransactionState(self.id)) + _ => return Err(Error::InvalidTransactionState(id.clone())) } } - fn commit_completed(&mut self) -> Result<()> { + fn commit_completed(&mut self, id: &TID) -> Result<(), TID> { match self.state_type { TransactionStateType::Committing => {} - _ => return Err(ArbiterError::InvalidTransactionState(self.id.clone())) + _ => return Err(Error::InvalidTransactionState(id.clone())) } Ok(()) } @@ -235,20 +231,20 @@ enum TransactionStateType { } #[derive(Debug, PartialEq, Eq)] -pub enum ArbiterError { +pub enum Error { UnknownTransaction(TID), InvalidTransactionState(TID) } -pub type Result = std::result::Result; +pub type Result = std::result::Result>; #[derive(Debug, PartialEq, Eq)] -pub struct TransactionUpdate { +pub struct TransactionUpdate { can_commit: Vec, failed: Vec } -impl TransactionUpdate { +impl TransactionUpdate { fn no_change() -> Self { TransactionUpdate { can_commit: Vec::new(), @@ -288,61 +284,61 @@ mod tests { #[test] fn single_txn() { - let mut arbiter: Arbiter = Arbiter::new(); - let id = arbiter.start_transaction(); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(id, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id, 100)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id)), arbiter.start_commit(id)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.commit_completed(id)); + let mut arbiter: Arbiter = Arbiter::new(); + arbiter.start_transaction(1); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(&1, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&1, 100)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(1)), arbiter.start_commit(&1)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.commit_completed(&1)); } #[test] fn blocking_txn() { - let mut arbiter: Arbiter = Arbiter::new(); - let id1 = arbiter.start_transaction(); - let id2 = arbiter.start_transaction(); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(id1, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id1, 100)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id1)), arbiter.start_commit(id1)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id2, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.start_commit(id2)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id2)), arbiter.commit_completed(id1)); + let mut arbiter: Arbiter = Arbiter::new(); + arbiter.start_transaction(1); + arbiter.start_transaction(2); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(&1, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&1, 100)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(1)), arbiter.start_commit(&1)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&2, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.start_commit(&2)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(2)), arbiter.commit_completed(&1)); } #[test] fn non_blocking_txn() { - let mut arbiter: Arbiter = Arbiter::new(); - let id1 = arbiter.start_transaction(); - let id2 = arbiter.start_transaction(); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(id1, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id1, 100)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id1)), arbiter.start_commit(id1)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id2, 101)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id2)), arbiter.start_commit(id2)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.commit_completed(id2)); + let mut arbiter: Arbiter = Arbiter::new(); + arbiter.start_transaction(1); + arbiter.start_transaction(2); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(&1, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&1, 100)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(1)), arbiter.start_commit(&1)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&2, 101)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(2)), arbiter.start_commit(&2)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.commit_completed(&2)); } #[test] fn read_write_conflict_before_commit() { - let mut arbiter: Arbiter = Arbiter::new(); - let id1 = arbiter.start_transaction(); - let id2 = arbiter.start_transaction(); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(id1, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id1, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(id2, 100)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id1)), arbiter.start_commit(id1)); - assert_eq!(Ok(TransactionUpdate::with_failed(id2)), arbiter.commit_completed(id1)); + let mut arbiter: Arbiter = Arbiter::new(); + arbiter.start_transaction(1); + arbiter.start_transaction(2); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(&1, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&1, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(&2, 100)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(1)), arbiter.start_commit(&1)); + assert_eq!(Ok(TransactionUpdate::with_failed(2)), arbiter.commit_completed(&1)); } #[test] fn read_write_conflict_after_commit() { - let mut arbiter: Arbiter = Arbiter::new(); - let id1 = arbiter.start_transaction(); - let id2 = arbiter.start_transaction(); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(id1, 100)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(id1, 100)); - assert_eq!(Ok(TransactionUpdate::with_can_commit(id1)), arbiter.start_commit(id1)); - assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.commit_completed(id1)); - assert_eq!(Ok(TransactionUpdate::with_failed(id2)), arbiter.transaction_progress_read(id2, 100)); + let mut arbiter: Arbiter = Arbiter::new(); + arbiter.start_transaction(1); + arbiter.start_transaction(2); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_read(&1, 100)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.transaction_progress_write(&1, 100)); + assert_eq!(Ok(TransactionUpdate::with_can_commit(1)), arbiter.start_commit(&1)); + assert_eq!(Ok(TransactionUpdate::no_change()), arbiter.commit_completed(&1)); + assert_eq!(Ok(TransactionUpdate::with_failed(2)), arbiter.transaction_progress_read(&2, 100)); } } \ No newline at end of file diff --git a/src/coordinator.rs b/src/coordinator.rs new file mode 100644 index 0000000..8329b1e --- /dev/null +++ b/src/coordinator.rs @@ -0,0 +1,76 @@ +use crate::arbiter::{self, Arbiter}; +use crate::messages; +use std::collections::HashMap; +use tokio::sync::mpsc::UnboundedReceiver; + +pub type SID = u64; +pub type Result = std::result::Result; +pub type TID = (SID, u64); +pub type RID = String; + +pub struct Coordinator { + next_sid: SID, + arbiter: Arbiter, + sessions: HashMap +} + +impl Coordinator { + pub fn new(arbiter: Arbiter) -> Self { + Coordinator { + next_sid: 0, + arbiter, + sessions: HashMap::new() + } + } + + pub fn start_session(&mut self, queue: UnboundedReceiver) -> SID { + let sid = self.next_sid; + self.next_sid = self.next_sid + 1; + + let session = Session { + id: sid, + queue + }; + self.sessions.insert(sid, session); + + sid + } + + pub fn handle_message(&mut self, sid: SID, message: messages::IncomingMessage) { + let res = message.message + .ok_or(Error::MalformedRequest(String::from("message was not specified"))) + .and_then(|message| { + match message { + messages::incoming_message::Message::StartTransaction(message) => self.start_transaction(sid, message), + messages::incoming_message::Message::ResourcesAccessed(message) => self.resources_accessed(sid, message) + } + }); + } + + fn start_transaction(&mut self, sid: SID, message: messages::StartTransaction) -> Result<()> { + let tid = (sid, message.tid); + self.arbiter.start_transaction(tid); + Ok(()) + } + + fn resources_accessed(&mut self, sid: SID, message: messages::ResourcesAccessed) -> Result<()> { + let tid = (sid, message.tid); + let res = self.arbiter.transaction_progress_many(&tid, message.read_rids, message.written_rids); + + self.handle_result(res); + Ok(()) + } + + fn handle_result(&mut self, res: arbiter::Result, TID>) { + + } +} + +struct Session { + id: SID, + queue: UnboundedReceiver +} + +pub enum Error { + MalformedRequest(String) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 4bccf98..541cebe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,35 @@ mod arbiter; +mod coordinator; mod messages; +use log::{error, info}; +use std::rc::Rc; +use std::cell::RefCell; +use tokio::prelude::*; use tokio::net::TcpListener; +use tokio::runtime::current_thread::{run, spawn}; + +type State = Rc>; fn main() { + stderrlog::new() + .verbosity(4) + .timestamp(stderrlog::Timestamp::Millisecond) + .init().unwrap(); + + let arbiter = arbiter::Arbiter::new(); + let coordinator = coordinator::Coordinator::new(arbiter); + let state: State = Rc::new(RefCell::new(coordinator)); + let addr = "127.0.0.1:4585".parse().unwrap(); - let listener = TcpListener::bind(&addr) - .expect("unable to bind TCP listener"); + info!("Listening on {}", addr); + let listener = TcpListener::bind(&addr).expect("unable to bind TCP listener"); + + let server = listener.incoming() + .map_err(|e| error!("Error accepting connections: {}", e)) + .for_each(|sock| { + Ok(()) + }); + + run(server); }