Skip to content

Commit

Permalink
Improve queue usage (ros2#75)
Browse files Browse the repository at this point in the history
* ros2GH-155 Improve shared queue usage

Add printouts of message queue size when refilling
Print warning when starving

* ros2GH-155 Extract method for readability

* ros2GH-155 Fix replay timing when starved and improve warning message

* Improve patching in vendor package
  • Loading branch information
anhosi authored and Karsten1987 committed Dec 20, 2018
1 parent f97189b commit 8b043fe
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 21 deletions.
19 changes: 8 additions & 11 deletions ros1_rosbag_storage_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ if(WIN32)
return()
endif()

set(git_apply git apply)

include(ExternalProject)
# We have to include a number of patches to the rosbag
# 1. rosbag1_encryption_patch includes https://github.com/ros/ros_comm/pull/1499, without it, pluginlib won't link
Expand All @@ -46,17 +44,16 @@ ExternalProject_Add(ros1_rosbag_storage
URL https://github.com/ros/ros_comm/archive/669fbd32d2f92cc295f4b024fcb2f982fddec0f0.zip
URL_MD5 4c8b4c33165b223870f5d77bb697bef6
TIMEOUT 60
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/sources
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}/rosbag_install -DCMAKE_NO_SYSTEM_FROM_IMPORTED=1
PATCH_COMMAND
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/rosbag1_encryption_patch.diff &&
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/bagh.diff &&
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/bagcpp.diff &&
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/bagcpp_name.diff &&
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/plugin_descriptionxml.diff &&
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/packagexml.diff &&
${git_apply} ${CMAKE_CURRENT_SOURCE_DIR}/resources/cmakeliststxt.diff &&
${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeLists.txt.in ${CMAKE_CURRENT_BINARY_DIR}/sources/CMakeLists.txt
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/rosbag1_encryption_patch.diff &&
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/bagh.diff &&
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/bagcpp.diff &&
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/bagcpp_name.diff &&
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/plugin_descriptionxml.diff &&
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/packagexml.diff &&
patch -p1 -N < ${CMAKE_CURRENT_SOURCE_DIR}/resources/cmakeliststxt.diff &&
${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/cmake/CMakeLists.txt.in ${CMAKE_CURRENT_BINARY_DIR}/ros1_rosbag_storage-prefix/src/ros1_rosbag_storage/CMakeLists.txt
)

install(
Expand Down
1 change: 0 additions & 1 deletion ros1_rosbag_storage_vendor/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

<build_depend>boost</build_depend>
<build_depend>bzip2</build_depend>
<build_depend>git</build_depend>
<build_depend>libconsole-bridge-dev</build_depend>
<build_depend>libgpgme-dev</build_depend>
<build_depend>libssl-dev</build_depend>
Expand Down
26 changes: 17 additions & 9 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,23 @@ void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64

void Player::play_messages_from_queue()
{
auto start_time = std::chrono::high_resolution_clock::now();

while ((message_queue_.size_approx() != 0 || !is_storage_completely_loaded()) && rclcpp::ok()) {
ReplayableMessage message;
if (message_queue_.try_dequeue(message)) {
std::this_thread::sleep_until(start_time + message.time_since_start);
if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
start_time_ = std::chrono::system_clock::now();
do {
play_messages_until_queue_empty();
if (!is_storage_completely_loaded() && rclcpp::ok()) {
ROSBAG2_TRANSPORT_LOG_WARN("Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
}
} while (!is_storage_completely_loaded() && rclcpp::ok());
}

void Player::play_messages_until_queue_empty()
{
ReplayableMessage message;
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
std::this_thread::sleep_until(start_time_ + message.time_since_start);
if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ class Player
void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary);
void wait_for_filled_queue(const PlayOptions & options) const;
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers();

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;

std::shared_ptr<rosbag2::SequentialReader> reader_;
moodycamel::ReaderWriterQueue<ReplayableMessage> message_queue_;
std::chrono::time_point<std::chrono::system_clock> start_time_;
mutable std::future<void> storage_loading_future_;
std::shared_ptr<Rosbag2Node> rosbag2_transport_;
std::unordered_map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
Expand Down

0 comments on commit 8b043fe

Please sign in to comment.