forked from RobotLocomotion/drake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
drake_lcm_interface.h
354 lines (323 loc) · 13.3 KB
/
drake_lcm_interface.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include <fmt/format.h>
#include "drake/common/drake_copyable.h"
#include "drake/common/drake_throw.h"
#include "drake/lcm/lcm_messages.h"
namespace drake {
namespace lcm {
// Declared later in this file.
class DrakeLcmInterface;
class DrakeSubscriptionInterface;
namespace internal {
// Used by the drake::lcm::Subscribe() free function to report errors (for
// example, a message that could not be decoded) against the given `lcm`
// instance, passing along a human-readable `error_message`.
//
// DrakeLcmInterface declares this function as a friend so that it can call
// that class's private virtual method of the same name.
void OnHandleSubscriptionsError(DrakeLcmInterface* lcm,
const std::string& error_message);
} // namespace internal
/**
* A pure virtual interface that enables LCM to be mocked.
*
* Because it must be pure, in general it will receive breaking API changes
* without notice. Users should not subclass this interface directly, but
* rather use one of the existing subclasses such as DrakeLcmBase instead.
*
* Similarly, method arguments will receive breaking API changes without
* notice. Users should not call this interface directly, but rather use
* drake::lcm::Publish() or drake::lcm::Subscribe() instead.
*/
class DrakeLcmInterface {
public:
DRAKE_NO_COPY_NO_MOVE_NO_ASSIGN(DrakeLcmInterface);
virtual ~DrakeLcmInterface();
/**
* A callback used by DrakeLcmInterface::Subscribe(), with arguments:
* - `message_buffer` A pointer to the byte vector that is the serial
* representation of the LCM message.
* - `message_size` The size of `message_buffer`.
*
* A callback should never throw an exception, because it is indirectly
* called from C functions.
*/
using HandlerFunction = std::function<void(const void*, int)>;
/**
* A callback used by DrakeLcmInterface::SubscribeMultichannel() and
* DrakeLcmInterface::SubscribeAllChannels(). Because one such callback can
* serve several channels, it needs the receiving channel passed in as its
* first argument. The remaining two arguments presumably have the same
* meaning as the `message_buffer` and `message_size` arguments of
* HandlerFunction.
*/
using MultichannelHandlerFunction =
std::function<void(std::string_view, const void*, int)>;
/**
* Returns a URL describing the transport of this LCM interface.
*
* When the URL refers to a transport offered by LCM itself (e.g., memq or
* udpm), then this function must return the conventional URL spelling. If
* the implementation of DrakeLcmInterface is using a non-standard back end,
* the result is implementation-defined.
*
* In either case, it is always formatted using URI syntax rules per the
* RFC(s).
*/
virtual std::string get_lcm_url() const = 0;
/**
* Most users should use the drake::lcm::Publish() free function, instead of
* this interface method.
*
* Publishes an LCM message on channel @p channel.
*
* @param channel The channel on which to publish the message.
* Must not be the empty string.
*
* @param data A buffer containing the serialized bytes of the message to
* publish.
*
* @param data_size The length of @p data in bytes.
*
* @param time_sec Time in seconds when the publish event occurred.
* If unknown, use nullopt or a default-constructed optional.
*/
virtual void Publish(const std::string& channel, const void* data,
int data_size, std::optional<double> time_sec) = 0;
/**
* Most users should use the drake::lcm::Subscribe() free function or the
* drake::lcm::Subscriber wrapper class, instead of this interface method.
*
* Subscribes to an LCM channel without automatic message decoding. The
* handler will be invoked when a message arrives on channel @p channel.
*
* The handler should never throw an exception, because it is indirectly
* called from C functions.
*
* @param channel The channel to subscribe to. Must not be the empty string.
* To use a regex, see SubscribeMultichannel().
*
* @return the object used to manage the subscription if that is supported,
* or else nullptr if not supported. The unsubscribe-on-delete default is
* `false`. Refer to the DrakeSubscriptionInterface class overview for
* details.
*/
virtual std::shared_ptr<DrakeSubscriptionInterface> Subscribe(
const std::string& channel, HandlerFunction) = 0;
/**
* Subscribes to all channels whose name matches the given regular expression.
* The `regex` is treated as an anchored "match" not a "search", i.e., it must
* match the entire channel name. The specific regular expression grammar is
* left unspecified, so it's best to use only patterns that have identical
* semantics in all grammars, e.g., `".*"`.
*
* The handler receives the name of the channel on which each message
* arrived; see MultichannelHandlerFunction. */
virtual std::shared_ptr<DrakeSubscriptionInterface> SubscribeMultichannel(
std::string_view regex, MultichannelHandlerFunction) = 0;
/**
* Subscribes to all channels; this is useful for logging and redirecting LCM
* traffic without regard to its content.
*
* The handler receives the name of the channel on which each message
* arrived; see MultichannelHandlerFunction.
*/
virtual std::shared_ptr<DrakeSubscriptionInterface> SubscribeAllChannels(
MultichannelHandlerFunction) = 0;
/**
* Invokes the HandlerFunction callbacks for all subscriptions' pending
* messages. If @p timeout_millis is >0, blocks for up to that long until at
* least one message is handled.
* @param timeout_millis The maximum time to block, in milliseconds, when
* positive.
* @return the number of messages handled, or 0 on timeout.
* @throw std::exception when a subscribed handler throws.
*/
virtual int HandleSubscriptions(int timeout_millis) = 0;
protected:
// Protected so that only subclasses can construct this interface.
DrakeLcmInterface();
private:
// Allow our internal function to call the virtual function.
friend void internal::OnHandleSubscriptionsError(
DrakeLcmInterface* /* lcm */, const std::string& /* error_message */);
// A virtual function to be called during HandleSubscriptions processing.
// It is private; the friend declaration above is what gives the internal
// free function of the same name access to it.
virtual void OnHandleSubscriptionsError(const std::string& error_message) = 0;
};
/**
* A helper class returned by DrakeLcmInterface::Subscribe() that allows for
* (possibly automatic) unsubscription and/or queue capacity control. Refer to
* that method for additional details.
*
* Instances of this object are always stored in `std::shared_ptr` to manage
* them as resources. When a particular DrakeLcmInterface implementation does
* not support subscription controls, the managed pointer will be `nullptr`
* instead of an instance of this object.
*
* To unsubscribe, induce a call to the %DrakeSubscriptionInterface destructor
* by bringing the `std::shared_ptr` use count to zero. That usually means
* either a call to `subscription.reset()` or by allowing it to go out of
* scope.
*
* To *disable* unsubscription so that the pointer loss *never* causes
* unsubscription, call `subscription->set_unsubscribe_on_delete(false)`.
* To *enable* unsubscription, set it to `true`. Which choice is active by
* default is specified by whatever method returns this object.
*/
class DrakeSubscriptionInterface {
public:
DRAKE_NO_COPY_NO_MOVE_NO_ASSIGN(DrakeSubscriptionInterface);
virtual ~DrakeSubscriptionInterface();
/**
* Sets whether or not the subscription on DrakeLcmInterface will be
* terminated when this object is deleted. It is permitted to call this
* method many times, with a new `enabled` value each time.
*
* @param enabled When true, deleting this object terminates the
* subscription; when false, the subscription outlives this object.
*/
virtual void set_unsubscribe_on_delete(bool enabled) = 0;
/**
* Sets this subscription's queue depth to store messages between calls to
* DrakeLcmInterface::HandleSubscriptions. When the queue becomes full, new
* received messages will be discarded. The default depth is 1.
*
* @param capacity The new queue depth, counted in messages.
*
* @warning The memq:// LCM URL does not support per-channel queues, so this
* method has no effect when memq is being used, e.g., in Drake unit tests.
*/
virtual void set_queue_capacity(int capacity) = 0;
protected:
// Protected so that only subclasses can construct this interface.
DrakeSubscriptionInterface();
};
/**
 * Serializes @p message and publishes the resulting bytes on the LCM channel
 * named @p channel.
 *
 * @param lcm The LCM service that sends the message.
 * Must not be null.
 *
 * @param channel The name of the channel to publish on.
 * Must not be the empty string.
 *
 * @param message The message to serialize and publish.
 *
 * @param time_sec Time in seconds when the publish event occurred.
 * If unknown, use the default value of nullopt.
 */
template <typename Message>
void Publish(DrakeLcmInterface* lcm, const std::string& channel,
             const Message& message, std::optional<double> time_sec = {}) {
  DRAKE_THROW_UNLESS(lcm != nullptr);
  const std::vector<uint8_t> encoded = EncodeLcmMessage(message);
  // The interface method takes an untyped buffer plus an `int` length, so
  // spell out the conversions instead of relying on implicit ones.
  const void* const payload = encoded.data();
  const int payload_size = static_cast<int>(encoded.size());
  lcm->Publish(channel, payload, payload_size, time_sec);
}
/**
 * Subscribes to an LCM channel named @p channel and decodes messages of type
 * @p Message. See also drake::lcm::Subscriber for a simple way to passively
 * observe received messages, without the need to write a handler function.
 *
 * Neither callback is allowed to leak an exception into the caller of the
 * subscription closure, because that closure is indirectly invoked from C
 * functions. Any std::exception thrown by @p handler or by @p on_error is
 * therefore caught and registered on @p lcm, to be reported once it is safe
 * to do so (i.e., once C code is no longer on the stack).
 *
 * @param lcm The LCM service on which to subscribe.
 * Must not be null.
 *
 * @param channel The channel on which to subscribe.
 * Must not be the empty string.
 *
 * @param handler The callback when a message is received and decoded without
 * error.
 *
 * @param on_error The callback when a message is received and cannot be
 * decoded; if no error handler is given, a decoding error is registered on
 * @p lcm instead, which will throw once it is safe to do so.
 *
 * @return the object used to unsubscribe if that is supported, or else nullptr
 * if unsubscribe is not supported. The unsubscribe-on-delete default is
 * `false`, so that ignoring this result leaves the subscription intact. Refer
 * to the DrakeSubscriptionInterface class overview for details.
 *
 * @note Depending on the specific DrakeLcmInterface implementation, the
 * handler might be invoked on a different thread than this function.
 */
template <typename Message>
std::shared_ptr<DrakeSubscriptionInterface> Subscribe(
    DrakeLcmInterface* lcm, const std::string& channel,
    std::function<void(const Message&)> handler,
    std::function<void()> on_error = {}) {
  DRAKE_THROW_UNLESS(lcm != nullptr);
  // The closure outlives this call, so it owns copies of everything it uses.
  // The two callbacks were passed by value, so they are moved (not copied)
  // into the closure.
  return lcm->Subscribe(
      channel, [lcm, channel, handler = std::move(handler),
                on_error = std::move(on_error)](const void* bytes, int size) {
        Message received{};
        const int size_decoded = received.decode(bytes, 0, size);
        if (size_decoded == size) {
          try {
            handler(received);
          } catch (const std::exception& e) {
            // Register the error on the DrakeLcmInterface that owns us. It
            // will throw once it's safe (i.e., once C code is no longer on
            // the stack).
            internal::OnHandleSubscriptionsError(
                lcm,
                fmt::format("Error from message handler callback on {}: {}",
                            channel, e.what()));
          }
        } else if (on_error) {
          try {
            on_error();
          } catch (const std::exception& e) {
            // The user's error callback runs under the same C stack frames
            // as the message handler, so it gets the same protection.
            internal::OnHandleSubscriptionsError(
                lcm, fmt::format("Error from on_error callback on {}: {}",
                                 channel, e.what()));
          }
        } else {
          // Register the error on the DrakeLcmInterface that owns us. It
          // will throw once it's safe (i.e., once C code is no longer on the
          // stack).
          internal::OnHandleSubscriptionsError(
              lcm, fmt::format("Error decoding message on {}", channel));
        }
      });
}
/**
 * Listens on one channel and keeps a copy of the latest @p Message received
 * there, together with a running count of received messages. Copying a
 * Subscriber does not duplicate that state: every copy refers to the same
 * underlying data. No mutex or other locking is provided, so this class
 * should only be used when the governing
 * DrakeLcmInterface::HandleSubscriptions is called from the same thread that
 * owns all copies of this object.
 */
template <typename Message>
class Subscriber final {
 public:
  // Copyable on purpose, so that it can be returned and stored by-value.
  DRAKE_DEFAULT_COPY_AND_MOVE_AND_ASSIGN(Subscriber);

