From d8fff878245042902bc16c6f51dd24ba7ac138a3 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Tue, 22 Dec 2020 23:23:51 +0100 Subject: [PATCH 01/19] fix race in auto conumer cancelation Signed-off-by: Marc-Antoine Perennou --- src/channel.rs | 18 ++++++++++++++++++ src/consumer.rs | 4 ++++ src/consumer_canceler.rs | 2 +- src/consumer_status.rs | 26 +++++++++++++++++++++++++- src/consumers.rs | 9 +++++++++ src/generated.rs | 18 +++++++++++++----- src/internal_rpc.rs | 11 ++++++----- templates/channel.rs | 4 ++-- templates/lapin.json | 12 ++++++++++++ 9 files changed, 90 insertions(+), 14 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 78d2e70c..6b599ecf 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -8,6 +8,7 @@ use crate::{ connection_closer::ConnectionCloser, connection_status::{ConnectionState, ConnectionStep}, consumer::Consumer, + consumer_status::ConsumerStatus, consumers::Consumers, executor::Executor, frames::{ExpectedReply, Frames}, @@ -517,6 +518,23 @@ impl Channel { } } + pub fn basic_cancel( + &self, + consumer_tag: &str, + options: BasicCancelOptions, + ) -> Promise<()> { + self.do_basic_cancel(consumer_tag, options, None) + } + + fn before_basic_cancel(&self, consumer_tag: &str, consumer_status: Option) -> Option> { + if !consumer_status.map_or(false, |consumer_status| consumer_status.state().is_active()) { + Some(Promise::new_with_data(Ok(()))) + } else { + self.consumers.start_cancel(consumer_tag); + None + } + } + fn acknowledgement_error(&self, error: AMQPError, class_id: u16, method_id: u16) -> Result<()> { error!("Got a bad acknowledgement from server, closing channel"); self.internal_rpc diff --git a/src/consumer.rs b/src/consumer.rs index c33605d6..5fadce9b 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -257,6 +257,10 @@ impl Consumer { self.inner.lock().drop_prefetched_messages() } + pub(crate) fn start_cancel(&self) { + self.status.lock().start_cancel(); + } + pub(crate) fn cancel(&self) -> Result<()> { self.inner.lock().cancel() } diff --git a/src/consumer_canceler.rs b/src/consumer_canceler.rs index 44e95572..1c379c2e 100644 --- a/src/consumer_canceler.rs +++ b/src/consumer_canceler.rs @@ -31,7 +31,7 @@ impl Drop for ConsumerCanceler { let status = self.status.lock(); if status.state() == ConsumerState::Active { self.internal_rpc - .cancel_consumer(self.channel_id, self.consumer_tag.clone()); + .cancel_consumer(self.channel_id, self.consumer_tag.clone(), self.status.clone()); } } } diff --git a/src/consumer_status.rs b/src/consumer_status.rs index 6c3e06ce..d524539a 100644 --- a/src/consumer_status.rs +++ b/src/consumer_status.rs @@ -1,5 +1,5 @@ use parking_lot::{Mutex, MutexGuard}; -use std::sync::Arc; +use std::{fmt, sync::Arc}; #[derive(Clone, Default)] pub(crate) struct ConsumerStatus(Arc>); @@ -18,13 +18,33 @@ impl ConsumerStatus { } } +impl fmt::Debug for ConsumerStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut debug = f.debug_struct("ConsumerStatus"); + if let Some(inner) = self.try_lock() { + debug.field("state", &inner.state()); + } + debug.finish() + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum ConsumerState { Active, ActiveWithDelegate, + Canceling, Canceled, } +impl ConsumerState { + pub fn is_active(self) -> bool { + match self { + ConsumerState::Active | ConsumerState::ActiveWithDelegate => true, + _ => false, + } + } +} + impl Default for ConsumerState { fn default() -> Self { Self::Active @@ -45,6 +65,10 @@ impl ConsumerStatusInner { } } + pub(crate) fn start_cancel(&mut self) { + self.0 = ConsumerState::Canceling; + } + pub(crate) fn cancel(&mut self) { self.0 = ConsumerState::Canceled; } diff --git a/src/consumers.rs b/src/consumers.rs index 965bbbf3..40c85087 100644 --- a/src/consumers.rs +++ b/src/consumers.rs @@ -26,6 +26,15 @@ impl Consumers { Ok(()) } + pub(crate) fn start_cancel(&self, consumer_tag: &S) + where + ShortString: Borrow, + { + if let Some(consumer) = self.0.lock().get(consumer_tag) { + consumer.start_cancel(); + } + } + pub(crate) fn start_delivery(&self, consumer_tag: &S, message: Delivery) where ShortString: Borrow, diff --git a/src/generated.rs b/src/generated.rs index 3510fb56..d6564df7 100644 --- a/src/generated.rs +++ b/src/generated.rs @@ -868,7 +868,7 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - fn do_channel_close( + pub(crate) fn do_channel_close( &self, reply_code: ShortUInt, reply_text: &str, @@ -1021,7 +1021,7 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - fn do_exchange_declare( + pub(crate) fn do_exchange_declare( &self, exchange: &str, kind: &str, @@ -1749,7 +1749,7 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - fn do_basic_consume( + pub(crate) fn do_basic_consume( &self, queue: &str, consumer_tag: &str, @@ -1848,11 +1848,19 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - pub fn basic_cancel(&self, consumer_tag: &str, options: BasicCancelOptions) -> Promise<()> { + pub(crate) fn do_basic_cancel( + &self, + consumer_tag: &str, + options: BasicCancelOptions, + consumer_status: Option, + ) -> Promise<()> { if !self.status.connected() { return Promise::new_with_data(Err(Error::InvalidChannelState(self.status.state()))); } + if let Some(res) = self.before_basic_cancel(consumer_tag, consumer_status) { + return res; + } let BasicCancelOptions { nowait } = options; let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel( protocol::basic::Cancel { @@ -1979,7 +1987,7 @@ impl Channel { self.on_basic_deliver_received(method) } #[allow(clippy::too_many_arguments)] - fn do_basic_get( + pub(crate) fn do_basic_get( &self, queue: &str, options: BasicGetOptions, diff --git a/src/internal_rpc.rs b/src/internal_rpc.rs index 029add36..6b39305c 100644 --- a/src/internal_rpc.rs +++ b/src/internal_rpc.rs @@ -1,5 +1,6 @@ use crate::{ channels::Channels, + consumer_status::ConsumerStatus, executor::Executor, options::{BasicAckOptions, BasicCancelOptions, BasicNackOptions, BasicRejectOptions}, socket_state::SocketStateHandle, @@ -68,8 +69,8 @@ impl InternalRPCHandle { )); } - pub(crate) fn cancel_consumer(&self, channel_id: u16, consumer_tag: String) { - self.send(InternalCommand::CancelConsumer(channel_id, consumer_tag)); + pub(crate) fn cancel_consumer(&self, channel_id: u16, consumer_tag: String, consumer_status: ConsumerStatus) { + self.send(InternalCommand::CancelConsumer(channel_id, consumer_tag, consumer_status)); } pub(crate) fn close_channel(&self, channel_id: u16, reply_code: ShortUInt, reply_text: String) { @@ -152,7 +153,7 @@ enum InternalCommand { BasicAck(u16, DeliveryTag, BasicAckOptions, PromiseResolver<()>), BasicNack(u16, DeliveryTag, BasicNackOptions, PromiseResolver<()>), BasicReject(u16, DeliveryTag, BasicRejectOptions, PromiseResolver<()>), - CancelConsumer(u16, String), + CancelConsumer(u16, String, ConsumerStatus), CloseChannel(u16, ShortUInt, String), CloseConnection(ShortUInt, String, ShortUInt, ShortUInt), SendConnectionCloseOk(Error), @@ -216,11 +217,11 @@ impl InternalRPC { ) }) .unwrap_or(Ok(())), - CancelConsumer(channel_id, consumer_tag) => channels + CancelConsumer(channel_id, consumer_tag, consumer_status) => channels .get(channel_id) .map(|channel| { self.handle.register_internal_future( - channel.basic_cancel(&consumer_tag, BasicCancelOptions::default()), + channel.do_basic_cancel(&consumer_tag, BasicCancelOptions::default(), Some(consumer_status)), ) }) .unwrap_or(Ok(())), diff --git a/templates/channel.rs b/templates/channel.rs index 1d6d7c20..c9bb0e2d 100644 --- a/templates/channel.rs +++ b/templates/channel.rs @@ -63,7 +63,7 @@ impl Channel { {{#unless method.metadata.skip ~}} {{#if method.c2s ~}} #[allow(clippy::too_many_arguments)] -{{include_more class.name method.name}}{{#unless method.metadata.require_wrapper ~}}{{#if method.is_reply ~}}{{#if method.metadata.internal ~}}pub(crate) {{/if ~}}{{else}}pub {{#if method.metadata.internal ~}}(crate) {{/if ~}}{{/if ~}}fn {{else}}fn do_{{/unless ~}}{{snake class.name false}}_{{snake method.name false}}(&self{{#unless method.ignore_args ~}}{{#each_argument method.arguments as |argument| ~}}{{#if @argument_is_value ~}}{{#unless argument.force_default ~}}, {{snake argument.name}}: {{#if (use_str_ref argument.type) ~}}&str{{else}}{{argument.type}}{{/if ~}}{{/unless ~}}{{else}}{{#unless argument.ignore_flags ~}}, options: {{camel class.name}}{{camel method.name}}Options{{/unless ~}}{{/if ~}}{{/each_argument ~}}{{/unless ~}}{{#each method.metadata.extra_args as |arg| ~}}, {{arg.name}}: {{arg.type}}{{/each ~}}) -> Promise{{#if method.metadata.confirmation.type ~}}Chain{{/if ~}}<{{#if method.metadata.confirmation.type ~}}{{method.metadata.confirmation.type}}{{else}}(){{/if ~}}> { +{{include_more class.name method.name}}{{#unless method.metadata.require_wrapper ~}}{{#if method.is_reply ~}}{{#if method.metadata.internal ~}}pub(crate) {{/if ~}}{{else}}pub {{#if method.metadata.internal ~}}(crate) {{/if ~}}{{/if ~}}fn {{else}}pub(crate) fn do_{{/unless ~}}{{snake class.name false}}_{{snake method.name false}}(&self{{#unless method.ignore_args ~}}{{#each_argument method.arguments as |argument| ~}}{{#if @argument_is_value ~}}{{#unless argument.force_default ~}}, {{snake argument.name}}: {{#if (use_str_ref argument.type) ~}}&str{{else}}{{argument.type}}{{/if ~}}{{/unless ~}}{{else}}{{#unless argument.ignore_flags ~}}, options: {{camel class.name}}{{camel method.name}}Options{{/unless ~}}{{/if ~}}{{/each_argument ~}}{{/unless ~}}{{#each method.metadata.extra_args as |arg| ~}}, {{arg.name}}: {{arg.type}}{{/each ~}}) -> Promise{{#if method.metadata.confirmation.type ~}}Chain{{/if ~}}<{{#if method.metadata.confirmation.type ~}}{{method.metadata.confirmation.type}}{{else}}(){{/if ~}}> { {{#if method.metadata.channel_init ~}} if !self.status.initializing() { {{else}} @@ -77,7 +77,7 @@ impl Channel { } {{#if method.metadata.start_hook ~}} - {{#if method.metadata.start_hook.returns ~}}let start_hook_res = {{/if ~}}self.before_{{snake class.name false}}_{{snake method.name false}}({{#each method.metadata.start_hook.params as |param| ~}}{{#unless @first ~}}, {{/unless ~}}{{param}}{{/each ~}}); + {{#if method.metadata.start_hook.returns ~}}let start_hook_res = {{/if ~}}{{#if method.metadata.start_hook.returns_early ~}} if let Some(res) = {{/if ~}}self.before_{{snake class.name false}}_{{snake method.name false}}({{#each method.metadata.start_hook.params as |param| ~}}{{#unless @first ~}}, {{/unless ~}}{{param}}{{/each ~}}){{#if method.metadata.start_hook.returns_early ~}}{ return res; }{{else}};{{/if ~}} {{/if ~}} {{#each method.metadata.init_clones as |init_clone| ~}} diff --git a/templates/lapin.json b/templates/lapin.json index c07f58c1..d52e77a3 100644 --- a/templates/lapin.json +++ b/templates/lapin.json @@ -482,6 +482,18 @@ }, "cancel": { "metadata": { + "internal": true, + "require_wrapper": true, + "extra_args": [ + { + "name": "consumer_status", + "type": "Option" + } + ], + "start_hook": { + "params": ["consumer_tag", "consumer_status"], + "returns_early": true + }, "nowait_hook": { "fields": ["consumer_tag: consumer_tag.into()"] } From 7bcf903495164511a05fa804200d3eaab2bb4f30 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 00:05:23 +0100 Subject: [PATCH 02/19] v1.6.2 Signed-off-by: Marc-Antoine Perennou --- CHANGELOG.md | 6 ++++++ Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bd72b3e..7a6ed476 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 1.6.2 (2020-12-23) + +#### Bug Fixes + +* Fix a potential race condition when automatically canceling consumer on drop + ### 1.6.1 (2020-11-28) #### Bug Fixes diff --git a/Cargo.toml b/Cargo.toml index 39c91808..946d6f93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lapin" -version = "1.6.1" +version = "1.6.2" edition = "2018" authors = ["Geoffroy Couprie ", "Marc-Antoine Perennou "] description = "AMQP client library" From 0991dba5b8d177b669fdf212b66135d03d9e9fdb Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 08:48:22 +0100 Subject: [PATCH 03/19] another auto consumer cancel fix Signed-off-by: Marc-Antoine Perennou --- src/channel.rs | 11 ++++++++--- src/consumers.rs | 8 +++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 6b599ecf..ad7485f7 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -195,6 +195,11 @@ impl Channel { Ok(()) } + fn set_closing(&self) { + self.set_state(ChannelState::Closing); + self.consumers.start_cancel(); + } + fn set_closed(&self, error: Error) -> Result<()> { self.set_state(ChannelState::Closed); self.error_publisher_confirms(error.clone()); @@ -530,7 +535,7 @@ impl Channel { if !consumer_status.map_or(false, |consumer_status| consumer_status.state().is_active()) { Some(Promise::new_with_data(Ok(()))) } else { - self.consumers.start_cancel(consumer_tag); + self.consumers.start_cancel_one(consumer_tag); None } } @@ -579,7 +584,7 @@ impl Channel { } fn before_channel_close(&self) { - self.set_state(ChannelState::Closing); + self.set_closing(); } fn on_channel_close_ok_sent(&self, error: Error) -> Result<()> { @@ -877,7 +882,7 @@ impl Channel { info!("Channel closed on channel {}: {:?}", self.id, method); Error::InvalidChannelState(ChannelState::Closing) }); - self.set_state(ChannelState::Closing); + self.set_closing(); self.internal_rpc .register_internal_future(self.channel_close_ok(error)) } diff --git a/src/consumers.rs b/src/consumers.rs index 40c85087..c4552c9f 100644 --- a/src/consumers.rs +++ b/src/consumers.rs @@ -26,7 +26,7 @@ impl Consumers { Ok(()) } - pub(crate) fn start_cancel(&self, consumer_tag: &S) + pub(crate) fn start_cancel_one(&self, consumer_tag: &S) where ShortString: Borrow, { @@ -84,6 +84,12 @@ impl Consumers { .fold(Ok(()), Result::and) } + pub(crate) fn start_cancel(&self) { + for consumer in self.0.lock().values() { + consumer.start_cancel(); + } + } + pub(crate) fn cancel(&self) -> Result<()> { self.0 .lock() From e3032bda50b6cda5a0caefa0feb7a1cf4f4f4a95 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 17:03:45 +0100 Subject: [PATCH 04/19] properly set connection as closing when starting to close it Signed-off-by: Marc-Antoine Perennou --- src/connection.rs | 1 + src/generated.rs | 2 +- templates/lapin.json | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/connection.rs b/src/connection.rs index b625f73d..b86e60d2 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -234,6 +234,7 @@ impl Connection { } pub fn close(&self, reply_code: ShortUInt, reply_text: &str) -> Promise<()> { + self.channels.set_connection_closing(); self.channels .get(0) .map(|channel0| channel0.connection_close(reply_code, reply_text, 0, 0)) diff --git a/src/generated.rs b/src/generated.rs index d6564df7..da624300 100644 --- a/src/generated.rs +++ b/src/generated.rs @@ -551,7 +551,7 @@ impl Channel { class_id: ShortUInt, method_id: ShortUInt, ) -> Promise<()> { - if !self.status.connected() { + if !self.status.closing() { return Promise::new_with_data(Err(Error::InvalidChannelState(self.status.state()))); } diff --git a/templates/lapin.json b/templates/lapin.json index d52e77a3..6ada32f8 100644 --- a/templates/lapin.json +++ b/templates/lapin.json @@ -50,6 +50,7 @@ }, "close": { "metadata": { + "channel_deinit": true, "internal": true, "end_hook": true } From 87d6e1a4d91a59341fcd132887ee60491d694c54 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 17:14:58 +0100 Subject: [PATCH 05/19] consumer: don't auto cancel when channel is closing Signed-off-by: Marc-Antoine Perennou --- src/internal_rpc.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/internal_rpc.rs b/src/internal_rpc.rs index 6b39305c..2dadd5c2 100644 --- a/src/internal_rpc.rs +++ b/src/internal_rpc.rs @@ -220,9 +220,13 @@ impl InternalRPC { CancelConsumer(channel_id, consumer_tag, consumer_status) => channels .get(channel_id) .map(|channel| { - self.handle.register_internal_future( - channel.do_basic_cancel(&consumer_tag, BasicCancelOptions::default(), Some(consumer_status)), - ) + if channel.status().connected() { + self.handle.register_internal_future( + channel.do_basic_cancel(&consumer_tag, BasicCancelOptions::default(), Some(consumer_status)), + ) + } else { + Ok(()) + } }) .unwrap_or(Ok(())), CloseChannel(channel_id, reply_code, reply_text) => channels From 6dc4f6924b76aa2ad766699fdac6f5532a98cdf2 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 17:15:50 +0100 Subject: [PATCH 06/19] v1.6.3 Signed-off-by: Marc-Antoine Perennou --- CHANGELOG.md | 6 ++++++ Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a6ed476..9a8366b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 1.6.3 (2020-12-23) + +#### Bug Fixes + +* Better handling of automatic consumer cancelation when connection is closing + ### 1.6.2 (2020-12-23) #### Bug Fixes diff --git a/Cargo.toml b/Cargo.toml index 946d6f93..d7f3079f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lapin" -version = "1.6.2" +version = "1.6.3" edition = "2018" authors = ["Geoffroy Couprie ", "Marc-Antoine Perennou "] description = "AMQP client library" From 9fcb7ac92e763bfcd74a752e6d4743a304d52435 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:07:51 +0100 Subject: [PATCH 07/19] properly handle missing replies before close-ok Fixes #315 Signed-off-by: Marc-Antoine Perennou --- src/channel.rs | 4 ++++ src/frames.rs | 16 ++++++++++++++++ src/generated.rs | 2 +- templates/channel.rs | 2 +- templates/lapin.json | 1 + 5 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index ad7485f7..49378dea 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -583,6 +583,10 @@ impl Channel { Ok(()) } + fn next_expected_close_ok_reply(&self) -> Option { + self.frames.next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed)) + } + fn before_channel_close(&self) { self.set_closing(); } diff --git a/src/frames.rs b/src/frames.rs index d39cec05..e8a661d8 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -59,6 +59,10 @@ impl Frames { .map(|t| t.0) } + pub(crate) fn next_expected_close_ok_reply(&self, channel_id: u16, error: Error) -> Option { + self.inner.lock().next_expected_close_ok_reply(channel_id, error) + } + pub(crate) fn has_pending(&self) -> bool { self.inner.lock().has_pending() } @@ -218,6 +222,18 @@ impl Inner { } } + fn next_expected_close_ok_reply(&mut self, channel_id: u16, error: Error) -> Option { + let expected_replies = self.expected_replies.get_mut(&channel_id)?; + while let Some(reply) = expected_replies.pop_front() { + match &reply.0 { + Reply::ChannelCloseOk(_) => return Some(reply.0), + Reply::BasicCancelOk(pinky) => pinky.swear(Ok(())), // Channel close means consumer is canceled automatically + _ => reply.1.cancel(error.clone()), + } + } + None + } + fn clear_expected_replies(&mut self, channel_id: u16, error: Error) { if let Some(replies) = self.expected_replies.remove(&channel_id) { Self::cancel_expected_replies(replies, error); diff --git a/src/generated.rs b/src/generated.rs index da624300..17b87d30 100644 --- a/src/generated.rs +++ b/src/generated.rs @@ -940,7 +940,7 @@ impl Channel { return Err(Error::InvalidChannelState(self.status.state())); } - match self.frames.next_expected_reply(self.id) { + match self.next_expected_close_ok_reply() { Some(Reply::ChannelCloseOk(resolver)) => { let res = self.on_channel_close_ok_received(); resolver.swear(res.clone()); diff --git a/templates/channel.rs b/templates/channel.rs index c9bb0e2d..bd6ddb95 100644 --- a/templates/channel.rs +++ b/templates/channel.rs @@ -168,7 +168,7 @@ impl Channel { return Err(Error::InvalidChannelState(self.status.state())); } - match self.frames.next_expected_reply(self.id) { + match {{#if method.metadata.expected_reply_getter ~}}{{method.metadata.expected_reply_getter}}{{else}}self.frames.next_expected_reply(self.id){{/if ~}} { Some(Reply::{{camel class.name}}{{camel method.name}}(resolver{{#each method.metadata.state as |state| ~}}, {{state.name}}{{/each ~}})) => { {{#unless method.metadata.confirmation.type ~}}let res ={{/unless ~}} {{#if method.arguments ~}} diff --git a/templates/lapin.json b/templates/lapin.json index 6ada32f8..041be55a 100644 --- a/templates/lapin.json +++ b/templates/lapin.json @@ -124,6 +124,7 @@ "metadata": { "channel_deinit": true, "received_hook": true, + "expected_reply_getter": "self.next_expected_close_ok_reply()", "extra_args": [ { "name": "error", From 302df57cdcab70a255c832abef241ea2f3261f38 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:20:26 +0100 Subject: [PATCH 08/19] simplify recent basic_cancel tweaks Signed-off-by: Marc-Antoine Perennou --- src/channel.rs | 18 ++---------------- src/generated.rs | 19 ++++++------------- src/internal_rpc.rs | 4 ++-- templates/channel.rs | 4 ++-- templates/lapin.json | 11 +---------- 5 files changed, 13 insertions(+), 43 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 49378dea..d1f1b3af 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -8,7 +8,6 @@ use crate::{ connection_closer::ConnectionCloser, connection_status::{ConnectionState, ConnectionStep}, consumer::Consumer, - consumer_status::ConsumerStatus, consumers::Consumers, executor::Executor, frames::{ExpectedReply, Frames}, @@ -523,21 +522,8 @@ impl Channel { } } - pub fn basic_cancel( - &self, - consumer_tag: &str, - options: BasicCancelOptions, - ) -> Promise<()> { - self.do_basic_cancel(consumer_tag, options, None) - } - - fn before_basic_cancel(&self, consumer_tag: &str, consumer_status: Option) -> Option> { - if !consumer_status.map_or(false, |consumer_status| consumer_status.state().is_active()) { - Some(Promise::new_with_data(Ok(()))) - } else { - self.consumers.start_cancel_one(consumer_tag); - None - } + fn before_basic_cancel(&self, consumer_tag: &str) { + self.consumers.start_cancel_one(consumer_tag); } fn acknowledgement_error(&self, error: AMQPError, class_id: u16, method_id: u16) -> Result<()> { diff --git a/src/generated.rs b/src/generated.rs index 17b87d30..fd11f091 100644 --- a/src/generated.rs +++ b/src/generated.rs @@ -868,7 +868,7 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - pub(crate) fn do_channel_close( + pub fn do_channel_close( &self, reply_code: ShortUInt, reply_text: &str, @@ -1021,7 +1021,7 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - pub(crate) fn do_exchange_declare( + pub fn do_exchange_declare( &self, exchange: &str, kind: &str, @@ -1749,7 +1749,7 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - pub(crate) fn do_basic_consume( + pub fn do_basic_consume( &self, queue: &str, consumer_tag: &str, @@ -1848,19 +1848,12 @@ impl Channel { } } #[allow(clippy::too_many_arguments)] - pub(crate) fn do_basic_cancel( - &self, - consumer_tag: &str, - options: BasicCancelOptions, - consumer_status: Option, - ) -> Promise<()> { + pub fn basic_cancel(&self, consumer_tag: &str, options: BasicCancelOptions) -> Promise<()> { if !self.status.connected() { return Promise::new_with_data(Err(Error::InvalidChannelState(self.status.state()))); } - if let Some(res) = self.before_basic_cancel(consumer_tag, consumer_status) { - return res; - } + self.before_basic_cancel(consumer_tag); let BasicCancelOptions { nowait } = options; let method = AMQPClass::Basic(protocol::basic::AMQPMethod::Cancel( protocol::basic::Cancel { @@ -1987,7 +1980,7 @@ impl Channel { self.on_basic_deliver_received(method) } #[allow(clippy::too_many_arguments)] - pub(crate) fn do_basic_get( + pub fn do_basic_get( &self, queue: &str, options: BasicGetOptions, diff --git a/src/internal_rpc.rs b/src/internal_rpc.rs index 2dadd5c2..a67aefa4 100644 --- a/src/internal_rpc.rs +++ b/src/internal_rpc.rs @@ -220,9 +220,9 @@ impl InternalRPC { CancelConsumer(channel_id, consumer_tag, consumer_status) => channels .get(channel_id) .map(|channel| { - if channel.status().connected() { + if channel.status().connected() && consumer_status.state().is_active() { self.handle.register_internal_future( - channel.do_basic_cancel(&consumer_tag, BasicCancelOptions::default(), Some(consumer_status)), + channel.basic_cancel(&consumer_tag, BasicCancelOptions::default()), ) } else { Ok(()) diff --git a/templates/channel.rs b/templates/channel.rs index bd6ddb95..9ad0b7d8 100644 --- a/templates/channel.rs +++ b/templates/channel.rs @@ -63,7 +63,7 @@ impl Channel { {{#unless method.metadata.skip ~}} {{#if method.c2s ~}} #[allow(clippy::too_many_arguments)] -{{include_more class.name method.name}}{{#unless method.metadata.require_wrapper ~}}{{#if method.is_reply ~}}{{#if method.metadata.internal ~}}pub(crate) {{/if ~}}{{else}}pub {{#if method.metadata.internal ~}}(crate) {{/if ~}}{{/if ~}}fn {{else}}pub(crate) fn do_{{/unless ~}}{{snake class.name false}}_{{snake method.name false}}(&self{{#unless method.ignore_args ~}}{{#each_argument method.arguments as |argument| ~}}{{#if @argument_is_value ~}}{{#unless argument.force_default ~}}, {{snake argument.name}}: {{#if (use_str_ref argument.type) ~}}&str{{else}}{{argument.type}}{{/if ~}}{{/unless ~}}{{else}}{{#unless argument.ignore_flags ~}}, options: {{camel class.name}}{{camel method.name}}Options{{/unless ~}}{{/if ~}}{{/each_argument ~}}{{/unless ~}}{{#each method.metadata.extra_args as |arg| ~}}, {{arg.name}}: {{arg.type}}{{/each ~}}) -> Promise{{#if method.metadata.confirmation.type ~}}Chain{{/if ~}}<{{#if method.metadata.confirmation.type ~}}{{method.metadata.confirmation.type}}{{else}}(){{/if ~}}> { +{{include_more class.name method.name}}{{#unless method.metadata.require_wrapper ~}}{{#if method.is_reply ~}}{{#if method.metadata.internal ~}}pub(crate) {{/if ~}}{{else}}pub {{#if method.metadata.internal ~}}(crate) {{/if ~}}{{/if ~}}fn {{else}}pub fn do_{{/unless ~}}{{snake class.name false}}_{{snake method.name false}}(&self{{#unless method.ignore_args ~}}{{#each_argument method.arguments as |argument| ~}}{{#if @argument_is_value ~}}{{#unless argument.force_default ~}}, {{snake argument.name}}: {{#if (use_str_ref argument.type) ~}}&str{{else}}{{argument.type}}{{/if ~}}{{/unless ~}}{{else}}{{#unless argument.ignore_flags ~}}, options: {{camel class.name}}{{camel method.name}}Options{{/unless ~}}{{/if ~}}{{/each_argument ~}}{{/unless ~}}{{#each method.metadata.extra_args as |arg| ~}}, {{arg.name}}: {{arg.type}}{{/each ~}}) -> Promise{{#if method.metadata.confirmation.type ~}}Chain{{/if ~}}<{{#if method.metadata.confirmation.type ~}}{{method.metadata.confirmation.type}}{{else}}(){{/if ~}}> { {{#if method.metadata.channel_init ~}} if !self.status.initializing() { {{else}} @@ -77,7 +77,7 @@ impl Channel { } {{#if method.metadata.start_hook ~}} - {{#if method.metadata.start_hook.returns ~}}let start_hook_res = {{/if ~}}{{#if method.metadata.start_hook.returns_early ~}} if let Some(res) = {{/if ~}}self.before_{{snake class.name false}}_{{snake method.name false}}({{#each method.metadata.start_hook.params as |param| ~}}{{#unless @first ~}}, {{/unless ~}}{{param}}{{/each ~}}){{#if method.metadata.start_hook.returns_early ~}}{ return res; }{{else}};{{/if ~}} + {{#if method.metadata.start_hook.returns ~}}let start_hook_res = {{/if ~}}self.before_{{snake class.name false}}_{{snake method.name false}}({{#each method.metadata.start_hook.params as |param| ~}}{{#unless @first ~}}, {{/unless ~}}{{param}}{{/each ~}}); {{/if ~}} {{#each method.metadata.init_clones as |init_clone| ~}} diff --git a/templates/lapin.json b/templates/lapin.json index 041be55a..411ab30d 100644 --- a/templates/lapin.json +++ b/templates/lapin.json @@ -484,17 +484,8 @@ }, "cancel": { "metadata": { - "internal": true, - "require_wrapper": true, - "extra_args": [ - { - "name": "consumer_status", - "type": "Option" - } - ], "start_hook": { - "params": ["consumer_tag", "consumer_status"], - "returns_early": true + "params": ["consumer_tag"] }, "nowait_hook": { "fields": ["consumer_tag: consumer_tag.into()"] From 5abc46c676adfdc07164e2f1c114212081a56100 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:26:12 +0100 Subject: [PATCH 09/19] connection close hook is no longer necessary Signed-off-by: Marc-Antoine Perennou --- src/channel.rs | 5 ----- src/generated.rs | 4 ---- templates/lapin.json | 3 +-- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index d1f1b3af..052a62e7 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -555,11 +555,6 @@ impl Channel { Ok(()) } - fn on_connection_close_sent(&self) -> Result<()> { - self.internal_rpc.set_connection_closing(); - Ok(()) - } - fn on_connection_close_ok_sent(&self, error: Error) -> Result<()> { if let Error::ProtocolError(_) = error { self.internal_rpc.set_connection_error(error); diff --git a/src/generated.rs b/src/generated.rs index fd11f091..24549b0d 100644 --- a/src/generated.rs +++ b/src/generated.rs @@ -580,10 +580,6 @@ impl Channel { Box::new(resolver), )), ); - let end_hook_res = self.on_connection_close_sent(); - if let Err(err) = end_hook_res { - return Promise::new_with_data(Err(err)); - } promise } diff --git a/templates/lapin.json b/templates/lapin.json index 411ab30d..e18c841a 100644 --- a/templates/lapin.json +++ b/templates/lapin.json @@ -51,8 +51,7 @@ "close": { "metadata": { "channel_deinit": true, - "internal": true, - "end_hook": true + "internal": true } }, "close-ok": { From ab9c0f8b258cb6a8c463724ebe06755aba81dd8c Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:28:09 +0100 Subject: [PATCH 10/19] v1.6.4 Signed-off-by: Marc-Antoine Perennou --- CHANGELOG.md | 7 +++++++ Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a8366b0..698a28a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +### 1.6.4 (2020-12-23) + +#### Bug Fixes + +* Cleanup basic cancel handling +* Fix undocumented rabbitmq behaviour on channel close with expected replies + ### 1.6.3 (2020-12-23) #### Bug Fixes diff --git a/Cargo.toml b/Cargo.toml index d7f3079f..6d401c30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lapin" -version = "1.6.3" +version = "1.6.4" edition = "2018" authors = ["Geoffroy Couprie ", "Marc-Antoine Perennou "] description = "AMQP client library" From 1b262d26dc5a2cb5b125eae839ca21ec7e692eff Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:37:16 +0100 Subject: [PATCH 11/19] frames: consider a channel close/error as a success for basic cancel Signed-off-by: Marc-Antoine Perennou --- src/frames.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/frames.rs b/src/frames.rs index e8a661d8..259636ed 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -241,8 +241,11 @@ impl Inner { } fn cancel_expected_replies(replies: VecDeque, error: Error) { - for ExpectedReply(_, cancel) in replies { - cancel.cancel(error.clone()); + for ExpectedReply(reply, cancel) in replies { + match reply { + Reply::BasicCancelOk(pinky) => pinky.swear(Ok(())), + _ => cancel.cancel(error.clone()), + } } } } From 445a2eba47eb22f94b479ff3a22770e5c55affde Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:42:32 +0100 Subject: [PATCH 12/19] consider errors for basic cancel as success With an error, the cosumer is always de facto canceled Signed-off-by: Marc-Antoine Perennou --- src/frames.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/frames.rs b/src/frames.rs index 259636ed..c0b0bbb7 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -1,5 +1,5 @@ use crate::{channel::Reply, Error, Promise, PromiseResolver}; -use amq_protocol::frame::AMQPFrame; +use amq_protocol::{frame::AMQPFrame, protocol::{AMQPClass, basic::AMQPMethod}}; use log::{log_enabled, trace, Level::Trace}; use parking_lot::Mutex; use pinky_swear::Cancellable; @@ -215,9 +215,12 @@ impl Inner { frames: &mut VecDeque<(AMQPFrame, Option>)>, error: Error, ) { - for (_, resolver) in std::mem::take(frames) { + for (frame, resolver) in std::mem::take(frames) { if let Some(resolver) = resolver { - resolver.swear(Err(error.clone())); + match frame { + AMQPFrame::Method(_, AMQPClass::Basic(AMQPMethod::Cancel(_))) => resolver.swear(Ok(())), + _ => resolver.swear(Err(error.clone())), + } } } } From 81e2fcbea2ee7dd7e717a2df69ef63c8a7c94878 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 20:43:18 +0100 Subject: [PATCH 13/19] v1.6.5 Signed-off-by: Marc-Antoine Perennou --- CHANGELOG.md | 6 ++++++ Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 698a28a3..86484f2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 1.6.5 (2020-12-23) + +#### Bug Fixes + +* Treat close and error as success for consumers cancel as it forcibly implies cancelation + ### 1.6.4 (2020-12-23) #### Bug Fixes diff --git a/Cargo.toml b/Cargo.toml index 6d401c30..d812121d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lapin" -version = "1.6.4" +version = "1.6.5" edition = "2018" authors = ["Geoffroy Couprie ", "Marc-Antoine Perennou "] description = "AMQP client library" From 338b7a666445dc1d18c5a30e1f7b39cf2e4f884f Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 21:03:41 +0100 Subject: [PATCH 14/19] rustfmt Signed-off-by: Marc-Antoine Perennou --- src/channel.rs | 3 ++- src/consumer_canceler.rs | 7 +++++-- src/frames.rs | 19 +++++++++++++++---- src/internal_rpc.rs | 13 +++++++++++-- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 052a62e7..6814950a 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -565,7 +565,8 @@ impl Channel { } fn next_expected_close_ok_reply(&self) -> Option { - self.frames.next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed)) + self.frames + .next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed)) } fn before_channel_close(&self) { diff --git a/src/consumer_canceler.rs b/src/consumer_canceler.rs index 1c379c2e..327e3b64 100644 --- a/src/consumer_canceler.rs +++ b/src/consumer_canceler.rs @@ -30,8 +30,11 @@ impl Drop for ConsumerCanceler { fn drop(&mut self) { let status = self.status.lock(); if status.state() == ConsumerState::Active { - self.internal_rpc - .cancel_consumer(self.channel_id, self.consumer_tag.clone(), self.status.clone()); + self.internal_rpc.cancel_consumer( + self.channel_id, + self.consumer_tag.clone(), + self.status.clone(), + ); } } } diff --git a/src/frames.rs b/src/frames.rs index c0b0bbb7..fdbe413c 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -1,5 +1,8 @@ use crate::{channel::Reply, Error, Promise, PromiseResolver}; -use amq_protocol::{frame::AMQPFrame, protocol::{AMQPClass, basic::AMQPMethod}}; +use amq_protocol::{ + frame::AMQPFrame, + protocol::{basic::AMQPMethod, AMQPClass}, +}; use log::{log_enabled, trace, Level::Trace}; use parking_lot::Mutex; use pinky_swear::Cancellable; @@ -59,8 +62,14 @@ impl Frames { .map(|t| t.0) } - pub(crate) fn next_expected_close_ok_reply(&self, channel_id: u16, error: Error) -> Option { - self.inner.lock().next_expected_close_ok_reply(channel_id, error) + pub(crate) fn next_expected_close_ok_reply( + &self, + channel_id: u16, + error: Error, + ) -> Option { + self.inner + .lock() + .next_expected_close_ok_reply(channel_id, error) } pub(crate) fn has_pending(&self) -> bool { @@ -218,7 +227,9 @@ impl Inner { for (frame, resolver) in std::mem::take(frames) { if let Some(resolver) = resolver { match frame { - AMQPFrame::Method(_, AMQPClass::Basic(AMQPMethod::Cancel(_))) => resolver.swear(Ok(())), + AMQPFrame::Method(_, AMQPClass::Basic(AMQPMethod::Cancel(_))) => { + resolver.swear(Ok(())) + } _ => resolver.swear(Err(error.clone())), } } diff --git a/src/internal_rpc.rs b/src/internal_rpc.rs index a67aefa4..2411472d 100644 --- a/src/internal_rpc.rs +++ b/src/internal_rpc.rs @@ -69,8 +69,17 @@ impl InternalRPCHandle { )); } - pub(crate) fn cancel_consumer(&self, channel_id: u16, consumer_tag: String, consumer_status: ConsumerStatus) { - self.send(InternalCommand::CancelConsumer(channel_id, consumer_tag, consumer_status)); + pub(crate) fn cancel_consumer( + &self, + channel_id: u16, + consumer_tag: String, + consumer_status: ConsumerStatus, + ) { + self.send(InternalCommand::CancelConsumer( + channel_id, + consumer_tag, + consumer_status, + )); } pub(crate) fn close_channel(&self, channel_id: u16, reply_code: ShortUInt, reply_text: String) { From 0959c166b58726106abbbf1ae5963f5a9ed83826 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 21:46:57 +0100 Subject: [PATCH 15/19] update to tokio 1.0.0 Signed-off-by: Marc-Antoine Perennou --- tokio/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 38dafcc8..d4a32945 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -20,7 +20,7 @@ path = ".." default-features = false [dependencies.tokio] -version = "^0.3.4" +version = "^1.0" default-features = false features = ["net", "rt", "time"] @@ -32,6 +32,6 @@ env_logger = "^0.8" log = "^0.4" [dev-dependencies.tokio] -version = "^0.3.4" +version = "^1.0" default-features = false features = ["macros", "net", "rt", "rt-multi-thread", "time"] From b5a4e7bd03c09edc448716f1e88002cee4df73bb Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 21:48:13 +0100 Subject: [PATCH 16/19] tokio-amqp: v1.0.0 Signed-off-by: Marc-Antoine Perennou --- tokio/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index d4a32945..66cf430f 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-amqp" -version = "0.3.1" +version = "1.0.0" edition = "2018" authors = ["Marc-Antoine Perennou "] description = "lapin integration with tokio" From 39eaba0797287ed2790a9e74bc6b6eb162a064d6 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 23 Dec 2020 22:34:24 +0100 Subject: [PATCH 17/19] lapin-async-global-executor: v2.0.0 Signed-off-by: Marc-Antoine Perennou --- async-global-executor/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async-global-executor/Cargo.toml b/async-global-executor/Cargo.toml index 70af05ed..5ed2f755 100644 --- a/async-global-executor/Cargo.toml +++ b/async-global-executor/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lapin-async-global-executor" -version = "1.1.0" +version = "2.0.0" edition = "2018" authors = ["Marc-Antoine Perennou "] description = "lapin integration with async-global-executor" @@ -16,7 +16,7 @@ default = ["lapin/default", "async-io"] async-io = ["async-global-executor/async-io", "async-lapin"] [dependencies.async-global-executor] -version = "^1.4.3" +version = "^2.0" default-features = false [dependencies.async-lapin] From 113be3ae8cbdc48497f2e3b520b7305b05beb42d Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Fri, 25 Dec 2020 17:30:41 +0100 Subject: [PATCH 18/19] fix automatic conneciton closing Signed-off-by: Marc-Antoine Perennou --- src/internal_rpc.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/internal_rpc.rs b/src/internal_rpc.rs index 2411472d..871322b0 100644 --- a/src/internal_rpc.rs +++ b/src/internal_rpc.rs @@ -95,6 +95,7 @@ impl InternalRPCHandle { class_id: ShortUInt, method_id: ShortUInt, ) { + self.set_connection_closing(); self.send(InternalCommand::CloseConnection( reply_code, reply_text, class_id, method_id, )); From 5785da0e5f3c79d0b86d279b70264d8aeb13c817 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Fri, 25 Dec 2020 17:33:06 +0100 Subject: [PATCH 19/19] v1.6.6 Signed-off-by: Marc-Antoine Perennou --- CHANGELOG.md | 6 ++++++ Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 86484f2c..27ab79fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +### 1.6.6 (2020-12-25) + +#### Bug Fixes + +* Fix an issue with automatic connection close on drop + ### 1.6.5 (2020-12-23) #### Bug Fixes diff --git a/Cargo.toml b/Cargo.toml index d812121d..85d2b4b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lapin" -version = "1.6.5" +version = "1.6.6" edition = "2018" authors = ["Geoffroy Couprie ", "Marc-Antoine Perennou "] description = "AMQP client library"