forked from amqp-rs/lapin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel_receiver_state.rs
109 lines (102 loc) · 3.44 KB
/
channel_receiver_state.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use crate::{
types::{ChannelId, Identifier, PayloadSize, ShortString},
Result,
};
use std::collections::VecDeque;
#[derive(Debug, Default)]
pub(crate) struct ChannelReceiverStates(VecDeque<ChannelReceiverState>);
impl ChannelReceiverStates {
#[cfg(test)]
pub(crate) fn receiver_state(&self) -> ChannelReceiverState {
self.0.front().unwrap().clone()
}
pub(crate) fn set_will_receive(&mut self, class_id: Identifier, delivery_cause: DeliveryCause) {
self.0.push_back(ChannelReceiverState::WillReceiveContent(
class_id,
delivery_cause,
));
}
pub(crate) fn set_content_length<
Handler: FnOnce(&DeliveryCause, bool),
OnInvalidClass: FnOnce(String) -> Result<()>,
OnError: FnOnce(String) -> Result<()>,
>(
&mut self,
channel_id: ChannelId,
class_id: Identifier,
length: PayloadSize,
handler: Handler,
invalid_class_hanlder: OnInvalidClass,
error_handler: OnError,
confirm_mode: bool,
) -> Result<()> {
if let Some(ChannelReceiverState::WillReceiveContent(expected_class_id, delivery_cause)) =
self.0.pop_front()
{
if expected_class_id == class_id {
handler(&delivery_cause, confirm_mode);
if length > 0 {
self.0.push_front(ChannelReceiverState::ReceivingContent(
delivery_cause,
length,
));
}
Ok(())
} else {
invalid_class_hanlder(format!(
"content header frame with class id {} instead of {} received on channel {}",
class_id, expected_class_id, channel_id
))
}
} else {
error_handler(format!(
"unexpected content header frame received on channel {}",
channel_id
))
}
}
pub(crate) fn receive<
Handler: FnOnce(&DeliveryCause, PayloadSize, bool),
OnError: FnOnce(String) -> Result<()>,
>(
&mut self,
channel_id: ChannelId,
length: PayloadSize,
handler: Handler,
error_handler: OnError,
confirm_mode: bool,
) -> Result<()> {
if let Some(ChannelReceiverState::ReceivingContent(delivery_cause, len)) =
self.0.pop_front()
{
if let Some(remaining) = len.checked_sub(length) {
handler(&delivery_cause, remaining, confirm_mode);
if remaining > 0 {
self.0.push_front(ChannelReceiverState::ReceivingContent(
delivery_cause,
remaining,
));
}
Ok(())
} else {
error_handler(format!("unexpectedly large content body frame received on channel {} ({} bytes, expected {} bytes)", channel_id, length, len))
}
} else {
error_handler(format!(
"unexpected content body frame received on channel {}",
channel_id
))
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum ChannelReceiverState {
WillReceiveContent(Identifier, DeliveryCause),
ReceivingContent(DeliveryCause, PayloadSize),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum DeliveryCause {
Consume(ShortString),
Get,
Return,
}