Skip to content

Commit

Permalink
Merge branch 'lapin-1.x'
Browse files Browse the repository at this point in the history
* lapin-1.x:
  v1.6.6
  fix automatic conneciton closing
  lapin-async-global-executor: v2.0.0
  tokio-amqp: v1.0.0
  update to tokio 1.0.0
  rustfmt
  v1.6.5
  consider errors for basic cancel as success
  frames: consider a channel close/error as a success for basic cancel
  v1.6.4
  connection close hook is no longer necessary
  simplify recent basic_cancel tweaks
  properly handle missing replies before close-ok
  v1.6.3
  consumer: don't auto cancel when channel is closing
  properly set connection as closing when starting to close it
  another auto consumer cancel fix
  v1.6.2
  fix race in auto conumer cancelation
  • Loading branch information
Keruspe committed Dec 25, 2020
2 parents b690e86 + 5785da0 commit 55189bc
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 35 deletions.
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
### 1.6.6 (2020-12-25)

#### Bug Fixes

* Fix an issue with automatic connection close on drop

### 1.6.5 (2020-12-23)

#### Bug Fixes

* Treat close and error as success for consumers cancel as it forcibly implies cancelation

### 1.6.4 (2020-12-23)

#### Bug Fixes

* Cleanup basic cancel handling
* Fix undocumented rabbitmq behaviour on channel close with expected replies

### 1.6.3 (2020-12-23)

#### Bug Fixes

* Better handling of automatic consumer cancelation when connection is closing

### 1.6.2 (2020-12-23)

#### Bug Fixes

* Fix a potential race condition when automatically canceling consumer on drop

### 1.6.1 (2020-11-28)

#### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lapin"
version = "1.6.1"
version = "1.6.6"
edition = "2018"
authors = ["Geoffroy Couprie <[email protected]>", "Marc-Antoine Perennou <[email protected]>"]
description = "AMQP client library"
Expand Down
4 changes: 2 additions & 2 deletions async-global-executor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lapin-async-global-executor"
version = "1.1.0"
version = "2.0.0"
edition = "2018"
authors = ["Marc-Antoine Perennou <[email protected]>"]
description = "lapin integration with async-global-executor"
Expand All @@ -19,7 +19,7 @@ async-io = ["async-global-executor/async-io", "async-lapin"]
blocking = "^1.0"

[dependencies.async-global-executor]
version = "^1.4.3"
version = "^2.0"
default-features = false

[dependencies.async-lapin]
Expand Down
22 changes: 16 additions & 6 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ impl Channel {
Ok(())
}

fn set_closing(&self) {
self.set_state(ChannelState::Closing);
self.consumers.start_cancel();
}

fn set_closed(&self, error: Error) {
self.set_state(ChannelState::Closed);
self.error_publisher_confirms(error.clone());
Expand Down Expand Up @@ -508,6 +513,10 @@ impl Channel {
}
}

fn before_basic_cancel(&self, consumer_tag: &str) {
self.consumers.start_cancel_one(consumer_tag);
}

fn acknowledgement_error(
&self,
error: AMQPError,
Expand Down Expand Up @@ -545,10 +554,6 @@ impl Channel {
.set_connection_step(ConnectionStep::Open(resolver));
}

fn on_connection_close_sent(&self) {
self.internal_rpc.set_connection_closing();
}

fn on_connection_close_ok_sent(&self, error: Error) {
if let Error::ProtocolError(_) = error {
self.internal_rpc.set_connection_error(error);
Expand All @@ -557,8 +562,13 @@ impl Channel {
}
}

fn next_expected_close_ok_reply(&self) -> Option<Reply> {
self.frames
.next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed))
}

fn before_channel_close(&self) {
self.set_state(ChannelState::Closing);
self.set_closing();
}

fn on_channel_close_ok_sent(&self, error: Error) {
Expand Down Expand Up @@ -878,7 +888,7 @@ impl Channel {
info!(channel=%self.id, ?method, "Channel closed");
Error::InvalidChannelState(ChannelState::Closing)
});
self.set_state(ChannelState::Closing);
self.set_closing();
let channel = self.clone();
self.internal_rpc
.register_internal_future(async move { channel.channel_close_ok(error).await });
Expand Down
1 change: 1 addition & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ impl Connection {
}

