Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
thavlik committed Apr 29, 2020
1 parent 01b39bc commit 3e820ee
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion scout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ edition = "2018"

[dependencies]
pathfinding = "2.0.4"
uuid = { version = "0.8.1", features = ["v4"] }
uuid = { version = "0.8.1", features = ["v4", "serde"] }
r2d2 = "0.8"
redis = "0.15"
r2d2_redis = { git = "https://github.com/sorccu/r2d2-redis"}
anyhow = "1.0.28"
serde = "1.0.63"
9 changes: 4 additions & 5 deletions scout/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::default::Default;
use std::{sync::{atomic::{AtomicPtr, Ordering::SeqCst}}};
use std::{sync::{atomic::{AtomicPtr, Ordering::SeqCst}}, time::{SystemTime}};
use serde::{Serialize, Deserialize};
use uuid::Uuid;
use crate::pool::Claim;

#[derive(Debug)]
pub struct AudioUnit {
Expand Down Expand Up @@ -95,10 +97,7 @@ impl std::hash::Hash for IO {
}
}

#[derive(Clone, PartialEq)]
pub struct Claim {
pub uid: Uuid,
}


impl std::ops::Drop for IO {
fn drop(&mut self) {
Expand Down
2 changes: 1 addition & 1 deletion scout/src/pool/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl MockPool {
}

impl PoolTrait for MockPool {
fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option<Duration>) -> Result<Uuid> {
fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option<SystemTime>) -> Result<Uuid> {
Ok(Uuid::new_v4())
}

Expand Down
15 changes: 11 additions & 4 deletions scout/src/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use std::{
ops::DerefMut,
time::Duration,
thread,
time::SystemTime,
};
use uuid::Uuid;
use serde::{Serialize, Deserialize};

type Result<T> = std::result::Result<T, anyhow::Error>;

pub mod mock;
pub mod redis;

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct Claim {
pub uid: Uuid,
pub resource: Uuid,
pub claimant: Uuid,
pub expire: Option<SystemTime>,
}

///
pub trait PoolTrait {
///
fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option<Duration>) -> Result<Uuid>;
fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option<SystemTime>) -> Result<Uuid>;

///
fn release(&self, resource: Uuid) -> Result<()>;
Expand Down
36 changes: 33 additions & 3 deletions scout/src/pool/redis.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use std::ops::DerefMut;
use r2d2_redis::{
r2d2,
redis,
Expand All @@ -24,12 +25,41 @@ impl RedisPool {
}

impl PoolTrait for RedisPool {
fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option<Duration>) -> Result<Uuid> {
fn claim(&self, resource: Uuid, claimant: Uuid, expire: Option<SystemTime>) -> Result<Uuid> {
let mut conn = self.pool.get()?;
{
let mut pubsub = conn.deref_mut().as_pubsub();
pubsub.subscribe("paradise")?;
loop {
let msg = pubsub.get_message()?;
let payload : String = msg.get_payload()?;
println!("channel '{}': {}", msg.get_channel_name(), payload);
}
}

// Generate a novel uid for the claim attempt
let claim_uid = Uuid::new_v4();
let claim = Claim {
uid: Uuid::new_v4(),
resource,
claimant,
expire: None,
};

// TODO: write redis script to claim given uid
let reply = redis::cmd("PING").query::<String>(conn.deref_mut())?;
let mut cmd = redis::cmd("SET");
if let Some(expire) = expire {
//cmd = *cmd.arg("PX")
// .arg(expire.as_millis().to_string())
}
let _: () = cmd
.arg("NX")
.query::<()>(conn.deref_mut())?;
// TODO: announce claim over redis

redis::cmd("PUBLISH")
.arg("paradise")
.arg("")
.query(conn.deref_mut())?;
Ok(Uuid::new_v4())
}

Expand Down

0 comments on commit 3e820ee

Please sign in to comment.