Skip to content

Commit

Permalink
switch to executor-trait
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <[email protected]>
  • Loading branch information
Keruspe committed Dec 27, 2020
1 parent 337bd94 commit 64f6298
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 100 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ optional = true
version = "7.0.0-alpha.4"
default-features = false

[dependencies.async-global-executor]
version = "^2.0"
[dependencies.async-global-executor-trait]
version = "^1.0"
features = ["async-io"]

[dependencies.flume]
Expand All @@ -59,12 +59,14 @@ default-features = false
async-io = "^1.0"
async-trait = "^0.1"
blocking = "^1.0"
executor-trait = "^1.0"
futures-lite = "^1.7"
parking_lot = "^0.11"
pinky-swear = "^5.1"
waker-fn = "^1.1"

[dev-dependencies]
async-global-executor = "^2.0"
serde_json = "^1.0"
waker-fn = "^1.1"

Expand Down
7 changes: 3 additions & 4 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
connection_status::{ConnectionState, ConnectionStep},
consumer::Consumer,
consumers::Consumers,
executor::Executor,
frames::{ExpectedReply, Frames},
id_sequence::IdSequence,
internal_rpc::InternalRPCHandle,
Expand All @@ -27,6 +26,7 @@ use crate::{
PromiseResolver, Result,
};
use amq_protocol::frame::{AMQPContentHeader, AMQPFrame};
use executor_trait::Executor;
use serde::{Deserialize, Serialize};
use std::{convert::TryFrom, fmt, sync::Arc};
use tracing::{error, info, level_enabled, trace, Level};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub struct Channel {
waker: SocketStateHandle,
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
channel_closer: Option<Arc<ChannelCloser>>,
connection_closer: Option<Arc<ConnectionCloser>>,
}
Expand All @@ -80,7 +80,6 @@ impl fmt::Debug for Channel {
.field("basic_get_delivery", &self.basic_get_delivery)
.field("returned_messages", &self.returned_messages)
.field("frames", &self.frames)
.field("executor", &self.executor)
.finish()
}
}
Expand All @@ -94,7 +93,7 @@ impl Channel {
waker: SocketStateHandle,
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
connection_closer: Option<Arc<ConnectionCloser>>,
) -> Channel {
let returned_messages = ReturnedMessages::default();
Expand Down
11 changes: 5 additions & 6 deletions src/channels.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
connection_closer::ConnectionCloser,
error_handler::ErrorHandler,
executor::Executor,
frames::Frames,
id_sequence::IdSequence,
internal_rpc::InternalRPCHandle,
Expand All @@ -14,6 +13,7 @@ use crate::{
Error, Promise, Result,
};
use amq_protocol::frame::{AMQPFrame, ProtocolVersion};
use executor_trait::Executor;
use parking_lot::Mutex;
use std::{collections::HashMap, fmt, sync::Arc};
use tracing::{debug, error, level_enabled, trace, Level};
Expand All @@ -24,7 +24,7 @@ pub(crate) struct Channels {
connection_status: ConnectionStatus,
global_registry: Registry,
internal_rpc: InternalRPCHandle,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
frames: Frames,
error_handler: ErrorHandler,
}
Expand All @@ -37,7 +37,7 @@ impl Channels {
waker: SocketStateHandle,
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
) -> Self {
Self {
inner: Arc::new(Mutex::new(Inner::new(configuration, waker))),
Expand Down Expand Up @@ -285,7 +285,6 @@ impl fmt::Debug for Channels {
}
debug
.field("frames", &self.frames)
.field("executor", &self.executor)
.field("connection_status", &self.connection_status)
.field("error_handler", &self.error_handler)
.finish()
Expand Down Expand Up @@ -316,7 +315,7 @@ impl Inner {
global_registry: Registry,
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
connection_closer: Option<Arc<ConnectionCloser>>,
) -> Channel {
debug!(%id, "create channel");
Expand All @@ -341,7 +340,7 @@ impl Inner {
global_registry: Registry,
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
connection_closer: Arc<ConnectionCloser>,
) -> Result<Channel> {
debug!("create channel");
Expand Down
56 changes: 31 additions & 25 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
connection_closer::ConnectionCloser,
connection_properties::ConnectionProperties,
connection_status::{ConnectionState, ConnectionStatus, ConnectionStep},
executor::{DefaultExecutor, Executor},
frames::Frames,
heartbeat::Heartbeat,
internal_rpc::{InternalRPC, InternalRPCHandle},
Expand All @@ -24,6 +23,7 @@ use crate::{
};
use amq_protocol::frame::{AMQPFrame, ProtocolVersion};
use async_trait::async_trait;
use executor_trait::Executor;
use std::{fmt, io, sync::Arc};
use tracing::{level_enabled, Level};

Expand Down Expand Up @@ -52,7 +52,7 @@ impl Connection {
waker: SocketStateHandle,
internal_rpc: InternalRPCHandle,
frames: Frames,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
) -> Self {
let configuration = Configuration::default();
let status = ConnectionStatus::default();
Expand Down Expand Up @@ -295,31 +295,37 @@ impl Connection {
let executor = options
.executor
.take()
.map(Ok)
.unwrap_or_else(DefaultExecutor::default)?;
.unwrap_or_else(|| Arc::new(async_global_executor_trait::AsyncGlobalExecutor));

let (connect_promise, resolver) = pinky_swear::PinkySwear::<Result<TcpStream>>::new();
let connect_uri = uri.clone();
executor.spawn_blocking(Box::new(move || {
let mut res = connect(&connect_uri);
loop {
match res {
Ok(stream) => {
resolver.swear(Ok(stream));
break;
}
Err(mid) => match mid.into_mid_handshake_tls_stream() {
Err(err) => {
resolver.swear(Err(err.into()));
break;
executor.spawn({
let executor = executor.clone();
Box::pin(async move {
executor
.spawn_blocking(Box::new(move || {
let mut res = connect(&connect_uri);
loop {
match res {
Ok(stream) => {
resolver.swear(Ok(stream));
break;
}
Err(mid) => match mid.into_mid_handshake_tls_stream() {
Err(err) => {
resolver.swear(Err(err.into()));
break;
}
Ok(mid) => {
res = mid.handshake();
}
},
}
}
Ok(mid) => {
res = mid.handshake();
}
},
}
}
}));
}))
.await;
})
});

let reactor = options
.reactor
Expand Down Expand Up @@ -480,7 +486,7 @@ mod tests {
use crate::consumer::Consumer;

// Bootstrap connection state to a consuming state
let executor = DefaultExecutor::default().unwrap();
let executor = Arc::new(async_global_executor_trait::AsyncGlobalExecutor);
let socket_state = SocketState::default();
let waker = socket_state.handle();
let internal_rpc = InternalRPC::new(executor.clone(), waker.clone());
Expand Down Expand Up @@ -561,7 +567,7 @@ mod tests {
// Bootstrap connection state to a consuming state
let socket_state = SocketState::default();
let waker = socket_state.handle();
let executor = DefaultExecutor::default().unwrap();
let executor = Arc::new(async_global_executor_trait::AsyncGlobalExecutor);
let internal_rpc = InternalRPC::new(executor.clone(), waker.clone());
let conn = Connection::new(
waker,
Expand Down
9 changes: 5 additions & 4 deletions src/connection_properties.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{executor::Executor, reactor::Reactor, types::FieldTable};
use crate::{reactor::Reactor, types::FieldTable};
use executor_trait::Executor;
use std::sync::Arc;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct ConnectionProperties {
pub locale: String,
pub client_properties: FieldTable,
pub executor: Option<Arc<dyn Executor>>,
pub executor: Option<Arc<dyn Executor + Send + Sync>>,
pub reactor: Option<Arc<dyn Reactor + Send + Sync>>,
}

Expand All @@ -21,7 +22,7 @@ impl Default for ConnectionProperties {
}

impl ConnectionProperties {
pub fn with_executor<E: Executor + 'static>(mut self, executor: E) -> Self {
pub fn with_executor<E: Executor + Send + Sync + 'static>(mut self, executor: E) -> Self {
self.executor = Some(Arc::new(executor));
self
}
Expand Down
21 changes: 11 additions & 10 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{
channel_closer::ChannelCloser,
consumer_canceler::ConsumerCanceler,
consumer_status::{ConsumerState, ConsumerStatus},
executor::Executor,
internal_rpc::InternalRPCHandle,
message::{Delivery, DeliveryResult},
options::BasicConsumeOptions,
Expand All @@ -11,6 +10,7 @@ use crate::{
wakers::Wakers,
BasicProperties, Error, Result,
};
use executor_trait::Executor;
use flume::{Receiver, Sender};
use futures_lite::Stream;
use parking_lot::Mutex;
Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct Consumer {
impl Consumer {
pub(crate) fn new(
consumer_tag: ShortString,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
channel_closer: Option<Arc<ChannelCloser>>,
queue: ShortString,
options: BasicConsumeOptions,
Expand Down Expand Up @@ -275,16 +275,14 @@ struct ConsumerInner {
wakers: Wakers,
tag: ShortString,
delegate: Option<Arc<Box<dyn ConsumerDelegate>>>,
executor: Arc<dyn Executor>,
executor: Arc<dyn Executor + Send + Sync>,
}

impl fmt::Debug for Consumer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("Consumer");
if let Some(inner) = self.inner.try_lock() {
debug
.field("tag", &inner.tag)
.field("executor", &inner.executor);
debug.field("tag", &inner.tag);
}
if let Some(status) = self.status.try_lock() {
debug.field("state", &status.state());
Expand All @@ -294,7 +292,11 @@ impl fmt::Debug for Consumer {
}

impl ConsumerInner {
fn new(status: ConsumerStatus, consumer_tag: ShortString, executor: Arc<dyn Executor>) -> Self {
fn new(
status: ConsumerStatus,
consumer_tag: ShortString,
executor: Arc<dyn Executor + Send + Sync>,
) -> Self {
let (sender, receiver) = flume::unbounded();
Self {
status,
Expand Down Expand Up @@ -428,7 +430,6 @@ impl Stream for Consumer {
#[cfg(test)]
mod futures_tests {
use super::*;
use crate::executor::DefaultExecutor;

use std::sync::{
atomic::{AtomicUsize, Ordering},
Expand All @@ -452,7 +453,7 @@ mod futures_tests {

let mut consumer = Consumer::new(
ShortString::from("test-consumer"),
DefaultExecutor::default().unwrap(),
Arc::new(async_global_executor_trait::AsyncGlobalExecutor),
None,
"test".into(),
BasicConsumeOptions::default(),
Expand Down Expand Up @@ -488,7 +489,7 @@ mod futures_tests {

let mut consumer = Consumer::new(
ShortString::from("test-consumer"),
DefaultExecutor::default().unwrap(),
Arc::new(async_global_executor_trait::AsyncGlobalExecutor),
None,
"test".into(),
BasicConsumeOptions::default(),
Expand Down
42 changes: 0 additions & 42 deletions src/executor.rs

This file was deleted.

Loading

0 comments on commit 64f6298

Please sign in to comment.