forked from amqp-rs/lapin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumers.rs
113 lines (100 loc) · 3.1 KB
/
consumers.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use crate::{
consumer::Consumer,
message::Delivery,
topology_internal::ConsumerDefinitionInternal,
types::{PayloadSize, ShortString},
BasicProperties, Error,
};
use parking_lot::Mutex;
use std::{borrow::Borrow, collections::HashMap, fmt, hash::Hash, sync::Arc};
#[derive(Clone, Default)]
pub(crate) struct Consumers(Arc<Mutex<HashMap<ShortString, Consumer>>>);
impl Consumers {
pub(crate) fn register(&self, tag: ShortString, consumer: Consumer) {
self.0.lock().insert(tag, consumer);
}
pub(crate) fn deregister<S: Hash + Eq + ?Sized>(&self, consumer_tag: &S)
where
ShortString: Borrow<S>,
{
if let Some(consumer) = self.0.lock().remove(consumer_tag) {
consumer.cancel();
}
}
pub(crate) fn start_cancel_one<S: Hash + Eq + ?Sized>(&self, consumer_tag: &S)
where
ShortString: Borrow<S>,
{
if let Some(consumer) = self.0.lock().get(consumer_tag) {
consumer.start_cancel();
}
}
pub(crate) fn start_delivery<S: Hash + Eq + ?Sized>(&self, consumer_tag: &S, message: Delivery)
where
ShortString: Borrow<S>,
{
if let Some(consumer) = self.0.lock().get_mut(consumer_tag) {
consumer.start_new_delivery(message);
}
}
pub(crate) fn handle_content_header_frame<S: Hash + Eq + ?Sized>(
&self,
consumer_tag: &S,
size: PayloadSize,
properties: BasicProperties,
) where
ShortString: Borrow<S>,
{
if let Some(consumer) = self.0.lock().get_mut(consumer_tag) {
consumer.handle_content_header_frame(size, properties);
}
}
pub(crate) fn handle_body_frame<S: Hash + Eq + ?Sized>(
&self,
consumer_tag: &S,
remaining_size: PayloadSize,
payload: Vec<u8>,
) where
ShortString: Borrow<S>,
{
if let Some(consumer) = self.0.lock().get_mut(consumer_tag) {
consumer.handle_body_frame(remaining_size, payload);
}
}
pub(crate) fn drop_prefetched_messages(&self) {
for consumer in self.0.lock().values() {
consumer.drop_prefetched_messages();
}
}
pub(crate) fn start_cancel(&self) {
for consumer in self.0.lock().values() {
consumer.start_cancel();
}
}
pub(crate) fn cancel(&self) {
for (_, consumer) in self.0.lock().drain() {
consumer.cancel();
}
}
pub(crate) fn error(&self, error: Error) {
for (_, consumer) in self.0.lock().drain() {
consumer.set_error(error.clone());
}
}
pub(crate) fn topology(&self) -> Vec<ConsumerDefinitionInternal> {
self.0
.lock()
.values()
.map(|consumer| ConsumerDefinitionInternal::new(consumer.clone()))
.collect()
}
}
impl fmt::Debug for Consumers {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_tuple("Consumers");
if let Some(consumers) = self.0.try_lock() {
debug.field(&*consumers);
}
debug.finish()
}
}