diff --git a/Cargo.lock b/Cargo.lock index 8e4e610..48e6e72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2027,6 +2027,7 @@ dependencies = [ "r2d2", "r2d2_redis", "redis", + "serde", "uuid", ] @@ -2756,6 +2757,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11" dependencies = [ "rand", + "serde", ] [[package]] diff --git a/scout/Cargo.toml b/scout/Cargo.toml index 6a187d6..56dbda5 100644 --- a/scout/Cargo.toml +++ b/scout/Cargo.toml @@ -8,8 +8,9 @@ edition = "2018" [dependencies] pathfinding = "2.0.4" -uuid = { version = "0.8.1", features = ["v4"] } +uuid = { version = "0.8.1", features = ["v4", "serde"] } r2d2 = "0.8" redis = "0.15" r2d2_redis = { git = "https://github.com/sorccu/r2d2-redis"} anyhow = "1.0.28" +serde = "1.0.63" \ No newline at end of file diff --git a/scout/src/node/mod.rs b/scout/src/node/mod.rs index d45a375..2fd16e9 100644 --- a/scout/src/node/mod.rs +++ b/scout/src/node/mod.rs @@ -1,6 +1,8 @@ use std::default::Default; -use std::{sync::{atomic::{AtomicPtr, Ordering::SeqCst}}}; +use std::{sync::{atomic::{AtomicPtr, Ordering::SeqCst}}, time::{SystemTime}}; +use serde::{Serialize, Deserialize}; use uuid::Uuid; +use crate::pool::Claim; #[derive(Debug)] pub struct AudioUnit { @@ -95,10 +97,7 @@ impl std::hash::Hash for IO { } } -#[derive(Clone, PartialEq)] -pub struct Claim { - pub uid: Uuid, -} + impl std::ops::Drop for IO { fn drop(&mut self) { diff --git a/scout/src/pool/mock.rs b/scout/src/pool/mock.rs index b10a83a..904b088 100644 --- a/scout/src/pool/mock.rs +++ b/scout/src/pool/mock.rs @@ -12,7 +12,7 @@ impl MockPool { } impl PoolTrait for MockPool { - fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option) -> Result { + fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option) -> Result { Ok(Uuid::new_v4()) } diff --git a/scout/src/pool/mod.rs b/scout/src/pool/mod.rs index 9ffad55..f6d4a9d 100644 --- a/scout/src/pool/mod.rs +++ b/scout/src/pool/mod.rs @@ -1,19 +1,26 @@ use std::{ - ops::DerefMut, - time::Duration, - thread, + time::SystemTime, }; use uuid::Uuid; +use serde::{Serialize, Deserialize}; type Result = std::result::Result; pub mod mock; pub mod redis; +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct Claim { + pub uid: Uuid, + pub resource: Uuid, + pub claimant: Uuid, + pub expire: Option, +} + /// pub trait PoolTrait { /// - fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option) -> Result; + fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option) -> Result; /// fn release(&self, resource: Uuid) -> Result<()>; diff --git a/scout/src/pool/redis.rs b/scout/src/pool/redis.rs index 870b99d..5ccfb8f 100644 --- a/scout/src/pool/redis.rs +++ b/scout/src/pool/redis.rs @@ -1,4 +1,5 @@ use super::*; +use std::ops::DerefMut; use r2d2_redis::{ r2d2, redis, @@ -24,12 +25,41 @@ impl RedisPool { } impl PoolTrait for RedisPool { - fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option) -> Result { + fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option) -> Result { let mut conn = self.pool.get()?; + { + let mut pubsub = conn.deref_mut().as_pubsub(); + pubsub.subscribe("paradise")?; + loop { + let msg = pubsub.get_message()?; + let payload : String = msg.get_payload()?; + println!("channel '{}': {}", msg.get_channel_name(), payload); + } + } + // Generate a novel uid for the claim attempt - let claim_uid = Uuid::new_v4(); + let claim = Claim { + uid: Uuid::new_v4(), + resource, + claimant, + expire: None, + }; + // TODO: write redis script to claim given uid - let reply = redis::cmd("PING").query::(conn.deref_mut())?; + let mut cmd = redis::cmd("SET"); + if let Some(expire) = expire { + //cmd = *cmd.arg("PX") + // .arg(expire.as_millis().to_string()) + } + let _: () = cmd + .arg("NX") + .query::<()>(conn.deref_mut())?; + // TODO: announce claim over redis + + redis::cmd("PUBLISH") + .arg("paradise") + .arg("") + .query(conn.deref_mut())?; Ok(Uuid::new_v4()) }