forked from rnadigital/agentcloud
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.rs
122 lines (115 loc) · 3.95 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::rabbitmq::models::RabbitConnect;
use amqp_serde::types::{FieldTable, FieldValue, ShortStr};
use amqprs::channel::{Channel, ExchangeDeclareArguments};
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{BasicQosArguments, QueueBindArguments, QueueDeclareArguments},
connection::{Connection, OpenConnectionArguments},
};
use tokio::time::{sleep, Duration};
/// Opens an AMQP connection to the broker described by `connection_details`,
/// retrying until an attempt succeeds.
///
/// The host, port, username and password are read from `connection_details`,
/// and every attempt targets the `/` virtual host. When an attempt fails, the
/// error is printed and the task sleeps for 2000 ms before trying again, so
/// this function only returns once a connection has actually been opened.
///
/// # Panics
///
/// Panics if registering `DefaultConnectionCallback` on the freshly opened
/// connection fails.
pub async fn connect_rabbitmq(connection_details: &RabbitConnect) -> Connection {
    // Build the arguments once so the first attempt and every retry use
    // exactly the same settings, including the virtual host.
    let mut args = OpenConnectionArguments::new(
        &connection_details.host,
        connection_details.port,
        &connection_details.username,
        &connection_details.password,
    );
    args.virtual_host("/");

    let connection = loop {
        match Connection::open(&args).await {
            Ok(connection) => break connection,
            // Surface the reason for the failure instead of discarding it;
            // otherwise a misconfiguration just looks like an endless retry.
            Err(e) => println!("trying to connect after error: {}", e),
        }
        // Back off outside the match so the error value is not held across
        // the await point.
        sleep(Duration::from_millis(2000)).await;
    };

    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .unwrap();
    connection
}
/// Opens a new channel on `connection` and registers
/// `DefaultChannelCallback` on it before handing it back.
///
/// `None` is passed to `open_channel`, i.e. no explicit channel id is
/// supplied by this function.
///
/// # Panics
///
/// Panics if the channel cannot be opened or if the callback registration
/// fails.
pub async fn channel_rabbitmq(connection: &Connection) -> Channel {
    let opened = connection.open_channel(None).await.unwrap();
    opened.register_callback(DefaultChannelCallback).await.unwrap();
    opened
}
/// Applies the prefetch settings this module uses on `channel`
/// (prefetch count 10000, prefetch size 0, not global).
///
/// A failure is printed rather than propagated, so callers continue with
/// whatever QoS the channel already has.
async fn apply_stream_qos(channel: &Channel) {
    let qos = BasicQosArguments {
        prefetch_count: 10000,
        prefetch_size: 0,
        global: false,
    };
    if let Err(e) = channel.basic_qos(qos).await {
        println!("An error occurred while setting up the channel:{}", e)
    }
}

/// Declares `exchange` and `queue` and binds them together with `routing_key`.
///
/// Steps, in order:
/// 1. If `connection` is no longer open, a new connection and channel are
///    created from `connection_details` and written back through the
///    `connection` / `channel` references.
/// 2. `exchange` is declared as a `direct` exchange.
/// 3. Prefetch QoS is applied to the channel.
/// 4. `queue` is declared durable with the `x-queue-type` argument set to
///    `stream`.
/// 5. If the declaration returns queue details, the queue is bound to
///    `exchange` using `routing_key`. If the channel is found closed at that
///    point it is replaced with a new one first.
///
/// Errors from the QoS call and from the queue declaration are printed and
/// otherwise ignored; the function returns nothing either way.
///
/// # Panics
///
/// Panics if declaring the exchange or binding the queue fails, and on any
/// panic raised by `connect_rabbitmq` / `channel_rabbitmq`.
pub async fn bind_queue_to_exchange(
    connection: &mut Connection,
    channel: &mut Channel,
    connection_details: &RabbitConnect,
    exchange: &str,
    queue: &str,
    routing_key: &str,
) {
    if !connection.is_open() {
        println!("Connection not open");
        *connection = connect_rabbitmq(connection_details).await;
        *channel = channel_rabbitmq(connection).await;
        println!("{}", connection);
    }

    // Declaring the exchange on startup
    channel
        .exchange_declare(ExchangeDeclareArguments::new(exchange, "direct"))
        .await
        .unwrap();

    // Setting up basic quality-of-service parameters for the channel to enable streaming queue
    apply_stream_qos(channel).await;

    // Adding the queue type as a custom argument to the queue declaration
    let mut args = FieldTable::new();
    let queue_type_key: ShortStr = "x-queue-type".try_into().unwrap();
    let queue_type_value: FieldValue = "stream".into();
    args.insert(queue_type_key, queue_type_value);

    let declare_args = QueueDeclareArguments::default()
        .queue(queue.to_owned())
        .durable(true)
        .arguments(args)
        .finish();

    match channel.queue_declare(declare_args).await {
        Ok(Some((declared_queue, _, _))) => {
            // Check if the channel is open; if not, open a replacement.
            if !channel.is_open() {
                println!(
                    "Channel is not open, does exchange {} exist on rabbitMQ?",
                    exchange
                );
                *channel = channel_rabbitmq(connection).await;
                // QoS was applied to the channel that has just been replaced,
                // so apply it again: the caller keeps using this new channel.
                apply_stream_qos(channel).await;
            }
            // Bind the queue to the exchange using this channel
            channel
                .queue_bind(QueueBindArguments::new(
                    &declared_queue,
                    exchange,
                    routing_key,
                ))
                .await
                .unwrap();
        }
        // No queue details were returned, so there is nothing to bind.
        Ok(None) => {}
        Err(e) => {
            println!("An error occurred while setting up the queue: {}", e)
        }
    }
}