  /**
   * Starts listening on the (non-empty) @p channel of the given (non-null)
   * @p lcm instance. The `lcm` pointer is needed only while constructing and
   * is not stored in this object. If a message arrives that cannot be
   * decoded, the @p on_error handler is invoked; when `on_error` is not
   * provided, an exception will be thrown instead.
   */
  Subscriber(DrakeLcmInterface* lcm, const std::string& channel,
             std::function<void()> on_error = {}) {
    // The closure keeps its own reference to the shared storage, so the
    // storage stays valid for as long as the subscription can fire.
    std::function<void(const Message&)> store_latest =
        [storage = data_](const Message& incoming) {
          storage->message = incoming;
          ++storage->count;
        };
    subscription_ = drake::lcm::Subscribe<Message>(
        lcm, channel, std::move(store_latest), std::move(on_error));
    // A null result means there is no subscription object to configure.
    if (subscription_ != nullptr) {
      subscription_->set_unsubscribe_on_delete(true);
    }
  }

  /**
   * Returns the latest message received so far; if none has been received,
   * returns a value-initialized (zeros) message.
   */
  const Message& message() const { return data_->message; }
  Message& message() { return data_->message; }

  /** Returns how many messages have been received in total. */
  int64_t count() const { return data_->count; }
  int64_t& count() { return data_->count; }

