Skip to content

Commit

Permalink
Add buffer pool to reuse allocated memory
Browse files Browse the repository at this point in the history
  • Loading branch information
nsubiron committed Oct 2, 2018
1 parent 2ab8a0c commit fb2691e
Show file tree
Hide file tree
Showing 19 changed files with 3,921 additions and 55 deletions.
4 changes: 4 additions & 0 deletions LibCarla/cmake/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ file(GLOB libcarla_carla_headers
"${libcarla_source_path}/carla/*.h")
install(FILES ${libcarla_carla_headers} DESTINATION include/carla)

file(GLOB libcarla_moodycamel_headers
"${libcarla_source_path}/moodycamel/*.h")
install(FILES ${libcarla_moodycamel_headers} DESTINATION include/moodycamel)

file(GLOB libcarla_carla_client_headers
"${libcarla_source_path}/carla/client/*.cpp"
"${libcarla_source_path}/carla/client/*.h")
Expand Down
2 changes: 2 additions & 0 deletions LibCarla/cmake/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ endif()
# carla_server library.

file(GLOB_RECURSE libcarla_server_sources
"${libcarla_source_path}/carla/*.h"
"${libcarla_source_path}/carla/*.cpp"
"${libcarla_source_path}/carla/rpc/*.h"
"${libcarla_source_path}/carla/rpc/*.cpp"
"${libcarla_source_path}/carla/streaming/*.h"
Expand Down
14 changes: 14 additions & 0 deletions LibCarla/source/carla/Buffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "carla/Buffer.h"

#include "carla/BufferPool.h"

namespace carla {

void Buffer::ReuseThisBuffer() {
auto pool = _parent_pool.lock();
if (pool != nullptr) {
pool->Push(std::move(*this));
}
}

} // namespace carla
31 changes: 27 additions & 4 deletions LibCarla/source/carla/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <boost/asio/buffer.hpp>

#include <array>
#include <cstdint>
#include <limits>
#include <memory>
Expand All @@ -23,9 +22,19 @@

namespace carla {

class BufferPool;

/// A piece of raw data.
///
/// Creating a buffer bigger than max_size() is undefined.
/// Note that if more capacity is needed, a new memory block is allocated and
/// the old one is deleted. This means that by default the buffer can only
/// grow. To release the memory use `clear` or `pop`.
///
/// This is a move-only type, meant to be cheap to pass by value. If the
/// buffer is retrieved from a BufferPool, the memory is automatically pushed
/// back to the pool on destruction.
///
/// @warning Creating a buffer bigger than max_size() is undefined.
class Buffer {

// =========================================================================
Expand Down Expand Up @@ -70,17 +79,25 @@ namespace carla {
Buffer &operator=(const Buffer &) = delete;

Buffer(Buffer &&rhs) noexcept
: _size(rhs._size),
: _parent_pool(std::move(rhs._parent_pool)),
_size(rhs._size),
_capacity(rhs._capacity),
_data(rhs.pop()) {}

Buffer &operator=(Buffer &&rhs) noexcept {
_parent_pool = std::move(rhs._parent_pool);
_size = rhs._size;
_capacity = rhs._capacity;
_data = rhs.pop();
return *this;
}

~Buffer() {
if (_size > 0u) {
ReuseThisBuffer();
}
}

// =========================================================================
// -- Data access ----------------------------------------------------------
// =========================================================================
Expand Down Expand Up @@ -155,7 +172,7 @@ namespace carla {

void reset(size_type size) {
if (_capacity < size) {
log_debug("allocating sensor buffer of", size, "bytes");
log_debug("allocating buffer of", size, "bytes");
_data = std::make_unique<value_type[]>(size);
_capacity = size;
}
Expand Down Expand Up @@ -234,6 +251,12 @@ namespace carla {

private:

void ReuseThisBuffer();

friend class BufferPool;

std::weak_ptr<BufferPool> _parent_pool;

size_type _size = 0u;

size_type _capacity = 0u;
Expand Down
53 changes: 53 additions & 0 deletions LibCarla/source/carla/BufferPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.

#pragma once

#include "carla/Buffer.h"

#include "moodycamel/ConcurrentQueue.h"

#include <memory>

namespace carla {

/// A pool of Buffer. Buffers popped from this pool automatically return to
/// the pool on destruction so the allocated memory can be reused.
///
/// @warning Buffers adjust their size only by growing, they never shrink
/// unless explicitly cleared. The allocated memory is only deleted when this
/// pool is destroyed.
class BufferPool : public std::enable_shared_from_this<BufferPool> {
public:

BufferPool() = default;

explicit BufferPool(size_t estimated_size) : _queue(estimated_size) {}

/// Pop a Buffer from the queue, creates a new one if the queue is empty.
Buffer Pop() {
Buffer item;
_queue.try_dequeue(item); // we don't care if it fails.
#if __cplusplus >= 201703L // C++17
item._parent_pool = weak_from_this();
#else
item._parent_pool = shared_from_this();
#endif
return item;
}

private:

friend class Buffer;

void Push(Buffer buffer) {
_queue.enqueue(std::move(buffer));
}

moodycamel::ConcurrentQueue<Buffer> _queue;
};

} // namespace carla
38 changes: 28 additions & 10 deletions LibCarla/source/carla/ListView.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

namespace carla {

/// A view over a range of elements in a container. Basically a pair of begin
/// and end iterators.
template<typename IT>
class ListView {
public:
Expand All @@ -25,19 +27,30 @@ namespace carla {
explicit ListView(iterator begin, iterator end)
: _begin(begin), _end(end) {}

template <typename STL_CONTAINER>
explicit ListView(STL_CONTAINER &container)
: _begin(iterator(container.begin())),
_end(iterator(container.end())) {}

ListView(const ListView &) = default;
ListView &operator=(const ListView &) = delete;

iterator begin() const {
iterator begin() {
return _begin;
}

const_iterator begin() const {
return _begin;
}

iterator end() const {
const_iterator cbegin() const {
return _begin;
}

iterator end() {
return _end;
}

const_iterator end() const {
return _end;
}

const_iterator cend() const {
return _end;
}

Expand All @@ -56,9 +69,14 @@ namespace carla {
const iterator _end;
};

template <typename T>
static inline auto MakeListView(T begin, T end) {
return ListView<T>(begin, end);
template <typename Iterator>
static inline auto MakeListView(Iterator begin, Iterator end) {
return ListView<Iterator>(begin, end);
}

template <typename Container>
static inline auto MakeListView(Container &c) {
return MakeListView(std::begin(c), std::end(c));
}

} // namespace carla
2 changes: 1 addition & 1 deletion LibCarla/source/carla/ThreadGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace carla {
class ThreadGroup {
public:

ThreadGroup() {}
ThreadGroup() = default;

ThreadGroup(const ThreadGroup &) = delete;
ThreadGroup &operator=(const ThreadGroup &) = delete;
Expand Down
17 changes: 0 additions & 17 deletions LibCarla/source/carla/streaming/Message.h

This file was deleted.

20 changes: 19 additions & 1 deletion LibCarla/source/carla/streaming/Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ namespace detail {

} // namespace detail

/// A stream represents an unidirectional channel for sending data from server
/// to client. A (single) client can subscribe to this stream using the stream
/// token. If no client is subscribed, the data flushed down the stream is
/// discarded.
class Stream {
public:

Expand All @@ -33,10 +37,22 @@ namespace detail {
Stream &operator=(const Stream &) = default;
Stream &operator=(Stream &&) = default;

/// Token associated with this stream. This token can be used by a client to
/// subscribe to this stream.
Token token() const {
return _shared_state->token();
}

/// Pull a buffer from the buffer pool associated to this stream. Discarded
/// buffers are re-used to avoid memory allocations.
///
/// @note Re-using buffers is optimized for the use case in which all the
/// messages sent through the stream are big and have (approximately) the
/// same size.
Buffer MakeBuffer() {
return _shared_state->MakeBuffer();
}

/// Flush @a buffers down the stream. No copies are made.
template <typename... Buffers>
void Write(Buffers... buffers) {
Expand All @@ -46,7 +62,9 @@ namespace detail {
/// Make a copy of @a data and flush it down the stream.
template <typename T>
Stream &operator<<(const T &data) {
Write(Buffer(data));
auto buffer = MakeBuffer();
buffer.copy_from(data);
Write(std::move(buffer));
return *this;
}

Expand Down
1 change: 1 addition & 0 deletions LibCarla/source/carla/streaming/Token.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace carla {
namespace streaming {

/// A token that uniquely identify a stream.
class Token {
public:

Expand Down
27 changes: 27 additions & 0 deletions LibCarla/source/carla/streaming/detail/StreamState.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.

#include "carla/streaming/detail/StreamState.h"

#include "carla/BufferPool.h"

namespace carla {
namespace streaming {
namespace detail {

StreamState::StreamState(const token_type &token)
: _token(token),
_buffer_pool(std::make_shared<BufferPool>()) {}

StreamState::~StreamState() = default;

Buffer StreamState::MakeBuffer() {
return _buffer_pool->Pop();
}

} // namespace detail
} // namespace streaming
} // namespace carla
17 changes: 12 additions & 5 deletions LibCarla/source/carla/streaming/detail/StreamState.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
#include <memory>

namespace carla {

class BufferPool;

namespace streaming {
namespace detail {

/// Handles the synchronization of the shared session.
class SessionHolder {
class SessionHolder : private NonCopyable {
public:

void set_session(std::shared_ptr<Session> session) {
Expand All @@ -38,17 +41,19 @@ namespace detail {

/// Shared state among all the copies of a stream. Provides access to the
/// underlying server session if active.
class StreamState
: public SessionHolder,
private NonCopyable {
class StreamState : public SessionHolder {
public:

explicit StreamState(const token_type &token) : _token(token) {}
explicit StreamState(const token_type &token);

~StreamState();

const token_type &token() const {
return _token;
}

Buffer MakeBuffer();

template <typename... Buffers>
void Write(Buffers... buffers) {
auto session = get_session();
Expand All @@ -60,6 +65,8 @@ namespace detail {
private:

const token_type _token;

const std::shared_ptr<BufferPool> _buffer_pool;
};

} // namespace detail
Expand Down
Loading

0 comments on commit fb2691e

Please sign in to comment.