pub async fn close(&self, reply_code: ReplyCode, reply_text: &str) -> Result<()> {
self.channels.set_connection_closing();
if let Some(channel0) = self.channels.get(0) {
channel0
.connection_close(reply_code, reply_text, 0, 0)
Expand Down
4 changes: 4 additions & 0 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ impl Consumer {
self.inner.lock().drop_prefetched_messages();
}

pub(crate) fn start_cancel(&self) {
self.status.lock().start_cancel();
}

pub(crate) fn cancel(&self) {
self.inner.lock().cancel();
}
Expand Down
7 changes: 5 additions & 2 deletions src/consumer_canceler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ impl Drop for ConsumerCanceler {
fn drop(&mut self) {
let status = self.status.lock();
if status.state() == ConsumerState::Active {
self.internal_rpc
.cancel_consumer(self.channel_id, self.consumer_tag.clone());
self.internal_rpc.cancel_consumer(
self.channel_id,
self.consumer_tag.clone(),
self.status.clone(),
);
}
}
}
26 changes: 25 additions & 1 deletion src/consumer_status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use parking_lot::{Mutex, MutexGuard};
use std::sync::Arc;
use std::{fmt, sync::Arc};

#[derive(Clone, Default)]
pub(crate) struct ConsumerStatus(Arc<Mutex<ConsumerStatusInner>>);
Expand All @@ -18,13 +18,33 @@ impl ConsumerStatus {
}
}

impl fmt::Debug for ConsumerStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("ConsumerStatus");
if let Some(inner) = self.try_lock() {
debug.field("state", &inner.state());
}
debug.finish()
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsumerState {
Active,
ActiveWithDelegate,
Canceling,
Canceled,
}

impl ConsumerState {
pub fn is_active(self) -> bool {
match self {
ConsumerState::Active | ConsumerState::ActiveWithDelegate => true,
_ => false,
}
}
}

impl Default for ConsumerState {
fn default() -> Self {
Self::Active
Expand All @@ -45,6 +65,10 @@ impl ConsumerStatusInner {
}
}

pub(crate) fn start_cancel(&mut self) {
self.0 = ConsumerState::Canceling;
}

pub(crate) fn cancel(&mut self) {
self.0 = ConsumerState::Canceled;
}
Expand Down
15 changes: 15 additions & 0 deletions src/consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ impl Consumers {
}
}

pub(crate) fn start_cancel_one<S: Hash + Eq + ?Sized>(&self, consumer_tag: &S)
where
ShortString: Borrow<S>,
{
if let Some(consumer) = self.0.lock().get(consumer_tag) {
consumer.start_cancel();
}
}

pub(crate) fn start_delivery<S: Hash + Eq + ?Sized>(&self, consumer_tag: &S, message: Delivery)
where
ShortString: Borrow<S>,
Expand Down Expand Up @@ -63,6 +72,12 @@ impl Consumers {
}
}

pub(crate) fn start_cancel(&self) {
for consumer in self.0.lock().values() {
consumer.start_cancel();
}
}

pub(crate) fn cancel(&self) {
for (_, consumer) in self.0.lock().drain() {
consumer.cancel();
Expand Down
40 changes: 35 additions & 5 deletions src/frames.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{channel::Reply, ChannelId, Error, Promise, PromiseResolver, Result};
use amq_protocol::frame::AMQPFrame;
use amq_protocol::{frame::AMQPFrame, protocol::{AMQPClass, basic::AMQPMethod}};
use parking_lot::Mutex;
use pinky_swear::Cancellable;
use std::{
Expand Down Expand Up @@ -60,6 +60,16 @@ impl Frames {
.map(|t| t.0)
}

pub(crate) fn next_expected_close_ok_reply(
&self,
channel_id: u16,
error: Error,
) -> Option<Reply> {
self.inner
.lock()
.next_expected_close_ok_reply(channel_id, error)
}

pub(crate) fn has_pending(&self) -> bool {
self.inner.lock().has_pending()
}
Expand Down Expand Up @@ -211,22 +221,42 @@ impl Inner {
frames: &mut VecDeque<(AMQPFrame, Option<PromiseResolver<()>>)>,
error: Error,
) {
for (_, resolver) in std::mem::take(frames) {
for (frame, resolver) in std::mem::take(frames) {
if let Some(resolver) = resolver {
resolver.swear(Err(error.clone()));
match frame {
AMQPFrame::Method(_, AMQPClass::Basic(AMQPMethod::Cancel(_))) => {
resolver.swear(Ok(()))
}
_ => resolver.swear(Err(error.clone())),
}
}
}
}

fn next_expected_close_ok_reply(&mut self, channel_id: u16, error: Error) -> Option<Reply> {
let expected_replies = self.expected_replies.get_mut(&channel_id)?;
while let Some(reply) = expected_replies.pop_front() {
match &reply.0 {
Reply::ChannelCloseOk(_) => return Some(reply.0),
Reply::BasicCancelOk(pinky) => pinky.swear(Ok(())), // Channel close means consumer is canceled automatically
_ => reply.1.cancel(error.clone()),
}
}
None
}

fn clear_expected_replies(&mut self, channel_id: ChannelId, error: Error) {
if let Some(replies) = self.expected_replies.remove(&channel_id) {
Self::cancel_expected_replies(replies, error);
}
}

fn cancel_expected_replies(replies: VecDeque<ExpectedReply>, error: Error) {
for ExpectedReply(_, cancel) in replies {
cancel.cancel(error.clone());
for ExpectedReply(reply, cancel) in replies {
match reply {
Reply::BasicCancelOk(pinky) => pinky.swear(Ok(())),
_ => cancel.cancel(error.clone()),
}
}
}
}
6 changes: 3 additions & 3 deletions src/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ impl Channel {
class_id: ShortUInt,
method_id: ShortUInt,
) -> Result<()> {
if !self.status.connected() {
if !self.status.closing() {
return Err(Error::InvalidChannelState(self.status.state()));
}

Expand Down Expand Up @@ -570,7 +570,6 @@ impl Channel {
Box::new(resolver),
)),
);
self.on_connection_close_sent();
promise_out.await?;
promise.await
}
Expand Down Expand Up @@ -915,7 +914,7 @@ impl Channel {
return Err(Error::InvalidChannelState(self.status.state()));
}

match self.frames.next_expected_reply(self.id) {
match self.next_expected_close_ok_reply() {
Some(Reply::ChannelCloseOk(resolver)) => {
let res = self.on_channel_close_ok_received();
resolver.swear(res.clone());
Expand Down Expand Up @@ -1821,6 +1820,7 @@ impl Channel {
return Err(Error::InvalidChannelState(self.status.state()));
}

self.before_basic_cancel(consumer_tag);
let BasicCancelOptions { nowait } = options;
let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel(
protocol::basic::Cancel {
Expand Down
Loading

0 comments on commit 55189bc

Please sign in to comment.