  /** Resets all data (the message and the count both become all zeros). */
  void clear() {
    Data& shared = *data_;
    shared.message = {};
    shared.count = 0;
  }

 private:
  struct Data {
    Message message{};
    int64_t count{0};
  };

  // One block of (mutable) storage, shared by every copy of this Subscriber
  // *and* by the subscription closure that writes into it. It is released
  // only after all Subscribers are gone AND the closure has been destroyed.
  std::shared_ptr<Data> data_{std::make_shared<Data>()};

  // Holds the subscription open for as long as any copy of this Subscriber
  // remains.
  std::shared_ptr<DrakeSubscriptionInterface> subscription_;
};
/// Convenience function that repeatedly calls `lcm->HandleSubscriptions()`
/// with a timeout value of `timeout_millis`, until `finished()` returns true.
/// Returns the total number of messages handled.
///
/// @param lcm The interface whose HandleSubscriptions() method is called.
/// @param finished The predicate that ends the loop once it returns true.
/// @param timeout_millis The timeout passed to each HandleSubscriptions()
/// call; defaults to 100.
///
/// @note Whether `finished()` is evaluated before the first call to
/// HandleSubscriptions() is not visible from this declaration.
int LcmHandleSubscriptionsUntil(DrakeLcmInterface* lcm,
const std::function<bool(void)>& finished,
int timeout_millis = 100);
} // namespace lcm
} // namespace drake