Skip to content

Commit

Permalink
framework: fix object pool memory leak issue (ApolloAuto#2002)
Browse files Browse the repository at this point in the history
* framework: format scheduler code style

* framework: Fix circular dependency in scheduler

* framework: fix object pool memory leak issue

* framework: make routine more robust

* framework: fix lint issues

* framework: update BUILD files && remove finished TODO.
  • Loading branch information
GoLancer authored and Jiangtao Hu committed Dec 13, 2018
1 parent 65b481c commit 215ccd2
Show file tree
Hide file tree
Showing 50 changed files with 494 additions and 256 deletions.
12 changes: 8 additions & 4 deletions cyber/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ cc_library(

cc_binary(
name = "mainboard",
srcs = glob([
"mainboard/*.cc",
"mainboard/*.h",
]),
srcs = [
"mainboard/mainboard.cc",
"mainboard/module_controller.cc",
"mainboard/module_controller.h",
"mainboard/module_argument.cc",
"mainboard/module_argument.h",
],
copts = [
"-pthread",
],
Expand Down Expand Up @@ -101,6 +104,7 @@ cc_library(
"//cyber/parameter:parameter_server",
"//cyber/record",
"//cyber/scheduler",
"//cyber/scheduler:scheduler_factory",
"//cyber/service:client",
"//cyber/service",
"//cyber/service_discovery:topology_manager",
Expand Down
48 changes: 42 additions & 6 deletions cyber/base/concurrent_object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ class CCObjectPool : public std::enable_shared_from_this<CCObjectPool<T>> {
virtual ~CCObjectPool();

template <typename... Args>
std::shared_ptr<T> GetObject(Args &&... args);
void ConstructAll(Args &&... args);

template <typename... Args>
std::shared_ptr<T> ConstructObject(Args &&... args);

std::shared_ptr<T> GetObject();
void ReleaseObject(T *);
uint32_t size() const;

Expand All @@ -58,6 +62,7 @@ class CCObjectPool : public std::enable_shared_from_this<CCObjectPool<T>> {
private:
CCObjectPool(CCObjectPool &) = delete;
CCObjectPool &operator=(CCObjectPool &) = delete;
bool FindFreeHead(Head *head);

std::atomic<Head> free_head_;
Node *node_arena_ = nullptr;
Expand All @@ -72,31 +77,62 @@ CCObjectPool<T>::CCObjectPool(uint32_t size) : capacity_(size) {
free_head_.store({0, node_arena_}, std::memory_order_relaxed);
}

template <typename T>
template <typename... Args>
void CCObjectPool<T>::ConstructAll(Args &&... args) {
FOR_EACH(i, 0, capacity_) {
new (node_arena_ + i) T(std::forward<Args>(args)...);
}
}

template <typename T>
CCObjectPool<T>::~CCObjectPool() {
std::free(node_arena_);
}

template <typename T>
template <typename... Args>
std::shared_ptr<T> CCObjectPool<T>::GetObject(Args &&... args) {
bool CCObjectPool<T>::FindFreeHead(Head *head) {
Head new_head;
Head old_head = free_head_.load(std::memory_order_acquire);
do {
if (unlikely(old_head.node == nullptr)) {
return nullptr;
return false;
}
new_head.node = old_head.node->next;
new_head.count = old_head.count + 1;
} while (!free_head_.compare_exchange_weak(old_head, new_head,
std::memory_order_acq_rel,
std::memory_order_acquire));
*head = old_head;
return true;
}

template <typename T>
std::shared_ptr<T> CCObjectPool<T>::GetObject() {
Head free_head;
if (unlikely(!FindFreeHead(&free_head))) {
return nullptr;
}
auto self = this->shared_from_this();
T *ptr = new (old_head.node) T(std::forward<Args>(args)...);
return std::shared_ptr<T>(ptr,
return std::shared_ptr<T>(reinterpret_cast<T *>(free_head.node),
[self](T *object) { self->ReleaseObject(object); });
}

template <typename T>
template <typename... Args>
std::shared_ptr<T> CCObjectPool<T>::ConstructObject(Args &&... args) {
Head free_head;
if (unlikely(!FindFreeHead(&free_head))) {
return nullptr;
}
auto self = this->shared_from_this();
T *ptr = new (free_head.node) T(std::forward<Args>(args)...);
return std::shared_ptr<T>(ptr, [self](T *object) {
object->~T();
self->ReleaseObject(object);
});
}

template <typename T>
void CCObjectPool<T>::ReleaseObject(T *object) {
Head new_head;
Expand Down
67 changes: 58 additions & 9 deletions cyber/base/object_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ namespace cyber {
namespace base {

struct TestNode {
TestNode() {}
explicit TestNode(int data) : value(data) {}
TestNode() : inited(true) {}
~TestNode() {
inited = false;
value = 1;
}
explicit TestNode(int data) : value(data), inited(true) {}
int value = 0;
bool inited = false;
};

TEST(CCObjectPoolTest, base) {
Expand All @@ -37,15 +42,15 @@ TEST(CCObjectPoolTest, base) {
auto pool = std::make_shared<CCObjectPool<TestNode>>(capacity);

FOR_EACH(i, 0, capacity) {
auto obj = pool->GetObject(i);
auto obj = pool->ConstructObject(i);
vec.push_back(obj);
EXPECT_EQ(i, obj->value);
}

FOR_EACH(i, 0, 10) { EXPECT_EQ(nullptr, pool->GetObject(10)); }
FOR_EACH(i, 0, 10) { EXPECT_EQ(nullptr, pool->ConstructObject(10)); }

vec.clear();
EXPECT_EQ(10, pool->GetObject(10)->value);
EXPECT_EQ(10, pool->ConstructObject(10)->value);

pool.reset();
}
Expand All @@ -59,22 +64,66 @@ TEST(CCObjectPoolTest, multi_thread) {
auto pool = std::make_shared<CCObjectPool<TestNode>>(capacity);
FOR_EACH(i, 0, 16) {
thread_pool.emplace_back([pool]() {
FOR_EACH(i, 0, 100000) { pool->GetObject(10); }
FOR_EACH(i, 0, 100000) { pool->ConstructObject(10); }
});
}
for (auto& thread : thread_pool) {
thread.join();
}

FOR_EACH(i, 0, capacity) {
auto obj = pool->GetObject(i);
auto obj = pool->ConstructObject(i);
vec.push_back(obj);
EXPECT_EQ(i, obj->value);
}

FOR_EACH(i, 0, 10) { EXPECT_EQ(nullptr, pool->GetObject(10)); }
FOR_EACH(i, 0, 10) { EXPECT_EQ(nullptr, pool->ConstructObject(10)); }
vec.clear();
}

TEST(CCObjectPoolTest, construct_object) {
const uint32_t capacity = 1024;
auto pool = std::make_shared<CCObjectPool<TestNode>>(capacity);
std::vector<std::shared_ptr<TestNode>> vec;

FOR_EACH(i, 0, capacity) {
auto obj = pool->GetObject();
vec.push_back(obj);
EXPECT_FALSE(obj->inited);
EXPECT_EQ(0, obj->value);
}
vec.clear();

FOR_EACH(i, 0, capacity) {
auto obj = pool->ConstructObject(i);
vec.push_back(obj);
EXPECT_TRUE(obj->inited);
EXPECT_EQ(i, obj->value);
}
vec.clear();

// check values after destructor
FOR_EACH(i, 0, capacity) {
auto obj = pool->GetObject();
vec.push_back(obj);
EXPECT_FALSE(obj->inited);
EXPECT_EQ(1, obj->value);
}
vec.clear();
}

TEST(CCObjectPoolTest, construct_all) {
const uint32_t capacity = 1024;
std::vector<std::shared_ptr<TestNode>> vec;
auto pool = std::make_shared<CCObjectPool<TestNode>>(capacity);
pool->ConstructAll();

FOR_EACH(i, 0, capacity) {
auto obj = pool->GetObject();
vec.push_back(obj);
EXPECT_TRUE(obj->inited);
}
vec.clear();
pool.reset();
}

TEST(ObjectPoolTest, get_object) {
Expand Down
9 changes: 4 additions & 5 deletions cyber/component/component.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ bool Component<M0, NullType, NullType, NullType>::Process(
if (is_shutdown_.load()) {
return true;
}
// TODO(hewei03): Add some protection here.
return Proc(msg);
}

Expand Down Expand Up @@ -182,7 +181,7 @@ bool Component<M0, NullType, NullType, NullType>::Initialize(
auto dv = std::make_shared<data::DataVisitor<M0>>(conf);
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<M0>(func, dv);
auto sched = scheduler::Scheduler::Instance();
auto sched = scheduler::Instance();
return sched->CreateTask(factory, node_->Name());
}

Expand Down Expand Up @@ -259,7 +258,7 @@ bool Component<M0, M1, NullType, NullType>::Initialize(
return true;
}

auto sched = scheduler::Scheduler::Instance();
auto sched = scheduler::Instance();
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto func = [self](const std::shared_ptr<M0>& msg0,
Expand Down Expand Up @@ -367,7 +366,7 @@ bool Component<M0, M1, M2, NullType>::Initialize(
return true;
}

auto sched = scheduler::Scheduler::Instance();
auto sched = scheduler::Instance();
std::weak_ptr<Component<M0, M1, M2, NullType>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, NullType>>(
shared_from_this());
Expand Down Expand Up @@ -491,7 +490,7 @@ bool Component<M0, M1, M2, M3>::Initialize(const ComponentConfig& config) {
return true;
}

auto sched = scheduler::Scheduler::Instance();
auto sched = scheduler::Instance();
std::weak_ptr<Component<M0, M1, M2, M3>> self =
std::dynamic_pointer_cast<Component<M0, M1, M2, M3>>(shared_from_this());
auto func = [self](
Expand Down
1 change: 0 additions & 1 deletion cyber/component/timer_component.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ bool TimerComponent::Process() {
if (is_shutdown_.load()) {
return true;
}
// TODO(hewei03): Add some protection here.
return Proc();
}

Expand Down
19 changes: 9 additions & 10 deletions cyber/croutine/croutine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,33 @@ thread_local std::shared_ptr<RoutineContext> CRoutine::main_context_;

namespace {
std::shared_ptr<base::CCObjectPool<RoutineContext>> context_pool = nullptr;
std::once_flag pool_init_flag;

void CRoutineEntry(void *arg) {
CRoutine *r = static_cast<CRoutine *>(arg);
r->Run();
CRoutine::Yield(RoutineState::FINISHED);
}
} // namespace
}

CRoutine::CRoutine(const std::function<void()> &func) : func_(func) {
if (unlikely(context_pool == nullptr)) {
std::call_once(pool_init_flag, [&]() {
auto routine_num = 100;
auto &global_conf = common::GlobalData::Instance()->Config();
if (global_conf.has_scheduler_conf() &&
global_conf.scheduler_conf().has_routine_num()) {
routine_num = global_conf.scheduler_conf().routine_num();
}
context_pool.reset(new base::CCObjectPool<RoutineContext>(routine_num));
}
});

context_ = context_pool->GetObject();
if (context_ == nullptr) {
AWARN << "Maximum routine context number exceeded! Please check "
"[routine_num] in config file.";
context_.reset(new RoutineContext());
}

MakeContext(CRoutineEntry, this, context_.get());
state_ = RoutineState::READY;
updated_.test_and_set(std::memory_order_release);
Expand All @@ -82,16 +87,10 @@ RoutineState CRoutine::Resume() {
SwapContext(GetMainContext(), this->GetContext());
PerfEventCache::Instance()->AddSchedEvent(
SchedPerf::SWAP_OUT, id_, processor_id_, static_cast<int>(state_));
current_routine_ = nullptr;
return state_;
}

void CRoutine::Routine() {
while (true) {
AINFO << "inner routine" << std::endl;
usleep(1000000);
}
}

void CRoutine::Stop() { force_stop_ = true; }

} // namespace croutine
Expand Down
1 change: 0 additions & 1 deletion cyber/croutine/croutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class CRoutine {
public:
explicit CRoutine(const RoutineFunc &func);
virtual ~CRoutine();
virtual void Routine();

// static interfaces
static void Yield();
Expand Down
3 changes: 2 additions & 1 deletion cyber/croutine/routine_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ namespace cyber {
namespace croutine {

void MakeContext(const func &f1, const void *arg, RoutineContext *ctx) {
ctx->sp = ctx->stack + STACK_SIZE - 2 * sizeof(void *) - REGISTERS_SIZE;
std::memset(ctx->sp, 0, REGISTERS_SIZE);
char *sp = ctx->stack + STACK_SIZE - 2 * sizeof(void *);
*reinterpret_cast<void **>(sp) = reinterpret_cast<void *>(f1);
sp -= sizeof(void *);
*reinterpret_cast<void **>(sp) = const_cast<void *>(arg);
ctx->sp = ctx->stack + STACK_SIZE - 2 * sizeof(void *) - REGISTERS_SIZE;
}

} // namespace croutine
Expand Down
2 changes: 0 additions & 2 deletions cyber/croutine/routine_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ constexpr size_t REGISTERS_SIZE = 56;

typedef void (*func)(void*);
struct RoutineContext {
RoutineContext() {}
~RoutineContext() { memset(stack, 0, STACK_SIZE); }
char stack[STACK_SIZE];
char* sp = nullptr;
};
Expand Down
3 changes: 2 additions & 1 deletion cyber/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ cc_library(
"channel_buffer.h",
],
deps = [
"data_notifier"
"data_notifier",
"//cyber/proto:component_conf_cc_proto",
],
)

Expand Down
Loading

0 comments on commit 215ccd2

Please sign in to comment.