Skip to content

Commit

Permalink
Minimal queue code
Browse files Browse the repository at this point in the history
  • Loading branch information
mitsuhiko committed Aug 22, 2023
1 parent 3f667fb commit 46057a6
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 19 deletions.
3 changes: 3 additions & 0 deletions coda-supervisor/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[queues]
events = ["process_event", "normalize_event", "store_event"]
symbolicate = ["symbolicate"]
7 changes: 0 additions & 7 deletions coda-supervisor/config.yml

This file was deleted.

7 changes: 6 additions & 1 deletion coda-supervisor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Error;
use clap::{Parser, Subcommand};
use tracing::metadata::LevelFilter;

use crate::config::Config;
use crate::controller::Controller;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -32,7 +33,11 @@ pub struct RunCommand {
}

/// Executes the `run` subcommand.
///
/// Loads the configuration from `cmd.config` when a path was given and
/// falls back to `Config::default()` otherwise, then builds a
/// [`Controller`] from the command arguments, the worker count and that
/// configuration, and drives it until `Controller::run` returns.
///
/// # Errors
///
/// Propagates any error from loading the config file, from constructing
/// the controller, or from running it.
async fn run(cmd: RunCommand) -> Result<(), Error> {
    let config = match cmd.config {
        Some(ref filename) => Config::from_path(filename)?,
        None => Config::default(),
    };
    // The controller is constructed exactly once, with the three-argument
    // form that takes the loaded configuration.
    let mut controller = Controller::new(&cmd.args, cmd.worker_count, config)?;
    controller.run().await?;
    Ok(())
}
Expand Down
15 changes: 15 additions & 0 deletions coda-supervisor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,19 @@ impl Config {
.with_context(|| format!("malformed config file '{}'", path.display()))?,
})
}

/// Returns the configured queues.
///
/// The returned map is a borrow of `self.values.queues`: each key is a
/// queue name and each value is the set of task names assigned to that
/// queue.
pub fn configured_queues(&self) -> &HashMap<String, HashSet<String>> {
&self.values.queues
}

/// Resolves the name of the queue a task should be routed to.
///
/// Walks the configured queues and yields the name of the first one
/// whose task set contains `task_name`. If no configured queue lists
/// the task, the literal name `"default"` is returned instead.
///
/// The queues live in a `HashMap`, whose iteration order is unspecified,
/// so a task listed under more than one queue may resolve to any of them.
pub fn queue_name_for_task_name(&self, task_name: &str) -> &str {
    self.values
        .queues
        .iter()
        .find(|(_, tasks)| tasks.contains(task_name))
        .map_or("default", |(queue, _)| queue.as_str())
}
}
88 changes: 77 additions & 11 deletions coda-supervisor/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ffi::OsString;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Error};
use anyhow::{anyhow, bail, Error};
use bytes::{Buf, BufMut, BytesMut};
use ciborium::Value;
use nix::libc::ENXIO;
use nix::sys::stat;
use nix::unistd::mkfifo;
Expand All @@ -20,7 +22,10 @@ use tokio::{signal, time};
use tracing::{event, Level};
use uuid::Uuid;

use coda_ipc::{Fail, Message, Ping, WorkerDied};
use coda_ipc::{Message, WorkerDied};

use crate::config::Config;
use crate::task::Task;

