Skip to content

Commit

Permalink
feat: allow backends to emit errors (#454)
Browse files Browse the repository at this point in the history
* feat: allow backends to emit errors

* lint: fmt

* fix: pass in a reference to prevent mutation
  • Loading branch information
geofmureithi authored Nov 23, 2024
1 parent 995be7c commit 9863a60
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 83 deletions.
2 changes: 1 addition & 1 deletion examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where

type Layer = AckLayer<Self, Req, RedisMqContext, Res>;

fn poll<Svc>(mut self, _worker_id: WorkerId) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(mut self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.get_buffer_size());
let stream: RequestStream<Request<Req, RedisMqContext>> = Box::pin(rx);
let layer = AckLayer::new(self.clone());
Expand Down
9 changes: 5 additions & 4 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::Stream;
use poller::Poller;
use serde::{Deserialize, Serialize};
use tower::Service;
use worker::WorkerId;
use worker::{Context, Worker};

/// Represent utilities for creating worker instances.
pub mod builder;
Expand Down Expand Up @@ -81,7 +81,7 @@ pub trait Backend<Req, Res> {
/// Returns a poller that is ready for streaming
fn poll<Svc: Service<Req, Response = Res>>(
self,
worker: WorkerId,
worker: &Worker<Context>,
) -> Poller<Self::Stream, Self::Layer>;
}
/// A codec allows backends to encode and decode data
Expand Down Expand Up @@ -165,7 +165,7 @@ pub mod test_utils {
use crate::error::BoxDynError;
use crate::request::Request;
use crate::task::task_id::TaskId;
use crate::worker::WorkerId;
use crate::worker::{Worker, WorkerId};
use crate::Backend;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -264,8 +264,9 @@ pub mod test_utils {
>>::Future: Send + 'static,
{
let worker_id = WorkerId::new("test-worker");
let worker = Worker::new(worker_id, crate::worker::Context::default());
let b = backend.clone();
let mut poller = b.poll::<S>(worker_id);
let mut poller = b.poll::<S>(&worker);
let (stop_tx, mut stop_rx) = channel::<()>(1);

let (mut res_tx, res_rx) = channel(10);
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
mq::MessageQueue,
poller::{controller::Controller, stream::BackendStream},
request::{Request, RequestStream},
worker::WorkerId,
worker::{self, Worker},
Backend, Poller,
};
use futures::{
Expand Down Expand Up @@ -101,7 +101,7 @@ impl<T: Send + 'static + Sync, Res> Backend<Request<T, ()>, Res> for MemoryStora

type Layer = Identity;

fn poll<Svc>(self, _worker: WorkerId) -> Poller<Self::Stream> {
fn poll<Svc>(self, _worker: &Worker<worker::Context>) -> Poller<Self::Stream> {
let stream = self.inner.map(|r| Ok(Some(r))).boxed();
Poller {
stream: BackendStream::new(stream, self.controller),
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
error::Error,
poller::Poller,
task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
worker::WorkerId,
worker::{Context, Worker},
Backend,
};

Expand Down Expand Up @@ -111,10 +111,10 @@ impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx

type Layer = Identity;

fn poll<Svc>(self, _worker: WorkerId) -> Poller<Self::Stream> {
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
Poller {
stream: self,
heartbeat: Box::pin(async {}),
heartbeat: Box::pin(futures::future::pending()),
layer: Identity::new(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl<S, P> Worker<Ready<S, P>> {
};
let backend = self.state.backend;
let service = self.state.service;
let poller = backend.poll::<S>(worker_id.clone());
let poller = backend.poll::<S>(&worker);
let stream = poller.stream;
let heartbeat = poller.heartbeat.boxed();
let layer = poller.layer;
Expand Down Expand Up @@ -387,7 +387,7 @@ impl Future for Runnable {
}

/// Stores the Workers context
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Context {
task_count: Arc<AtomicUsize>,
wakers: Arc<Mutex<Vec<Waker>>>,
Expand Down
6 changes: 3 additions & 3 deletions packages/apalis-cron/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use apalis_core::layers::Identity;
use apalis_core::poller::Poller;
use apalis_core::request::RequestStream;
use apalis_core::task::namespace::Namespace;
use apalis_core::worker::WorkerId;
use apalis_core::worker::{Context, Worker};
use apalis_core::Backend;
use apalis_core::{error::Error, request::Request};
use chrono::{DateTime, TimeZone, Utc};
Expand Down Expand Up @@ -145,8 +145,8 @@ where

type Layer = Identity;

fn poll<Svc>(self, _worker: WorkerId) -> Poller<Self::Stream, Self::Layer> {
fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream, Self::Layer> {
let stream = self.into_stream();
Poller::new(stream, async {})
Poller::new(stream, futures::future::pending())
}
}
1 change: 1 addition & 0 deletions packages/apalis-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tokio = { version = "1", features = ["rt", "net"], optional = true }
async-std = { version = "1.13.0", optional = true }
async-trait = "0.1.80"
tower = "0.4"
thiserror = "1"


[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions packages/apalis-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ mod storage;
pub use storage::connect;
pub use storage::Config;
pub use storage::RedisContext;
pub use storage::RedisPollError;
pub use storage::RedisQueueInfo;
pub use storage::RedisStorage;
55 changes: 42 additions & 13 deletions packages/apalis-redis/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use apalis_core::service_fn::FromRequest;
use apalis_core::storage::Storage;
use apalis_core::task::namespace::Namespace;
use apalis_core::task::task_id::TaskId;
use apalis_core::worker::WorkerId;
use apalis_core::worker::{Event, Worker, WorkerId};
use apalis_core::{Backend, Codec};
use chrono::{DateTime, Utc};
use futures::channel::mpsc::{self, Sender};
use futures::channel::mpsc::{self, SendError, Sender};
use futures::{select, FutureExt, SinkExt, StreamExt, TryFutureExt};
use log::*;
use redis::aio::ConnectionLike;
Expand Down Expand Up @@ -106,6 +106,34 @@ impl<Req> FromRequest<Request<Req, RedisContext>> for RedisContext {
}
}

/// Errors that can occur while polling a Redis backend.
#[derive(thiserror::Error, Debug)]
pub enum RedisPollError {
/// Error during a keep-alive heartbeat.
#[error("KeepAlive heartbeat encountered an error: `{0}`")]
KeepAliveError(RedisError),

/// Error during enqueueing scheduled tasks.
#[error("EnqueueScheduled heartbeat encountered an error: `{0}`")]
EnqueueScheduledError(RedisError),

/// Error during polling for the next task or message.
#[error("PollNext heartbeat encountered an error: `{0}`")]
PollNextError(RedisError),

/// Error during enqueueing tasks for worker consumption.
#[error("Enqueue for worker consumption encountered an error: `{0}`")]
EnqueueError(SendError),

/// Error during acknowledgment of tasks.
#[error("Ack heartbeat encountered an error: `{0}`")]
AckError(RedisError),

/// Error during re-enqueuing orphaned tasks.
#[error("ReenqueueOrphaned heartbeat encountered an error: `{0}`")]
ReenqueueOrphanedError(RedisError),
}

/// Config for a [RedisStorage]
#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -412,14 +440,15 @@ where

fn poll<Svc: Service<Request<T, RedisContext>>>(
mut self,
worker: WorkerId,
worker: &Worker<apalis_core::worker::Context>,
) -> Poller<Self::Stream, Self::Layer> {
let (mut tx, rx) = mpsc::channel(self.config.buffer_size);
let (ack, ack_rx) = mpsc::channel(self.config.buffer_size);
let layer = AckLayer::new(ack);
let controller = self.controller.clone();
let config = self.config.clone();
let stream: RequestStream<Request<T, RedisContext>> = Box::pin(rx);
let worker = worker.clone();
let heartbeat = async move {
let mut reenqueue_orphaned_stm =
apalis_core::interval::interval(config.poll_interval).fuse();
Expand All @@ -433,32 +462,32 @@ where

let mut ack_stream = ack_rx.fuse();

if let Err(e) = self.keep_alive(&worker).await {
error!("RegistrationError: {}", e);
if let Err(e) = self.keep_alive(worker.id()).await {
worker.emit(Event::Error(Box::new(RedisPollError::KeepAliveError(e))));
}

loop {
select! {
_ = keep_alive_stm.next() => {
if let Err(e) = self.keep_alive(&worker).await {
error!("KeepAliveError: {}", e);
if let Err(e) = self.keep_alive(worker.id()).await {
worker.emit(Event::Error(Box::new(RedisPollError::KeepAliveError(e))));
}
}
_ = enqueue_scheduled_stm.next() => {
if let Err(e) = self.enqueue_scheduled(config.buffer_size).await {
error!("EnqueueScheduledError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::EnqueueScheduledError(e))));
}
}
_ = poll_next_stm.next() => {
let res = self.fetch_next(&worker).await;
let res = self.fetch_next(worker.id()).await;
match res {
Err(e) => {
error!("PollNextError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::PollNextError(e))));
}
Ok(res) => {
for job in res {
if let Err(e) = tx.send(Ok(Some(job))).await {
error!("EnqueueError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::EnqueueError(e))));
}
}
}
Expand All @@ -468,15 +497,15 @@ where
id_to_ack = ack_stream.next() => {
if let Some((ctx, res)) = id_to_ack {
if let Err(e) = self.ack(&ctx, &res).await {
error!("AckError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::AckError(e))));
}
}
}
_ = reenqueue_orphaned_stm.next() => {
let dead_since = Utc::now()
- chrono::Duration::from_std(config.reenqueue_orphaned_after).unwrap();
if let Err(e) = self.reenqueue_orphaned((config.buffer_size * 10) as i32, dead_since).await {
error!("ReenqueueOrphanedError: {}", e);
worker.emit(Event::Error(Box::new(RedisPollError::ReenqueueOrphanedError(e))));
}
}
};
Expand Down
1 change: 1 addition & 0 deletions packages/apalis-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tokio = { version = "1", features = ["rt", "net"], optional = true }
futures-lite = "2.3.0"
async-std = { version = "1.13.0", optional = true }
chrono = { version = "0.4", features = ["serde"] }
thiserror = "1"


[dev-dependencies]
Expand Down
Loading

0 comments on commit 9863a60

Please sign in to comment.