Skip to content

Commit

Permalink
Merge pull request RobotLocomotion#8313 from EricCousineau-TRI/featur…
Browse files Browse the repository at this point in the history
…e/lcm_wait_for_message_update

 lcm_subscriber_system: Allow `WaitForMessage` to output the latest message
  • Loading branch information
EricCousineau-TRI authored Mar 12, 2018
2 parents 1690614 + 5a8ef3b commit 79f0832
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 24 deletions.
5 changes: 4 additions & 1 deletion lcm/drake_lcm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ void DrakeLcm::StartReceiveThread() {
}

void DrakeLcm::StopReceiveThread() {
if (receive_thread_ != nullptr) receive_thread_->Stop();
if (receive_thread_ != nullptr) {
receive_thread_->Stop();
receive_thread_.reset();
}
}

::lcm::LCM* DrakeLcm::get_lcm_instance() { return &lcm_; }
Expand Down
4 changes: 4 additions & 0 deletions lcm/drake_lcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class DrakeLcm : public DrakeLcmInterface {

void StopReceiveThread() override;

bool IsReceiveThreadRunning() const override {
return receive_thread_ != nullptr;
}

/**
* An accessor to the real LCM instance encapsulated by this object. The
* returned pointer is guaranteed to be valid for the duration of this
Expand Down
5 changes: 5 additions & 0 deletions lcm/drake_lcm_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class DrakeLcmInterface {
*/
virtual void StopReceiveThread() = 0;

/**
* Indicates that the receiving thread is running.
*/
virtual bool IsReceiveThreadRunning() const = 0;

/**
* Publishes an LCM message on channel @p channel.
*
Expand Down
13 changes: 11 additions & 2 deletions lcm/drake_lcm_log.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <atomic>
#include <map>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -116,8 +117,15 @@ class DrakeLcmLog : public DrakeLcmInterface {
return static_cast<uint64_t>(sec * 1e6);
}

void StartReceiveThread() override {}
void StopReceiveThread() override {}
void StartReceiveThread() override {
receive_thread_started_ = true;
}
void StopReceiveThread() override {
receive_thread_started_ = false;
}
bool IsReceiveThreadRunning() const override {
return receive_thread_started_;
}

private:
const bool is_write_;
Expand All @@ -129,6 +137,7 @@ class DrakeLcmLog : public DrakeLcmInterface {
mutable std::mutex mutex_;
std::unique_ptr<::lcm::LogFile> log_;
const ::lcm::LogEvent* next_event_{nullptr};
std::atomic<bool> receive_thread_started_{false};
};

} // namespace lcm
Expand Down
4 changes: 4 additions & 0 deletions lcm/drake_mock_lcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class DrakeMockLcm : public DrakeLcmInterface {

void StopReceiveThread() override;

bool IsReceiveThreadRunning() const override {
return receive_thread_started_;
}

void Publish(const std::string& channel, const void* data,
int data_size, double time_sec = 0) override;

Expand Down
6 changes: 6 additions & 0 deletions lcm/test/drake_lcm_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ GTEST_TEST(LcmLogTest, LcmLogTestSaveAndRead) {
auto w_log = std::make_unique<DrakeLcmLog>("test.log", true);
const std::string channel_name("test_channel");

EXPECT_FALSE(w_log->IsReceiveThreadRunning());
w_log->StartReceiveThread();
EXPECT_TRUE(w_log->IsReceiveThreadRunning());
w_log->StopReceiveThread();
EXPECT_FALSE(w_log->IsReceiveThreadRunning());

drake::lcmt_drake_signal msg;
msg.dim = 1;
msg.val.push_back(0.1);
Expand Down
4 changes: 4 additions & 0 deletions lcm/test/drake_lcm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ TEST_F(DrakeLcmTest, PublishTest) {
// order of construction, this ensures the LCM receive thread stops before any
// resources it uses are destroyed. If the Lcm receive thread is stopped after
// the resources it relies on are destroyed, a segmentation fault may occur.

EXPECT_FALSE(dut.IsReceiveThreadRunning());
dut.StartReceiveThread();
EXPECT_TRUE(dut.IsReceiveThreadRunning());

// Records whether the receiver received an LCM message published by the DUT.
bool done = false;
Expand Down Expand Up @@ -130,6 +133,7 @@ TEST_F(DrakeLcmTest, PublishTest) {

dut.StopReceiveThread();
EXPECT_TRUE(done);
EXPECT_FALSE(dut.IsReceiveThreadRunning());
}

// Handles received LCM messages.
Expand Down
4 changes: 4 additions & 0 deletions lcm/test/drake_mock_lcm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ GTEST_TEST(DrakeMockLcmTest, PublishTest) {
// Instantiates the Device Under Test (DUT).
DrakeMockLcm dut;

EXPECT_FALSE(dut.IsReceiveThreadRunning());
dut.StartReceiveThread();
EXPECT_TRUE(dut.IsReceiveThreadRunning());
dut.Publish(channel_name, &message_bytes[0], message_size);

// Verifies that the message was "published".
Expand All @@ -38,6 +40,8 @@ GTEST_TEST(DrakeMockLcmTest, PublishTest) {

EXPECT_EQ(message_size, static_cast<int>(published_message_bytes.size()));
EXPECT_EQ(message_bytes, published_message_bytes);
dut.StopReceiveThread();
EXPECT_FALSE(dut.IsReceiveThreadRunning());
}

// Tests DrakeMockLcm::DecodeLastPublishedMessageAs() using an lcmt_drake_signal
Expand Down
17 changes: 15 additions & 2 deletions systems/lcm/lcm_subscriber_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ void LcmSubscriberSystem::HandleMessage(const std::string& channel,
}
}

int LcmSubscriberSystem::WaitForMessage(int old_message_count) const {
int LcmSubscriberSystem::WaitForMessage(
int old_message_count, AbstractValue* message) const {
// The message buffer and counter are updated in HandleMessage(), which is
// a callback function invoked by a different thread owned by the
// drake::lcm::DrakeLcmInterface instance passed to the constructor. Thus,
Expand All @@ -310,16 +311,28 @@ int LcmSubscriberSystem::WaitForMessage(int old_message_count) const {

// This while loop is necessary to guard for spurious wakeup:
// https://en.wikipedia.org/wiki/Spurious_wakeup
while (old_message_count == received_message_count_)
while (old_message_count >= received_message_count_) {
// When wait returns, lock is atomically acquired. So it's thread safe to
// read received_message_count_.
received_message_condition_variable_.wait(lock);
}
int new_message_count = received_message_count_;
if (message) {
DRAKE_ASSERT(translator_ == nullptr);
DRAKE_ASSERT(serializer_ != nullptr);
serializer_->Deserialize(
received_message_.data(), received_message_.size(), message);
}
lock.unlock();

return new_message_count;
}

int LcmSubscriberSystem::GetInternalMessageCount() const {
std::unique_lock<std::mutex> lock(received_message_mutex_);
return received_message_count_;
}

const LcmAndVectorBaseTranslator& LcmSubscriberSystem::get_translator() const {
DRAKE_DEMAND(translator_ != nullptr);
return *translator_;
Expand Down
16 changes: 13 additions & 3 deletions systems/lcm/lcm_subscriber_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,20 @@ class LcmSubscriberSystem : public LeafSystem<double>,
void get_input_port(int) = delete;

/**
* Blocks the caller until @p old_message_count is different from the
* internal message counter, and the internal message counter is returned.
* Blocks the caller until its internal message count exceeds
* `old_message_count`.
* @param old_message_count Internal message counter.
* @param message If non-null, will return the received message.
* @pre If `message` is specified, this system must be abstract-valued.
*/
int WaitForMessage(int old_message_count) const;
int WaitForMessage(
int old_message_count, AbstractValue* message = nullptr) const;

/**
* Returns the internal message counter. Meant to be used with
* `WaitForMessage`.
*/
int GetInternalMessageCount() const;

/**
* Returns the message counter stored in @p context.
Expand Down
77 changes: 61 additions & 16 deletions systems/lcm/test/lcm_subscriber_system_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "drake/systems/lcm/lcm_subscriber_system.h"

#include <array>
#include <future>

#include <gtest/gtest.h>

Expand Down Expand Up @@ -129,6 +130,18 @@ GTEST_TEST(LcmSubscriberSystemTest, ReceiveTestUsingDictionary) {
TestSubscriber(&lcm, channel_name, &dut);
}

struct SampleData {
const lcmt_drake_signal value{2, {1.0, 2.0}, {"x", "y"}, 12345};

void MockPublish(
drake::lcm::DrakeMockLcm* lcm, const std::string& channel_name) {
const int num_bytes = value.getEncodedSize();
std::vector<uint8_t> buffer(num_bytes);
value.encode(buffer.data(), 0, num_bytes);
lcm->InduceSubscriberCallback(channel_name, buffer.data(), num_bytes);
}
};

// Tests LcmSubscriberSystem using a Serializer.
GTEST_TEST(LcmSubscriberSystemTest, SerializerTest) {
drake::lcm::DrakeMockLcm lcm;
Expand All @@ -143,28 +156,60 @@ GTEST_TEST(LcmSubscriberSystemTest, SerializerTest) {

// MockLcm produces a sample message.
lcm.StartReceiveThread();
const lcmt_drake_signal sample_data{
2,
{
1.0, 2.0,
},
{
"x", "y",
},
12345,
};
const int num_bytes = sample_data.getEncodedSize();
std::vector<uint8_t> buffer(num_bytes);
sample_data.encode(buffer.data(), 0, num_bytes);
lcm.InduceSubscriberCallback(channel_name, buffer.data(), num_bytes);
SampleData sample_data;
sample_data.MockPublish(&lcm, channel_name);

// Verifies that the dut produces the output message.
EvalOutputHelper(*dut, context.get(), output.get());

const AbstractValue* abstract_value = output->get_data(0);
ASSERT_NE(abstract_value, nullptr);
const auto& value = abstract_value->GetValueOrThrow<lcmt_drake_signal>();
EXPECT_TRUE(CompareLcmtDrakeSignalMessages(value, sample_data));
auto value = abstract_value->GetValueOrThrow<lcmt_drake_signal>();
EXPECT_TRUE(CompareLcmtDrakeSignalMessages(value, sample_data.value));
}

GTEST_TEST(LcmSubscriberSystemTest, WaitTest) {
// Ensure that `WaitForMessage` works as expected.
drake::lcm::DrakeMockLcm lcm;
const std::string channel_name = "channel_name";

// Start device under test, with background LCM thread running.
auto dut = LcmSubscriberSystem::Make<lcmt_drake_signal>(channel_name, &lcm);
lcm.StartReceiveThread();

SampleData sample_data;

// Use simple atomic rather than condition variables, as it simplifies this
// implementation.
std::atomic<bool> started{};
auto wait = [&]() {
while (!started.load()) {}
};

// Test explicit value.
started = false;
auto future_count = std::async(std::launch::async, [&]() {
EXPECT_EQ(dut->GetInternalMessageCount(), 0);
started = true;
return dut->WaitForMessage(0);
});
wait();
sample_data.MockPublish(&lcm, channel_name);
EXPECT_GE(future_count.get(), 1);

// Test implicit value, retrieving message as well.
started = false;
auto future_message = std::async(std::launch::async, [&]() {
int old_count = dut->GetInternalMessageCount();
started = true;
Value<lcmt_drake_signal> message;
dut->WaitForMessage(old_count, &message);
return message.get_value();
});
wait();
sample_data.MockPublish(&lcm, channel_name);
EXPECT_TRUE(CompareLcmtDrakeSignalMessages(
future_message.get(), sample_data.value));
}

// A lcmt_drake_signal translator that preserves coordinate names.
Expand Down

0 comments on commit 79f0832

Please sign in to comment.