pub struct Controller {
command: Vec<OsString>,
Expand All @@ -30,6 +35,8 @@ pub struct Controller {
worker_rx: mpsc::Receiver<(Uuid, Result<Message, Error>)>,
home: TempDir,
shutting_down: bool,
storage: Storage,
config: Config,
}

#[derive(Default, Debug)]
Expand All @@ -38,6 +45,25 @@ struct WorkerState {
workflows: HashSet<String>,
}

/// In-memory state the controller keeps for task parameters and queues.
#[derive(Debug, Default)]
struct Storage {
/// Parameter maps keyed by parameter id. An entry is inserted when a
/// `Message::StoreParams` arrives and removed again when a
/// `Message::SpawnTask` referencing the same id is handled.
params: HashMap<Uuid, Arc<BTreeMap<String, Value>>>,
/// Task queues keyed by queue name. Entries are created on demand by
/// `Controller::enqueue_task`.
task_queues: HashMap<String, TaskQueue>,
}

/// A single named queue of tasks, holding both halves of one channel.
#[derive(Debug)]
struct TaskQueue {
/// Sending half; `Controller::enqueue_task` pushes tasks through it.
tx: mpsc::Sender<Task>,
/// Receiving half of the same channel.
// NOTE(review): no reader of `rx` is visible in this part of the file;
// confirm where queued tasks are consumed.
rx: mpsc::Receiver<Task>,
}

impl TaskQueue {
    /// Creates a queue around a freshly created channel.
    ///
    /// The channel is created with a buffer argument of 20, and both of
    /// its halves are stored in the returned queue.
    pub fn new() -> TaskQueue {
        let (sender, receiver) = mpsc::channel(20);
        Self {
            tx: sender,
            rx: receiver,
        }
    }
}

/// Represents a worker process.
pub struct Worker {
worker_id: Uuid,
Expand All @@ -47,7 +73,11 @@ pub struct Worker {

impl Controller {
/// Creates a fresh controller.
pub fn new(cmd: &[OsString], target_worker_count: usize) -> Result<Controller, Error> {
pub fn new(
cmd: &[OsString],
target_worker_count: usize,
config: Config,
) -> Result<Controller, Error> {
let (worker_tx, worker_rx) = mpsc::channel(20 * target_worker_count);
Ok(Controller {
command: cmd.iter().cloned().collect(),
Expand All @@ -57,6 +87,8 @@ impl Controller {
worker_rx,
home: tempfile::tempdir()?,
shutting_down: false,
storage: Storage::default(),
config,
})
}

Expand Down Expand Up @@ -87,12 +119,6 @@ impl Controller {
self.workers.push(worker??);
}

// make one worker fail
self.send_msg(self.workers[0].worker_id, Message::Fail(Fail {}))
.await?;
self.send_msg(self.workers[0].worker_id, Message::Ping(Ping {}))
.await?;

Ok(())
}

Expand Down Expand Up @@ -123,7 +149,12 @@ impl Controller {
Some((worker_id, rv)) = self.worker_rx.recv() => {
match rv {
Ok(msg) => {
self.handle_message(worker_id, msg).await?;
match self.handle_message(worker_id, msg).await {
Ok(()) => {}
Err(err) => {
event!(Level::ERROR, "message handler errored: {}", err);
}
}
}
Err(err) => {
event!(Level::ERROR, "worker errored: {}", err);
Expand Down Expand Up @@ -153,6 +184,26 @@ impl Controller {
self.workers.push(self.spawn_worker().await?);
}
}
Message::StoreParams(cmd) => {
self.storage
.params
.insert(cmd.params_id, Arc::new(cmd.params));
}
Message::SpawnTask(cmd) => {
let params = match self.storage.params.remove(&cmd.params_id) {
Some(params) => params,
None => bail!("cannot find parameters"),
};
let task = Task::new(
cmd.task_name,
cmd.task_id,
cmd.task_key,
params,
cmd.workflow_run_id,
cmd.persist_result,
);
self.enqueue_task(task).await?;
}
Message::WorkerStart(cmd) => {
let worker = self.get_worker(worker_id)?;
let mut state = worker.state.lock().await;
Expand All @@ -166,6 +217,21 @@ impl Controller {
}
Ok(())
}

/// Places `task` on the queue configured for its task name.
///
/// The queue name comes from `Config::queue_name_for_task_name`. The
/// matching [`TaskQueue`] is created the first time a task is routed to
/// that name and reused afterwards.
///
/// # Errors
///
/// Returns an error if sending the task on the queue's channel fails.
// NOTE(review): nothing visible here drains the queue's receiver, so this
// send may wait once the channel buffer is full; confirm a consumer exists.
async fn enqueue_task(&mut self, task: Task) -> Result<(), Error> {
    let queue_name = self.config.queue_name_for_task_name(task.name());
    // `entry` performs find-or-create in a single lookup, so no second
    // `get(..).unwrap()` is needed after inserting a new queue.
    let queue = self
        .storage
        .task_queues
        .entry(queue_name.to_string())
        .or_insert_with(TaskQueue::new);
    queue.tx.send(task).await?;
    Ok(())
}
}

/// Spawns a worker that sends its messages into `worker_tx`.
Expand Down
7 changes: 7 additions & 0 deletions coda-supervisor/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod cli;
mod config;
mod controller;
mod task;

#[tokio::main]
async fn main() {
Expand All @@ -14,6 +15,12 @@ async fn main() {

if let Err(err) = cli::execute().await {
eprintln!("error: {}", err);
let mut source_opt = err.source();
while let Some(source) = source_opt {
eprintln!();
eprintln!("caused by: {source}");
source_opt = source.source();
}
std::process::exit(1);
}
}
38 changes: 38 additions & 0 deletions coda-supervisor/src/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use ciborium::Value;
use uuid::Uuid;

/// A unit of work the controller places on a task queue.
pub struct Task {
/// Name of the task; exposed through `Task::name`.
task_name: String,
/// Identifier passed in as `task_id` on construction.
// NOTE(review): presumably unique per spawned task; confirm with the sender.
task_id: Uuid,
/// Key passed in as `task_key` on construction.
// NOTE(review): how this differs from `task_id` is not visible here.
task_key: Uuid,
/// Shared, reference-counted map of named parameter values.
params: Arc<BTreeMap<String, Value>>,
/// Identifier of the workflow run passed in on construction.
workflow_run_id: Uuid,
/// Flag passed in as `persist_result` on construction.
// NOTE(review): presumably controls whether the result is stored; confirm.
persist_result: bool,
}

impl Task {
pub fn new(
task_name: String,
task_id: Uuid,
task_key: Uuid,
params: Arc<BTreeMap<String, Value>>,
workflow_run_id: Uuid,
persist_result: bool,
) -> Task {
Task {
task_name,
task_id,
task_key,
params,
workflow_run_id,
persist_result,
}
}

pub fn name(&self) -> &str {
&self.task_name
}
}

0 comments on commit 46057a6

Please sign in to comment.