Skip to content

Commit

Permalink
[lcm] Use Impl in DrakeLcmLog (RobotLocomotion#17290)
Browse files Browse the repository at this point in the history
This buries the upstream lcm header file into our cc file.
  • Loading branch information
jwnimmer-tri authored May 31, 2022
1 parent cd4bb4c commit 2a3371e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 30 deletions.
60 changes: 41 additions & 19 deletions lcm/drake_lcm_log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,50 @@
#include <chrono>
#include <iostream>
#include <limits>
#include <map>
#include <stdexcept>
#include <utility>
#include <vector>

#include "lcm/lcm-cpp.hpp"

#include "drake/common/drake_assert.h"

namespace drake {
namespace lcm {

using HandlerFunction = DrakeLcmInterface::HandlerFunction;
using MultichannelHandlerFunction =
DrakeLcmInterface::MultichannelHandlerFunction;

class DrakeLcmLog::Impl {
public:
std::multimap<std::string, HandlerFunction> subscriptions_;
std::vector<MultichannelHandlerFunction> multichannel_subscriptions_;
std::unique_ptr<::lcm::LogFile> log_;
const ::lcm::LogEvent* next_event_{nullptr};
};

DrakeLcmLog::DrakeLcmLog(const std::string& file_name, bool is_write,
bool overwrite_publish_time_with_system_clock)
: is_write_(is_write),
overwrite_publish_time_with_system_clock_(
overwrite_publish_time_with_system_clock),
url_("lcmlog://" + file_name) {
url_("lcmlog://" + file_name),
impl_(new Impl) {
if (is_write_) {
log_ = std::make_unique<::lcm::LogFile>(file_name, "w");
impl_->log_ = std::make_unique<::lcm::LogFile>(file_name, "w");
} else {
log_ = std::make_unique<::lcm::LogFile>(file_name, "r");
next_event_ = log_->readNextEvent();
impl_->log_ = std::make_unique<::lcm::LogFile>(file_name, "r");
impl_->next_event_ = impl_->log_->readNextEvent();
}
if (!log_->good()) {
if (!impl_->log_->good()) {
throw std::runtime_error("Failed to open log file: " + file_name);
}
}

DrakeLcmLog::~DrakeLcmLog() = default;

std::string DrakeLcmLog::get_lcm_url() const {
return url_;
}
Expand All @@ -38,8 +57,6 @@ void DrakeLcmLog::Publish(const std::string& channel, const void* data,
throw std::logic_error("Publish is only available for log saving.");
}

std::lock_guard<std::mutex> lock(mutex_);

::lcm::LogEvent log_event{};
if (!overwrite_publish_time_with_system_clock_) {
log_event.timestamp = second_to_timestamp(time_sec.value_or(0.0));
Expand All @@ -51,8 +68,9 @@ void DrakeLcmLog::Publish(const std::string& channel, const void* data,
log_event.datalen = data_size;
log_event.data = const_cast<void*>(data);

std::lock_guard<std::mutex> lock(mutex_);
// TODO(siyuan): should make cache this or thread write this.
if (log_->writeEvent(&log_event) != 0) {
if (impl_->log_->writeEvent(&log_event) != 0) {
throw std::runtime_error("Publish failed to write to log file.");
}
}
Expand All @@ -63,7 +81,7 @@ std::shared_ptr<DrakeSubscriptionInterface> DrakeLcmLog::Subscribe(
throw std::logic_error("Subscribe is only available for log playback.");
}
std::lock_guard<std::mutex> lock(mutex_);
subscriptions_.emplace(channel, std::move(handler));
impl_->subscriptions_.emplace(channel, std::move(handler));
return nullptr;
}

Expand All @@ -73,7 +91,7 @@ std::shared_ptr<DrakeSubscriptionInterface> DrakeLcmLog::SubscribeAllChannels(
throw std::logic_error("Subscribe is only available for log playback.");
}
std::lock_guard<std::mutex> lock(mutex_);
multichannel_subscriptions_.push_back(std::move(handler));
impl_->multichannel_subscriptions_.push_back(std::move(handler));
return nullptr;
}

Expand All @@ -92,10 +110,10 @@ double DrakeLcmLog::GetNextMessageTime() const {
}

std::lock_guard<std::mutex> lock(mutex_);
if (next_event_ == nullptr) {
if (impl_->next_event_ == nullptr) {
return std::numeric_limits<double>::infinity();
}
return timestamp_to_second(next_event_->timestamp);
return timestamp_to_second(impl_->next_event_->timestamp);
}

void DrakeLcmLog::DispatchMessageAndAdvanceLog(double current_time) {
Expand All @@ -106,25 +124,29 @@ void DrakeLcmLog::DispatchMessageAndAdvanceLog(double current_time) {

std::lock_guard<std::mutex> lock(mutex_);
// End of log, do nothing.
if (next_event_ == nullptr) return;
if (impl_->next_event_ == nullptr) {
return;
}
const ::lcm::LogEvent& next_event = *impl_->next_event_;

// Do nothing if the call time does not match the event's time.
if (current_time != timestamp_to_second(next_event_->timestamp)) {
if (current_time != timestamp_to_second(next_event.timestamp)) {
return;
}

// Dispatch message if necessary.
const auto& range = subscriptions_.equal_range(next_event_->channel);
const auto& range = impl_->subscriptions_.equal_range(next_event.channel);
for (auto iter = range.first; iter != range.second; ++iter) {
const HandlerFunction& handler = iter->second;
handler(next_event_->data, next_event_->datalen);
handler(next_event.data, next_event.datalen);
}
for (MultichannelHandlerFunction& handler : multichannel_subscriptions_) {
handler(next_event_->channel, next_event_->data, next_event_->datalen);
for (const MultichannelHandlerFunction& handler :
impl_->multichannel_subscriptions_) {
handler(next_event.channel, next_event.data, next_event.datalen);
}

// Advance log.
next_event_ = log_->readNextEvent();
impl_->next_event_ = impl_->log_->readNextEvent();
}

void DrakeLcmLog::OnHandleSubscriptionsError(const std::string& error_message) {
Expand Down
16 changes: 5 additions & 11 deletions lcm/drake_lcm_log.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
#pragma once

#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

#include "lcm/lcm-cpp.hpp"

#include "drake/common/drake_copyable.h"
#include "drake/lcm/drake_lcm_interface.h"
Expand Down Expand Up @@ -46,6 +41,8 @@ class DrakeLcmLog : public DrakeLcmInterface {
DrakeLcmLog(const std::string& file_name, bool is_write,
bool overwrite_publish_time_with_system_clock = false);

~DrakeLcmLog() override;

/**
* Writes an entry occurred at @p timestamp with content @p data to the log
* file. The current implementation blocks until writing is done.
Expand Down Expand Up @@ -149,13 +146,10 @@ class DrakeLcmLog : public DrakeLcmInterface {
// (i.e., where multiple threads are coming from). That factor needs to be
// re-discovered and then documented somewhere.

// This mutes guards access to all of the below member fields.
// This mutex guards access to the Impl object.
mutable std::mutex mutex_;
std::multimap<std::string, DrakeLcmInterface::HandlerFunction> subscriptions_;
std::vector<DrakeLcmInterface::MultichannelHandlerFunction>
multichannel_subscriptions_;
std::unique_ptr<::lcm::LogFile> log_;
const ::lcm::LogEvent* next_event_{nullptr};
class Impl;
const std::unique_ptr<Impl> impl_;
};

} // namespace lcm
Expand Down

0 comments on commit 2a3371e

Please sign in to comment.