Skip to content

Commit

Permalink
Merge branch 'fix/elev_mapping_postprocessor_multiple_functors' into …
Browse files Browse the repository at this point in the history
…'master'

[elevation_mapping] Multiple functors per postprocessing pool, each owning a filter chain

GitOrigin-RevId: c972b26f7900420bb71918f55bf74113ea4ee3ab
  • Loading branch information
YoshuaNavaANYbotics authored and anybotics-sync-runner committed Dec 22, 2020
1 parent 5c84a55 commit e0864b5
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 52 deletions.
1 change: 1 addition & 0 deletions elevation_mapping/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_library(${PROJECT_NAME}_library
src/input_sources/Input.cpp
src/input_sources/InputSourceManager.cpp
src/postprocessing/PostprocessorPool.cpp
src/postprocessing/PostprocessingWorker.cpp
src/postprocessing/PostprocessingPipelineFunctor.cpp
src/RobotMotionMapUpdater.cpp
src/sensor_processors/SensorProcessorBase.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

#pragma once

#include <memory>

#include <filters/filter_chain.h>
#include <ros/ros.h>
#include <grid_map_core/GridMap.hpp>
Expand All @@ -34,7 +32,7 @@ class PostprocessingPipelineFunctor {
using GridMap = grid_map::GridMap;

/**
* @brief Constructor.
* @brief Explicit Constructor.
* @param nodeHandle The node handle to read parameters from and to publish output data.
*/
explicit PostprocessingPipelineFunctor(ros::NodeHandle& nodeHandle);
Expand All @@ -55,14 +53,14 @@ class PostprocessingPipelineFunctor {
* Publishes a given grid map.
* @param gridMap The Grid Map that this functor will publish.
*/
void publish(GridMap& gridMap) const;
void publish(const GridMap& gridMap) const;

/**
* Checks whether there are any subscribers to the result of this pipeline.
* Checks whether there are any subscribers to the result of this functor.
*
* @return True if someone listens to the topic this task publishes to.
* @return True if someone listens to the topic this functor publishes to.
*/
bool pipelineHasSubscribers() const;
bool hasSubscribers() const;

private:
/**
Expand All @@ -81,8 +79,7 @@ class PostprocessingPipelineFunctor {
ros::Publisher publisher_;

//! Filter chain.
// We wrap the filter chain with a unique_ptr to simplify moving this object, overcoming shortcomings of its implementation.
std::unique_ptr<filters::FilterChain<grid_map::GridMap>> filterChain_;
filters::FilterChain<grid_map::GridMap> filterChain_;

//! Filter chain parameters name.
std::string filterChainParametersName_;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* PostprocessingWorker.hpp
*
* Created on: Dec. 21, 2020
* Author: Yoshua Nava
* Institute: ANYbotics
*/

#pragma once

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <thread>

#include <grid_map_core/GridMap.hpp>

#include "elevation_mapping/postprocessing/PostprocessingPipelineFunctor.hpp"

namespace elevation_mapping {

/**
* @brief A wrapper around the postprocessing pipelines functor.
*
* @remark It stores together the functor, grid map data buffer, and thread tooling.
* It is assumed that the members of this function are guarded by an external mutex,
* handled by the owner of this class.
*/
class PostprocessingWorker {
public:
using GridMap = grid_map::GridMap;

explicit PostprocessingWorker(ros::NodeHandle nodeHandle);

/*! @name Accessors */
///@{
boost::asio::io_service& ioService() { return ioService_; }
std::thread& thread() { return thread_; }
const GridMap& dataBuffer() { return dataBuffer_; }
void setDataBuffer(GridMap data) { dataBuffer_ = std::move(data); }
///@}

/*! @name Methods */
///@{
/**
* @brief Process the data in the buffer.
*
* @return GridMap Processed grid map.
*/
GridMap processBuffer();

/**
* @brief Publish a given grid map.
*
* @param gridMap The grid map to publish.
*/
void publish(const GridMap& gridMap) const;

/**
* @brief Checks whether the worker publisher has any active subscribers.
*
* @return true If there are subscribers to the worker publisher, false otherwise.
*/
bool hasSubscribers() const;
///@}

protected:
//! The functor to execute on a given GridMap.
PostprocessingPipelineFunctor functor_;

//! BOOST Service Worker Infrastructure.
//! The io_service objects provide the interface to post an asynchronous task.
//! The work object ensures that the io_service run() method keeps spinning and accepts tasks.
//! A thread executes the io_service::run() method, which does io_service::work, ie accepting and executing new tasks.
//! IO service for asynchronous operation.
boost::asio::io_service ioService_;
//! IO service work notifier
boost::asio::io_service::work work_;
//! The thread on which this worker runs.
std::thread thread_;

//! Data container for the worker.
GridMap dataBuffer_;
};

} // namespace elevation_mapping
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>

#include <grid_map_core/GridMap.hpp>

#include "elevation_mapping/postprocessing/PostprocessingPipelineFunctor.hpp"
#include "elevation_mapping/postprocessing/PostprocessingWorker.hpp"

namespace elevation_mapping {

Expand All @@ -38,7 +41,11 @@ class PostprocessorPool {
*/
PostprocessorPool(std::size_t poolSize, ros::NodeHandle nodeHandle);

//! @brief Destructor.
/**
* @brief Destructor.
*
* Requests all postprocessing pipelines to stop and joins their threads until they're done.
*/
~PostprocessorPool();

/**
Expand All @@ -62,27 +69,13 @@ class PostprocessorPool {
*/
void wrapTask(size_t serviceIndex);

//! The functor to execute on a given GridMap.
PostprocessingPipelineFunctor postprocessor_;

//! Multiple service workers, all addressed with their corresponding index.
//! The io_service objects provide the interface to post an asynchronous task.
//! The work object ensures that the io_service run() method keeps spinning and accepts tasks.
//! A thread executes the io_service::run() method, which does io_service::work, ie accepting and executing new tasks.

//! Service objects, object i runs work_[i] on threads_[i].
std::vector<boost::asio::io_service> ioService_;
//! The routines to accept is_service tasks.
std::vector<boost::asio::io_service::work> work_;
//! Data container for the workers. E.g ioService_[i] operates on dataBuffers_[i].
std::vector<GridMap> dataBuffers_;
//! The threads on which the services run. E.g ioService_[i] runs on threads_[i].
std::vector<std::thread> threads_;
// Post-processing workers.
std::vector<std::unique_ptr<PostprocessingWorker>> workers_;

//! Container holding the service ids which have corresponding threads. The only object that is used in a mutual exclusive manner and must
//! be protected by availableServicesMutex_.
std::deque<size_t> availableServices_;
boost::mutex availableServicesMutex_;
std::deque<size_t> availableServices_;
};

} // namespace elevation_mapping
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@
namespace elevation_mapping {

PostprocessingPipelineFunctor::PostprocessingPipelineFunctor(ros::NodeHandle& nodeHandle)
: nodeHandle_(nodeHandle), filterChain_(), filterChainConfigured_(false) {
: nodeHandle_(nodeHandle), filterChain_("grid_map::GridMap"), filterChainConfigured_(false) {
// TODO (magnus) Add logic when setting up failed. What happens actually if it is not configured?
readParameters();

// Note: we declare the ROS filter chain as a unique pointer so that each PostprocessingPipelineFunctor
// can own one chain, without that chain taking away move construction ability from the functor.
filterChain_ = std::make_unique<filters::FilterChain<grid_map::GridMap>>("grid_map::GridMap");

publisher_ = nodeHandle_.advertise<grid_map_msgs::GridMap>(outputTopic_, 1, true);

// Setup filter chain.
if (!nodeHandle.hasParam(filterChainParametersName_) || !filterChain_->configure(filterChainParametersName_, nodeHandle)) {
if (!nodeHandle.hasParam(filterChainParametersName_) || !filterChain_.configure(filterChainParametersName_, nodeHandle)) {
ROS_WARN("Could not configure the filter chain. Will publish the raw elevation map without postprocessing!");
return;
}
Expand All @@ -48,23 +44,23 @@ grid_map::GridMap PostprocessingPipelineFunctor::operator()(GridMap& inputMap) {
}

grid_map::GridMap outputMap;
if (not filterChain_->update(inputMap, outputMap)) {
if (not filterChain_.update(inputMap, outputMap)) {
ROS_ERROR("Could not perform the grid map filter chain! Forwarding the raw elevation map!");
return inputMap;
}

return outputMap;
}

void PostprocessingPipelineFunctor::publish(GridMap& gridMap) const {
void PostprocessingPipelineFunctor::publish(const GridMap& gridMap) const {
// Publish filtered output grid map.
grid_map_msgs::GridMap outputMessage;
grid_map::GridMapRosConverter::toMessage(gridMap, outputMessage);
publisher_.publish(outputMessage);
ROS_DEBUG("Elevation map raw has been published.");
}

bool PostprocessingPipelineFunctor::pipelineHasSubscribers() const {
bool PostprocessingPipelineFunctor::hasSubscribers() const {
return publisher_.getNumSubscribers() > 0;
}

Expand Down
28 changes: 28 additions & 0 deletions elevation_mapping/src/postprocessing/PostprocessingWorker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* PostprocessingWorker.cpp
*
* Created on: Dec. 21, 2020
* Author: Yoshua Nava
* Institute: ANYbotics
*/

#include "elevation_mapping/postprocessing/PostprocessingWorker.hpp"

namespace elevation_mapping {

PostprocessingWorker::PostprocessingWorker(ros::NodeHandle nodeHandle)
: functor_(nodeHandle), work_(ioService_), thread_(boost::bind(&boost::asio::io_service::run, &ioService_)) {}

PostprocessingWorker::GridMap PostprocessingWorker::processBuffer() {
return functor_(dataBuffer_);
}

void PostprocessingWorker::publish(const GridMap& gridMap) const {
functor_.publish(gridMap);
}

bool PostprocessingWorker::hasSubscribers() const {
return functor_.hasSubscribers();
}

} // namespace elevation_mapping
31 changes: 15 additions & 16 deletions elevation_mapping/src/postprocessing/PostprocessorPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,26 @@

namespace elevation_mapping {

PostprocessorPool::PostprocessorPool(std::size_t poolSize, ros::NodeHandle nodeHandle)
: postprocessor_(nodeHandle), ioService_(poolSize), dataBuffers_(poolSize) {
PostprocessorPool::PostprocessorPool(std::size_t poolSize, ros::NodeHandle nodeHandle) {
for (std::size_t i = 0; i < poolSize; ++i) {
// Add worker to the collection.
workers_.emplace_back(std::make_unique<PostprocessingWorker>(nodeHandle));
// Create one service per thread
availableServices_.push_back(i);
// Generate services and link work to it. Needed to accept tasks and keep the service thread running.
work_.emplace_back(ioService_.at(i));
threads_.emplace_back(boost::bind(&boost::asio::io_service::run, &ioService_.at(i)));
}
}

PostprocessorPool::~PostprocessorPool() {
// Force all threads to return from io_service::run().
for (auto& service : ioService_) {
service.stop();
for (auto& worker : workers_) {
worker->ioService().stop();
}

// Suppress all exceptions. Try to join every worker thread.
for (auto& thread : threads_) {
for (auto& worker : workers_) {
try {
if (thread.joinable()) {
thread.join();
if (worker->thread().joinable()) {
worker->thread().join();
}
} catch (const std::exception&) {
}
Expand All @@ -50,20 +48,20 @@ bool PostprocessorPool::runTask(const GridMap& gridMap) {
availableServices_.pop_back();
}

// Copy data to the buffer for the worker thread.
dataBuffers_.at(serviceIndex) = gridMap;
// Copy data to the buffer for the worker.
workers_.at(serviceIndex)->setDataBuffer(gridMap);

// Create a task with the post-processor and dispatch it.
auto task = std::bind(&PostprocessorPool::wrapTask, this, serviceIndex);
ioService_.at(serviceIndex).post(task);
workers_.at(serviceIndex)->ioService().post(task);
return true;
}

void PostprocessorPool::wrapTask(size_t serviceIndex) {
// Run the user supplied task.
try {
GridMap postprocessedMap = postprocessor_(dataBuffers_.at(serviceIndex));
postprocessor_.publish(postprocessedMap);
GridMap postprocessedMap = workers_.at(serviceIndex)->processBuffer();
workers_.at(serviceIndex)->publish(postprocessedMap);
}
// Suppress all exceptions.
catch (const std::exception& exception) {
Expand All @@ -76,7 +74,8 @@ void PostprocessorPool::wrapTask(size_t serviceIndex) {
}

bool PostprocessorPool::pipelineHasSubscribers() const {
return postprocessor_.pipelineHasSubscribers();
return std::all_of(workers_.cbegin(), workers_.cend(),
[](const std::unique_ptr<PostprocessingWorker>& worker) { return worker->hasSubscribers(); });
}

} // namespace elevation_mapping

0 comments on commit e0864b5

Please sign in to comment.