forked from ut-amrl/robofleet_client_lib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMessageSchedulerLib.hpp
136 lines (116 loc) · 4.39 KB
/
MessageSchedulerLib.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#pragma once
#include <chrono>
#include <deque>
#include <iostream>
#include <unordered_map>
#include <vector>
#include <functional>
using SchedulerClock = std::chrono::high_resolution_clock;
template <typename T> struct WaitingMessage {
// the message data
T message;
// is this message unsent and waiting?
bool message_ready = false;
double priority = 0;
double publish_interval = 0;
// when a message was last sent on this topic
SchedulerClock::time_point last_send_time = SchedulerClock::now();
// updated as now() - last_send_time, only when a message is ready
std::chrono::duration<double> time_waiting =
std::chrono::duration<double>::zero();
};
/**
* @brief Queues messages and schedules them on demand.
* This implements Robofleet's backpressure monitoring algorithm for traffic control, and expects backpressure updates via the websocket ping/pong mechanism.
* This does *not* include rate limiting, which is expected to be done by a client *before* calling `enqueue`,
* as the client can avoid paying the cost of encoding a message by rate-limiting earlier.
* Messages are enqueued, and then later scheduled
* when schedule() is called.
*/
template <typename T> class MessageSchedulerLib {
std::function<void(const T&)> sc_;
uint64_t network_backpressure_counter_ = 0;
uint64_t max_queue_before_waiting_ = 1;
std::deque<T> no_drop_queue;
std::unordered_map<std::string, WaitingMessage<T>> topic_queue;
public:
MessageSchedulerLib(uint64_t mq, std::function<void(const T&)> sc) {
max_queue_before_waiting_ = mq;
sc_ = sc;
}
void enqueue(
const std::string& topic, const T& data, double priority, double rate_limit,
bool no_drop) {
if (no_drop) {
no_drop_queue.push_back(data);
} else {
topic_queue[topic].message = data;
topic_queue[topic].message_ready = true;
topic_queue[topic].priority = priority;
topic_queue[topic].publish_interval = rate_limit != 0 ? 1.0 / rate_limit : 0.0;
}
schedule();
}
/**
* @brief Fire this to indicate that the network is free
* Updates the counter for network backpressure
*/
void backpressure_update(uint64_t message_index, uint64_t last_ponged_index) {
network_backpressure_counter_ = message_index - last_ponged_index;
schedule();
}
/**
* @brief Schedule messages now.
* Messages flagged as no_drop are sent first, in FIFO fashion.
* Then, messages are sent by topic priority.
*/
void schedule() {
if (network_backpressure_counter_ >= max_queue_before_waiting_) {
return;
}
// flush no-drop queue
if (!no_drop_queue.empty()) {
while (!no_drop_queue.empty()) {
const auto& next = no_drop_queue.front();
sc_(next);
no_drop_queue.pop_front();
network_backpressure_counter_++;
}
if (network_backpressure_counter_ >= max_queue_before_waiting_) {
return;
}
}
if (topic_queue.empty()) {
return;
}
const auto now = SchedulerClock::now();
// Collect candidates
std::vector<WaitingMessage<T>*> candidates;
for (auto it = topic_queue.begin(); it != topic_queue.end(); ++it) {
WaitingMessage<T>& candidate = it->second;
const auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(now - candidate.last_send_time);
if (candidate.message_ready && duration.count() / 1000.0 > candidate.publish_interval) {
candidate.time_waiting =
(duration) * candidate.priority;
candidates.push_back(&candidate);
}
}
// Determine how many messages we are allowed to send
int messages_to_send = std::min(
(max_queue_before_waiting_ - network_backpressure_counter_),
(uint64_t)candidates.size());
// Sort candidates
auto compare = [](WaitingMessage<T>* lhs, WaitingMessage<T>* rhs) {
return lhs->time_waiting < rhs->time_waiting;
};
std::sort(candidates.begin(), candidates.end(), compare);
// attempt to publish top-K candidates
for (int cand_idx = 0; cand_idx < messages_to_send; cand_idx++) {
sc_(candidates[cand_idx]->message);
candidates[cand_idx]->message_ready = false;
candidates[cand_idx]->last_send_time = now;
network_backpressure_counter_++;
}
}
};