Skip to content

Commit

Permalink
Better handle LCM message publication time (RobotLocomotion#8450)
Browse files Browse the repository at this point in the history
In its original form, it was repeated-everywhere default value of 0.0,
but GSG forbids default arguments in virtual methods.  Here, we change
it to be an optional<>.

This patch also contains a hodge-podge of related cleanups for the
orignal drake_lcm_log code.
  • Loading branch information
jwnimmer-tri authored Mar 28, 2018
1 parent bc017cd commit 4704f8b
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 42 deletions.
7 changes: 1 addition & 6 deletions automotive/automotive_simulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -544,12 +544,7 @@ template <typename T>
void AutomotiveSimulator<T>::SendLoadRobotMessage(
const lcmt_viewer_load_robot& message) {
DRAKE_DEMAND(lcm_ != nullptr);
const int num_bytes = message.getEncodedSize();
std::vector<uint8_t> message_bytes(num_bytes);
const int num_bytes_encoded =
message.encode(message_bytes.data(), 0, num_bytes);
DRAKE_ASSERT(num_bytes_encoded == num_bytes);
lcm_->Publish("DRAKE_VIEWER_LOAD_ROBOT", message_bytes.data(), num_bytes);
Publish(lcm_.get(), "DRAKE_VIEWER_LOAD_ROBOT", message);
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion examples/valkyrie/test/valkyrie_ik_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ GTEST_TEST(ValkyrieIK_Test, ValkyrieIK_Test_StandingPose_Test) {
std::vector<uint8_t> message_bytes;
posture_drawer.Serialize(0, q_draw, &message_bytes);
lcm.Publish("DRAKE_VIEWER_DRAW", message_bytes.data(),
message_bytes.size());
message_bytes.size(), {});
}

} // namespace
Expand Down
2 changes: 1 addition & 1 deletion lcm/drake_lcm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void DrakeLcm::StopReceiveThread() {
::lcm::LCM* DrakeLcm::get_lcm_instance() { return &lcm_; }

void DrakeLcm::Publish(const std::string& channel, const void* data,
int data_size, double) {
int data_size, optional<double>) {
DRAKE_THROW_UNLESS(!channel.empty());
lcm_.publish(channel, data, data_size);
}
Expand Down
2 changes: 1 addition & 1 deletion lcm/drake_lcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class DrakeLcm : public DrakeLcmInterface {
::lcm::LCM* get_lcm_instance();

void Publish(const std::string& channel, const void* data,
int data_size, double time_sec = 0) override;
int data_size, optional<double> time_sec) override;

void Subscribe(const std::string& channel,
DrakeLcmMessageHandlerInterface* handler) override;
Expand Down
12 changes: 7 additions & 5 deletions lcm/drake_lcm_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace lcm {
class DrakeLcmInterface {
public:
DRAKE_NO_COPY_NO_MOVE_NO_ASSIGN(DrakeLcmInterface)
DrakeLcmInterface() = default;
virtual ~DrakeLcmInterface() = default;

/**
Expand All @@ -33,11 +32,11 @@ class DrakeLcmInterface {
*
* @param[in] data_size The length of @data in bytes.
*
* @param[in] time_sec Time in seconds when the publish event occurred. Note
* that this argument is only used when generating a Lcm log.
* @param[in] time_sec Time in seconds when the publish event occurred.
* If unknown, use drake::nullopt or a default-constructed optional.
*/
virtual void Publish(const std::string& channel, const void* data,
int data_size, double time_sec = 0) = 0;
int data_size, optional<double> time_sec) = 0;

/**
* Subscribes to an LCM channel without automatic message decoding. The
Expand All @@ -51,6 +50,9 @@ class DrakeLcmInterface {
*/
virtual void Subscribe(const std::string& channel,
DrakeLcmMessageHandlerInterface* handler) = 0;

protected:
DrakeLcmInterface() = default;
};

/**
Expand All @@ -76,7 +78,7 @@ void Publish(DrakeLcmInterface* lcm, const std::string& channel,
const size_t size_bytes = static_cast<size_t>(num_bytes);
std::vector<uint8_t> bytes(size_bytes);
message.encode(bytes.data(), 0, num_bytes);
lcm->Publish(channel, bytes.data(), num_bytes, time_sec.value_or(0.0));
lcm->Publish(channel, bytes.data(), num_bytes, time_sec);
}

} // namespace lcm
Expand Down
8 changes: 4 additions & 4 deletions lcm/drake_lcm_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ DrakeLcmLog::DrakeLcmLog(const std::string& file_name, bool is_write,
: is_write_(is_write),
overwrite_publish_time_with_system_clock_(
overwrite_publish_time_with_system_clock) {
if (is_write) {
if (is_write_) {
log_ = std::make_unique<::lcm::LogFile>(file_name, "w");
} else {
log_ = std::make_unique<::lcm::LogFile>(file_name, "r");
Expand All @@ -26,16 +26,16 @@ DrakeLcmLog::DrakeLcmLog(const std::string& file_name, bool is_write,
}

void DrakeLcmLog::Publish(const std::string& channel, const void* data,
int data_size, double second) {
int data_size, optional<double> time_sec) {
if (!is_write_) {
throw std::logic_error("Publish is only available for log saving.");
}

std::lock_guard<std::mutex> lock(mutex_);

::lcm::LogEvent log_event;
::lcm::LogEvent log_event{};
if (!overwrite_publish_time_with_system_clock_) {
log_event.timestamp = second_to_timestamp(second);
log_event.timestamp = second_to_timestamp(time_sec.value_or(0.0));
} else {
log_event.timestamp = std::chrono::steady_clock::now().time_since_epoch() /
std::chrono::microseconds(1);
Expand Down
22 changes: 13 additions & 9 deletions lcm/drake_lcm_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace lcm {
* existing log. Note the user is responsible for offsetting the clock used
* to generate the log and the clock used for playback. For example, if the log
* is generated by some external logger (the lcm-logger binary), which uses the
* unix epoch time clock to record message arrival time, the user need to offset
* those timestamps properly to match and the clock used for playback.
* unix epoch time clock to record message arrival time, the user needs to
* offset those timestamps properly to match and the clock used for playback.
*/
class DrakeLcmLog : public DrakeLcmInterface {
public:
Expand All @@ -43,16 +43,16 @@ class DrakeLcmLog : public DrakeLcmInterface {
*
* @throws std::runtime_error if unable to open file.
*/
explicit DrakeLcmLog(const std::string& file_name, bool is_write,
bool overwrite_publish_time_with_system_clock = false);
DrakeLcmLog(const std::string& file_name, bool is_write,
bool overwrite_publish_time_with_system_clock = false);

/**
* Writes an entry occurred at @p timestamp with content @p data to the log
* file. The current implementation blocks until writing is done.
* @param channel Channel name.
* @param data Pointer to raw bytes.
* @param data_size Number of bytes in @p data.
* @param second Time in seconds when the message is published. Since
* @param time_sec Time in seconds when the message is published. Since
* messages are save to the log file in the order of Publish calls, this
* function should only be called with non-decreasing @p second. Note that
* this parameter can be overwritten by the host system's clock if
Expand All @@ -62,7 +62,7 @@ class DrakeLcmLog : public DrakeLcmInterface {
* mode.
*/
void Publish(const std::string& channel, const void* data, int data_size,
double second) override;
optional<double> time_sec) override;

/**
* Subscribes @p handler to @p channel. Multiple handlers can subscribe to the
Expand Down Expand Up @@ -99,7 +99,7 @@ class DrakeLcmLog : public DrakeLcmInterface {
/**
* Returns true if this instance is constructed in write-only mode.
*/
bool is_write_only() const { return is_write_; }
bool is_write() const { return is_write_; }

/**
* Converts @p timestamp (in microseconds) to time (in seconds) relative to
Expand All @@ -121,10 +121,14 @@ class DrakeLcmLog : public DrakeLcmInterface {
const bool is_write_;
const bool overwrite_publish_time_with_system_clock_;

std::map<std::string, std::vector<DrakeLcmMessageHandlerInterface*>>
subscriptions_;
// TODO(jwnimmer-tri) It is not clear to me why this class needs a mutex
// (i.e., where multiple threads are coming from). That factor needs to be
// re-discovered and then documented somewhere.

// This mutes guards access to all of the below member fields.
mutable std::mutex mutex_;
std::map<std::string, std::vector<DrakeLcmMessageHandlerInterface*>>
subscriptions_;
std::unique_ptr<::lcm::LogFile> log_;
const ::lcm::LogEvent* next_event_{nullptr};
};
Expand Down
12 changes: 11 additions & 1 deletion lcm/drake_mock_lcm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ DrakeMockLcm::DrakeMockLcm() {}


void DrakeMockLcm::Publish(const std::string& channel, const void* data,
int data_size, double) {
int data_size, optional<double> time_sec) {
DRAKE_THROW_UNLESS(!channel.empty());
if (last_published_messages_.find(channel) ==
last_published_messages_.end()) {
Expand All @@ -26,6 +26,7 @@ void DrakeMockLcm::Publish(const std::string& channel, const void* data,

const uint8_t* bytes = static_cast<const uint8_t*>(data);
saved_message->data = std::vector<uint8_t>(&bytes[0], &bytes[data_size]);
saved_message->time_sec = time_sec;

if (enable_loop_back_) {
InduceSubscriberCallback(channel, data, data_size);
Expand All @@ -48,6 +49,15 @@ const std::vector<uint8_t>& DrakeMockLcm::get_last_published_message(
return message->data;
}

optional<double> DrakeMockLcm::get_last_publication_time(
const std::string& channel) const {
auto iter = last_published_messages_.find(channel);
if (iter == last_published_messages_.end()) {
return nullopt;
}
return iter->second.time_sec;
}

void DrakeMockLcm::Subscribe(const std::string& channel,
DrakeLcmMessageHandlerInterface* handler) {
DRAKE_THROW_UNLESS(!channel.empty());
Expand Down
11 changes: 9 additions & 2 deletions lcm/drake_mock_lcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class DrakeMockLcm : public DrakeLcmInterface {
*/
void EnableLoopBack() { enable_loop_back_ = true; }

void Publish(const std::string& channel, const void* data,
int data_size, double time_sec = 0) override;
void Publish(const std::string&, const void*, int, optional<double>) override;

/**
* Obtains the most recently "published" message on a particular channel.
Expand Down Expand Up @@ -88,6 +87,13 @@ class DrakeMockLcm : public DrakeLcmInterface {
const std::vector<uint8_t>& get_last_published_message(
const std::string& channel) const;

/**
* Returns the time of the most recent publication on a particular channel.
* Returns nullopt iff a message has never been published on this channel or
* the most recent Publish call had no time_sec.
*/
optional<double> get_last_publication_time(const std::string& channel) const;

/**
* Creates a subscription. Only one subscription per channel name is
* permitted. A std::runtime_error is thrown if more than one subscription to
Expand Down Expand Up @@ -115,6 +121,7 @@ class DrakeMockLcm : public DrakeLcmInterface {

struct LastPublishedMessage {
std::vector<uint8_t> data{};
optional<double> time_sec{};
};

std::map<std::string, LastPublishedMessage> last_published_messages_;
Expand Down
17 changes: 11 additions & 6 deletions lcm/test/drake_mock_lcm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ GTEST_TEST(DrakeMockLcmTest, PublishTest) {
// Instantiates the Device Under Test (DUT).
DrakeMockLcm dut;

dut.Publish(channel_name, &message_bytes[0], message_size);
dut.Publish(channel_name, &message_bytes[0], message_size, nullopt);

// Verifies that the message was "published".
const vector<uint8_t>& published_message_bytes =
dut.get_last_published_message(channel_name);

EXPECT_EQ(message_size, static_cast<int>(published_message_bytes.size()));
EXPECT_EQ(message_bytes, published_message_bytes);
EXPECT_FALSE(dut.get_last_publication_time(channel_name).has_value());

// Now with a publication time.
dut.Publish(channel_name, &message_bytes[0], message_size, 1.23);
EXPECT_EQ(dut.get_last_publication_time(channel_name).value_or(-1.0), 1.23);
}

// Tests DrakeMockLcm::DecodeLastPublishedMessageAs() using an lcmt_drake_signal
Expand All @@ -57,7 +62,7 @@ GTEST_TEST(DrakeMockLcmTest, DecodeLastPublishedMessageAsTest) {

// Instantiates the Device Under Test (DUT).
DrakeMockLcm dut;
dut.Publish(channel_name, &message_bytes[0], message_size);
dut.Publish(channel_name, &message_bytes[0], message_size, nullopt);

// Verifies that the message was "published".
const lcmt_drake_signal last_published_message =
Expand All @@ -69,12 +74,12 @@ GTEST_TEST(DrakeMockLcmTest, DecodeLastPublishedMessageAsTest) {
EXPECT_EQ(last_published_message.timestamp, original_message.timestamp);

// Verifies that exceptions are thrown when a decode operation fails.
dut.Publish(channel_name, &message_bytes[0], message_size - 1);
dut.Publish(channel_name, &message_bytes[0], message_size - 1, nullopt);
EXPECT_THROW(dut.DecodeLastPublishedMessageAs<lcmt_drake_signal>(
channel_name), std::runtime_error);

message_bytes.push_back(0);
dut.Publish(channel_name, &message_bytes[0], message_size + 1);
dut.Publish(channel_name, &message_bytes[0], message_size + 1, nullopt);
EXPECT_THROW(dut.DecodeLastPublishedMessageAs<lcmt_drake_signal>(
channel_name), std::runtime_error);
}
Expand Down Expand Up @@ -166,7 +171,7 @@ GTEST_TEST(DrakeMockLcmTest, WithLoopbackTest) {
message_bytes[i] = i;
}

dut.Publish(kChannelName, &message_bytes[0], kMessageSize);
dut.Publish(kChannelName, &message_bytes[0], kMessageSize, nullopt);

// Verifies that the message was received via loopback.
EXPECT_EQ(kChannelName, handler.get_channel());
Expand Down Expand Up @@ -200,7 +205,7 @@ GTEST_TEST(DrakeMockLcmTest, WithoutLoopbackTest) {
message_bytes[i] = i;
}

dut.Publish(kChannelName, &message_bytes[0], kMessageSize);
dut.Publish(kChannelName, &message_bytes[0], kMessageSize, nullopt);

// Verifies that the message was not received via loopback.
EXPECT_NE(kChannelName, handler.get_channel());
Expand Down
2 changes: 1 addition & 1 deletion manipulation/util/simple_tree_visualizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void SimpleTreeVisualizer::visualize(const VectorX<double>& position_vector) {

// Publishes onto the specified LCM channel.
lcm_->Publish("DRAKE_VIEWER_DRAW", message_bytes.data(),
message_bytes.size());
message_bytes.size(), kTime);
}

} // namespace manipulation
Expand Down
2 changes: 1 addition & 1 deletion perception/estimators/dev/test/articulated_icp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ArticulatedIcpVisualizer : public PointCloudVisualizer {
xb.setZero();
xb.head(num_q) = scene_state.q();
draw_msg_tx.Serialize(0, x, &bytes);
lcm().Publish("DRAKE_VIEWER_DRAW", bytes.data(), bytes.size());
lcm().Publish("DRAKE_VIEWER_DRAW", bytes.data(), bytes.size(), {});
}

private:
Expand Down
1 change: 0 additions & 1 deletion systems/lcm/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ drake_cc_library(
deps = [
":translator",
"//lcm:interface",
"//lcm:lcm_log",
"//systems/framework",
],
)
Expand Down
3 changes: 0 additions & 3 deletions systems/lcm/lcm_publisher_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ class LcmPublisherSystem : public LeafSystem<double> {
channel, std::make_unique<Serializer<LcmMessage>>(), lcm);
}

// TODO(siyuan): add multiple DrakeLcmInterface, so you can publish to a log
// and real lcm at the same time.

/**
* A constructor for an %LcmPublisherSystem that takes LCM message objects on
* its sole abstract-valued input port. The LCM message type is determined by
Expand Down
4 changes: 4 additions & 0 deletions systems/lcm/test/lcm_publisher_system_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ void TestPublisher(const std::string& channel_name, lcm::DrakeMockLcm* lcm,
// TODO(liang.fok) Replace this with a Google Test matcher.
EXPECT_TRUE(CompareLcmtDrakeSignalMessages(received_message,
expected_message));

EXPECT_EQ(
lcm->get_last_publication_time(dut->get_channel_name()).value_or(-1.0),
time);
}

// Tests LcmPublisherSystem using a translator.
Expand Down

0 comments on commit 4704f8b

Please sign in to comment.