Skip to content

Commit

Permalink
Remove thread management from DrakeLcmInterface (RobotLocomotion#8419)
Browse files Browse the repository at this point in the history
The ability to publish and subscribe should be orthogonal from
(use a different interface from) thread management.  Since we
don't have any need to mock'd thread management yet, we move
the methods onto the concrete class, rather than creating a
separate interface.
  • Loading branch information
jwnimmer-tri authored Mar 21, 2018
1 parent 30e2f95 commit c0cedeb
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 129 deletions.
10 changes: 0 additions & 10 deletions automotive/test/automotive_simulator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestPriusSimpleCar) {
// Set up a basic simulation with just a Prius SimpleCar.
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();

const int id = simulator->AddPriusSimpleCar("Foo", kCommandChannelName);
EXPECT_EQ(id, 0);
Expand Down Expand Up @@ -131,7 +130,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestPriusSimpleCar) {
GTEST_TEST(AutomotiveSimulatorTest, TestPriusSimpleCarInitialState) {
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();
const double kX{10};
const double kY{5.5};
const double kHeading{M_PI_2};
Expand Down Expand Up @@ -164,7 +162,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestMobilControlledSimpleCar) {
// Set up a basic simulation with a MOBIL- and IDM-controlled SimpleCar.
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();
lcm::DrakeMockLcm* lcm =
dynamic_cast<lcm::DrakeMockLcm*>(simulator->get_lcm());
ASSERT_NE(lcm, nullptr);
Expand Down Expand Up @@ -239,7 +236,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestPriusTrajectoryCar) {
// stationary. They both follow a straight 100 m long line.
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();
const int id1 = simulator->AddPriusTrajectoryCar("alice", curve, 1.0, 0.0);
const int id2 = simulator->AddPriusTrajectoryCar("bob", curve, 0.0, 0.0);
EXPECT_EQ(id1, 0);
Expand Down Expand Up @@ -433,7 +429,6 @@ std::unique_ptr<AutomotiveSimulator<double>> MakeWithIdmCarAndDecoy(
GTEST_TEST(AutomotiveSimulatorTest, TestIdmControlledSimpleCar) {
auto simulator =
MakeWithIdmCarAndDecoy(std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();

// Finish all initialization, so that we can test the post-init state.
simulator->Start();
Expand Down Expand Up @@ -495,7 +490,6 @@ double GetPosition(const lcmt_viewer_draw& message, double y) {
GTEST_TEST(AutomotiveSimulatorTest, TestMaliputRailcar) {
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();
lcm::DrakeMockLcm* lcm =
dynamic_cast<lcm::DrakeMockLcm*>(simulator->get_lcm());
ASSERT_NE(lcm, nullptr);
Expand Down Expand Up @@ -597,7 +591,6 @@ bool ContainsWorld(const lcmt_viewer_load_robot& message) {
GTEST_TEST(AutomotiveSimulatorTest, TestLcmOutput) {
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();

simulator->AddPriusSimpleCar("Model1", "Channel1");
simulator->AddPriusSimpleCar("Model2", "Channel2");
Expand Down Expand Up @@ -644,7 +637,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestLcmOutput) {
GTEST_TEST(AutomotiveSimulatorTest, TestDuplicateVehicleNameException) {
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();

EXPECT_NO_THROW(simulator->AddPriusSimpleCar("Model1", "Channel1"));
EXPECT_THROW(simulator->AddPriusSimpleCar("Model1", "foo"),
Expand Down Expand Up @@ -694,7 +686,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestDuplicateVehicleNameException) {
GTEST_TEST(AutomotiveSimulatorTest, TestIdmControllerUniqueName) {
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();

const MaliputRailcarParams<double> params;
const maliput::api::RoadGeometry* road = simulator->SetRoadGeometry(
Expand Down Expand Up @@ -723,7 +714,6 @@ GTEST_TEST(AutomotiveSimulatorTest, TestIdmControllerUniqueName) {
GTEST_TEST(AutomotiveSimulatorTest, TestRailcarVelocityOutput) {
auto simulator = std::make_unique<AutomotiveSimulator<double>>(
std::make_unique<lcm::DrakeMockLcm>());
simulator->get_lcm()->StartReceiveThread();

const MaliputRailcarParams<double> params;
const maliput::api::RoadGeometry* road =
Expand Down
8 changes: 4 additions & 4 deletions bindings/pydrake/lcm_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ PYBIND11_MODULE(lcm, m) {

{
using Class = DrakeLcmInterface;
py::class_<Class>(m, "DrakeLcmInterface")
.def("StartReceiveThread", &Class::StartReceiveThread)
.def("StopReceiveThread", &Class::StopReceiveThread);
py::class_<Class>(m, "DrakeLcmInterface");
// TODO(eric.cousineau): Add remaining methods.
// TODO(eric.cousineau): Allow virtual overrides in Python.
}

{
using Class = DrakeLcm;
py::class_<Class, DrakeLcmInterface>(m, "DrakeLcm")
.def(py::init<>());
.def(py::init<>())
.def("StartReceiveThread", &Class::StartReceiveThread)
.def("StopReceiveThread", &Class::StopReceiveThread);
// TODO(eric.cousineau): Add remaining methods.
}

Expand Down
15 changes: 7 additions & 8 deletions bindings/pydrake/test/lcm_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@


class TestLcm(unittest.TestCase):
def _test_lcm_interface(self, lcm):
self.assertIsInstance(lcm, DrakeLcmInterface)
# Quickly start and stop the receiving thread.
lcm.StartReceiveThread()
lcm.StopReceiveThread()

def test_lcm(self):
self._test_lcm_interface(DrakeLcm())
dut = DrakeLcm()
self.assertIsInstance(dut, DrakeLcmInterface)
# Quickly start and stop the receiving thread.
dut.StartReceiveThread()
dut.StopReceiveThread()

def test_mock_lcm(self):
self._test_lcm_interface(DrakeMockLcm())
dut = DrakeMockLcm()
self.assertIsInstance(dut, DrakeLcmInterface)
26 changes: 23 additions & 3 deletions lcm/drake_lcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,31 @@ class DrakeLcm : public DrakeLcmInterface {
*/
~DrakeLcm() override;

void StartReceiveThread() override;
/**
* Starts the receive thread. This must be called for subscribers to receive
* any messages.
*
* @pre StartReceiveThread() was not called.
*/
void StartReceiveThread();

void StopReceiveThread() override;
/**
* Stops the receive thread. This must be called prior to any subscribers
* being destroyed. Note that the receive thread will be automatically stopped
* by this class's destructor, so usage of this method will be extremely rare.
* It will only be needed if this class's instance and the subscribers to LCM
* channels are owned by different classes. In such a scenario, this method
* can be used to ensure the receive thread is destroyed before the
* subscribers are destroyed.
*
* @pre StartReceiveThread() was called.
*/
void StopReceiveThread();

bool IsReceiveThreadRunning() const override {
/**
* Indicates that the receiving thread is running.
*/
bool IsReceiveThreadRunning() const {
return receive_thread_ != nullptr;
}

Expand Down
26 changes: 0 additions & 26 deletions lcm/drake_lcm_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,6 @@ class DrakeLcmInterface {
DrakeLcmInterface() = default;
virtual ~DrakeLcmInterface() = default;

/**
* Starts the receive thread. This must be called for subscribers to receive
* any messages.
*
* @pre StartReceiveThread() was not called.
*/
virtual void StartReceiveThread() = 0;

/**
* Stops the receive thread. This must be called prior to any subscribers
* being destroyed. Note that the receive thread will be automatically stopped
* by this class's destructor, so usage of this method will be extremely rare.
* It will only be needed if this class's instance and the subscribers to LCM
* channels are owned by different classes. In such a scenario, this method
* can be used to ensure the receive thread is destroyed before the
* subscribers are destroyed.
*
* @pre StartReceiveThread() was called.
*/
virtual void StopReceiveThread() = 0;

/**
* Indicates that the receiving thread is running.
*/
virtual bool IsReceiveThreadRunning() const = 0;

/**
* Publishes an LCM message on channel @p channel.
*
Expand Down
11 changes: 0 additions & 11 deletions lcm/drake_lcm_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,6 @@ class DrakeLcmLog : public DrakeLcmInterface {
return static_cast<uint64_t>(sec * 1e6);
}

void StartReceiveThread() override {
receive_thread_started_ = true;
}
void StopReceiveThread() override {
receive_thread_started_ = false;
}
bool IsReceiveThreadRunning() const override {
return receive_thread_started_;
}

private:
const bool is_write_;
const bool overwrite_publish_time_with_system_clock_;
Expand All @@ -137,7 +127,6 @@ class DrakeLcmLog : public DrakeLcmInterface {
mutable std::mutex mutex_;
std::unique_ptr<::lcm::LogFile> log_;
const ::lcm::LogEvent* next_event_{nullptr};
std::atomic<bool> receive_thread_started_{false};
};

} // namespace lcm
Expand Down
25 changes: 7 additions & 18 deletions lcm/drake_mock_lcm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,6 @@ namespace lcm {

DrakeMockLcm::DrakeMockLcm() {}

void DrakeMockLcm::StartReceiveThread() {
DRAKE_THROW_UNLESS(!receive_thread_started_);
receive_thread_started_ = true;
}

void DrakeMockLcm::StopReceiveThread() {
DRAKE_THROW_UNLESS(receive_thread_started_);
receive_thread_started_ = false;
}

void DrakeMockLcm::Publish(const std::string& channel, const void* data,
int data_size, double) {
Expand Down Expand Up @@ -72,15 +63,13 @@ void DrakeMockLcm::Subscribe(const std::string& channel,

void DrakeMockLcm::InduceSubscriberCallback(const std::string& channel,
const void* data, int data_size) {
if (receive_thread_started_) {
if (subscriptions_.find(channel) == subscriptions_.end()) {
throw std::runtime_error(
"DrakeMockLcm::InduceSubscriberCallback: No "
"subscription to channel \"" +
channel + "\".");
} else {
subscriptions_[channel]->HandleMessage(channel, data, data_size);
}
if (subscriptions_.find(channel) == subscriptions_.end()) {
throw std::runtime_error(
"DrakeMockLcm::InduceSubscriberCallback: No "
"subscription to channel \"" +
channel + "\".");
} else {
subscriptions_[channel]->HandleMessage(channel, data, data_size);
}
}

Expand Down
14 changes: 2 additions & 12 deletions lcm/drake_mock_lcm.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ class DrakeMockLcm : public DrakeLcmInterface {
*/
void EnableLoopBack() { enable_loop_back_ = true; }

void StartReceiveThread() override;

void StopReceiveThread() override;

bool IsReceiveThreadRunning() const override {
return receive_thread_started_;
}

void Publish(const std::string& channel, const void* data,
int data_size, double time_sec = 0) override;

Expand Down Expand Up @@ -105,9 +97,8 @@ class DrakeMockLcm : public DrakeLcmInterface {
DrakeLcmMessageHandlerInterface* handler) override;

/**
* Fakes a callback. This will only work if StartReceivedThread() was already
* called, otherwise this method will do nothing. The callback is executed by
* the same thread as the one calling this method.
* Fakes a callback. The callback is executed by the same thread as the one
* calling this method.
*
* @param[in] channel The channel on which to publish the message.
*
Expand All @@ -121,7 +112,6 @@ class DrakeMockLcm : public DrakeLcmInterface {

private:
bool enable_loop_back_{false};
bool receive_thread_started_{false};

struct LastPublishedMessage {
std::vector<uint8_t> data{};
Expand Down
6 changes: 0 additions & 6 deletions lcm/test/drake_lcm_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ GTEST_TEST(LcmLogTest, LcmLogTestSaveAndRead) {
auto w_log = std::make_unique<DrakeLcmLog>("test.log", true);
const std::string channel_name("test_channel");

EXPECT_FALSE(w_log->IsReceiveThreadRunning());
w_log->StartReceiveThread();
EXPECT_TRUE(w_log->IsReceiveThreadRunning());
w_log->StopReceiveThread();
EXPECT_FALSE(w_log->IsReceiveThreadRunning());

drake::lcmt_drake_signal msg;
msg.dim = 1;
msg.val.push_back(0.1);
Expand Down
26 changes: 0 additions & 26 deletions lcm/test/drake_mock_lcm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ GTEST_TEST(DrakeMockLcmTest, PublishTest) {
// Instantiates the Device Under Test (DUT).
DrakeMockLcm dut;

EXPECT_FALSE(dut.IsReceiveThreadRunning());
dut.StartReceiveThread();
EXPECT_TRUE(dut.IsReceiveThreadRunning());
dut.Publish(channel_name, &message_bytes[0], message_size);

// Verifies that the message was "published".
Expand All @@ -40,8 +37,6 @@ GTEST_TEST(DrakeMockLcmTest, PublishTest) {

EXPECT_EQ(message_size, static_cast<int>(published_message_bytes.size()));
EXPECT_EQ(message_bytes, published_message_bytes);
dut.StopReceiveThread();
EXPECT_FALSE(dut.IsReceiveThreadRunning());
}

// Tests DrakeMockLcm::DecodeLastPublishedMessageAs() using an lcmt_drake_signal
Expand All @@ -62,7 +57,6 @@ GTEST_TEST(DrakeMockLcmTest, DecodeLastPublishedMessageAsTest) {

// Instantiates the Device Under Test (DUT).
DrakeMockLcm dut;
dut.StartReceiveThread();
dut.Publish(channel_name, &message_bytes[0], message_size);

// Verifies that the message was "published".
Expand Down Expand Up @@ -128,7 +122,6 @@ GTEST_TEST(DrakeMockLcmTest, SubscribeTest) {
// Instantiates a message handler.
MockMessageHandler handler;

dut.StartReceiveThread();
dut.Subscribe(kChannelName, &handler);

// Defines a fake serialized message.
Expand All @@ -146,23 +139,6 @@ GTEST_TEST(DrakeMockLcmTest, SubscribeTest) {

const vector<uint8_t>& received_bytes = handler.get_buffer();
EXPECT_EQ(message_bytes, received_bytes);

dut.StopReceiveThread();

// Verifies that no more messages are received by subscribers after the
// receive thread is stopped.
vector<uint8_t> message_bytes2;
message_bytes2.push_back(128);

dut.InduceSubscriberCallback("foo_channel", &message_bytes2[0],
message_bytes2.size());

// Verifies that the original message is returned, not the one that was
// sent after the receive thread was stopped.
EXPECT_EQ(kChannelName, handler.get_channel());
EXPECT_EQ(kMessageSize, handler.get_buffer_size());
const vector<uint8_t>& received_bytes2 = handler.get_buffer();
EXPECT_EQ(message_bytes, received_bytes2);
}

// Tests DrakeMockLcm's ability to "publish" and "subscribe" to an LCM channel
Expand All @@ -180,7 +156,6 @@ GTEST_TEST(DrakeMockLcmTest, WithLoopbackTest) {
// Instantiates a message handler.
MockMessageHandler handler;

dut.StartReceiveThread();
dut.Subscribe(kChannelName, &handler);

// Defines a fake serialized message.
Expand Down Expand Up @@ -215,7 +190,6 @@ GTEST_TEST(DrakeMockLcmTest, WithoutLoopbackTest) {
// Instantiates a message handler.
MockMessageHandler handler;

dut.StartReceiveThread();
dut.Subscribe(kChannelName, &handler);

// Defines a fake serialized message.
Expand Down
Loading

0 comments on commit c0cedeb

Please sign in to comment.