Skip to content

Commit

Permalink
refactor(client): replace futures mpsc for tokio mpsc in dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Mar 25, 2020
1 parent 5b3724e commit e6a6dde
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use futures_channel::{mpsc, oneshot};
use futures_core::Stream;
use futures_channel::oneshot;
use futures_util::future;
use tokio::sync::mpsc;

use crate::common::{task, Future, Pin, Poll};

pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;

pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded();
let (tx, rx) = mpsc::unbounded_channel();
let (giver, taker) = want::new();
let tx = Sender {
buffered_once: false,
Expand Down Expand Up @@ -81,9 +81,9 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}

pub fn send(&mut self, val: T) -> Result<Promise<U>, T> {
Expand All @@ -92,9 +92,9 @@ impl<T, U> Sender<T, U> {
}
let (tx, rx) = oneshot::channel();
self.inner
.unbounded_send(Envelope(Some((val, Callback::NoRetry(tx)))))
.send(Envelope(Some((val, Callback::NoRetry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}

pub fn unbound(self) -> UnboundedSender<T, U> {
Expand All @@ -117,9 +117,9 @@ impl<T, U> UnboundedSender<T, U> {
pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
.send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
.map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
}
}

Expand All @@ -142,7 +142,7 @@ impl<T, U> Receiver<T, U> {
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<(T, Callback<T, U>)>> {
match Pin::new(&mut self.inner).poll_next(cx) {
match self.inner.poll_recv(cx) {
Poll::Ready(item) => {
Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
}
Expand All @@ -159,9 +159,8 @@ impl<T, U> Receiver<T, U> {
}

pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
match self.inner.try_next() {
Ok(Some(mut env)) => env.0.take(),
Ok(None) => None,
match self.inner.try_recv() {
Ok(mut env) => env.0.take(),
Err(_) => None,
}
}
Expand Down

0 comments on commit e6a6dde

Please sign in to comment.