Skip to content

Commit

Permalink
v2.2 Merge branch 'dev': 优化多线程模型的性能
Browse files Browse the repository at this point in the history
  • Loading branch information
test committed Jan 29, 2016
2 parents 0cea895 + 1ae65f2 commit 738b625
Show file tree
Hide file tree
Showing 24 changed files with 594 additions and 137 deletions.
45 changes: 20 additions & 25 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,39 +1,34 @@
sudo: required
dist: trusty
language: cpp

compiler:
- gcc

before_install:
- sudo add-apt-repository -y ppa:andykimpe/cmake
- sudo add-apt-repository -y ppa:boost-latest/ppa
- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
- sudo apt-get -qq update
- sudo add-apt-repository -y ppa:kojoley/boost
- sudo apt-get update -y
- lsb_release -a

install:
- sudo apt-get install cmake
- sudo apt-get install -qq boost1.55
- .travis_scripts/gcc.sh
- git clone https://github.com/yyzybb537/Boost-dev-bin.git /tmp/boost-dev-bin && sudo dpkg -i /tmp/boost-dev-bin/libboost1.59-all-dev.deb
- ls /usr/local/lib/libboost*
- sudo ldconfig
#- sudo apt-get install -y -qq libboost1.58-dev
#- sudo apt-get install -y -qq libboost-thread1.58-dev
#- sudo apt-get install -y -qq libboost-coroutine1.58-dev
#- sudo apt-get install -y -qq libboost-context1.58-dev
#- sudo apt-get install -y -qq libboost-system1.58-dev
#- sudo apt-get install -y -qq libboost-date-time1.58-dev
#- sudo apt-get install -y -qq libboost-chrono1.58-dev
#- sudo apt-get install -y -qq libboost-regex1.58-dev
- sudo apt-get install -y -qq cmake

script:
- mkdir build; pushd build;
- cmake ..
- make -j4
- sudo make install
- make test_small
- make samples
- make run_test
# - sudo rm * -rf
# - cmake .. -DENABLE_BOOST_COROUTINE=ON
# - make -j4
# - sudo make install
# - make test_small
# - make run_test
# - sudo rm * -rf
# - cmake .. -DENABLE_SHARED_STACK=ON
# - make -j4
# - sudo make install
# - make test_small
# - make run_test
- cmake .. && make -j4 && sudo make install && make test_small && make samples && make run_test
- sudo rm * -rf && cmake .. -DENABLE_BOOST_COROUTINE=ON && make -j4 && sudo make install && make test_small && make samples && make run_test
- sudo rm * -rf && cmake .. -DENABLE_SHARED_STACK=ON && make -j4 && sudo make install && make test_small && make samples && make run_test
- popd;

after_success:
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ endif()

if (UNIX)
set(CMAKE_CXX_FLAGS "-std=c++11 -fPIC -Wall ${CXX_FLAGS_POSTFIX}")
set(CMAKE_CXX_FLAGS_DEBUG "-g ${CMAKE_CXX_FLAGS} -Werror")
set(CMAKE_CXX_FLAGS_DEBUG "-g -pg ${CMAKE_CXX_FLAGS} -Werror")
set(CMAKE_CXX_FLAGS_RELEASE "-g -O3 ${CMAKE_CXX_FLAGS} -Werror")
else ()
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
Expand Down
14 changes: 8 additions & 6 deletions src/block_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ void BlockObject::CoBlockWait()
Task* tk = g_Scheduler.GetLocalInfo().current_task;
tk->block_ = this;
tk->state_ = TaskState::sys_block;
tk->block_timeout_ = std::chrono::nanoseconds::zero();
tk->block_timeout_ = MininumTimeDurationType::zero();
tk->is_block_timeout_ = false;
++ tk->block_sequence_;
DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
g_Scheduler.CoYield();
}

bool BlockObject::CoBlockWaitTimed(std::chrono::nanoseconds timeo)
bool BlockObject::CoBlockWaitTimed(MininumTimeDurationType timeo)
{
auto begin = std::chrono::high_resolution_clock::now();
if (!g_Scheduler.IsCoroutine()) {
while (!TryBlockWait() &&
std::chrono::duration_cast<std::chrono::nanoseconds>
std::chrono::duration_cast<MininumTimeDurationType>
(std::chrono::high_resolution_clock::now() - begin) < timeo)
usleep(10 * 1000);
return false;
Expand Down Expand Up @@ -104,10 +104,11 @@ bool BlockObject::Wakeup()
DebugPrint(dbg_syncblock, "wakeup to %lu.", (long unsigned)wakeup_);
return true;
}
lock.unlock();

tk->block_ = nullptr;
g_Scheduler.AddTaskRunnable(tk);
DebugPrint(dbg_syncblock, "wakeup task(%s).", tk->DebugInfo());
g_Scheduler.AddTaskRunnable(tk);
return true;
}
void BlockObject::CancelWait(Task* tk, uint32_t block_sequence)
Expand All @@ -128,11 +129,12 @@ void BlockObject::CancelWait(Task* tk, uint32_t block_sequence)
DebugPrint(dbg_syncblock, "cancelwait task(%s) erase failed.", tk->DebugInfo());
return;
}
lock.unlock();

tk->block_ = nullptr;
tk->is_block_timeout_ = true;
g_Scheduler.AddTaskRunnable(tk);
DebugPrint(dbg_syncblock, "cancelwait task(%s).", tk->DebugInfo());
g_Scheduler.AddTaskRunnable(tk);
}

bool BlockObject::IsWakeup()
Expand All @@ -152,7 +154,7 @@ bool BlockObject::AddWaitTask(Task* tk)
wait_queue_.push(tk);

// 带超时的, 增加定时器
if (std::chrono::nanoseconds::zero() != tk->block_timeout_) {
if (MininumTimeDurationType::zero() != tk->block_timeout_) {
uint32_t seq = tk->block_sequence_;
tk->IncrementRef();
lock.unlock(); // sequence记录完成, task引用计数增加, 可以解锁了
Expand Down
2 changes: 1 addition & 1 deletion src/block_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BlockObject

// 带超时的阻塞式等待信号
// @returns: 是否成功等到信号
bool CoBlockWaitTimed(std::chrono::nanoseconds timeo);
bool CoBlockWaitTimed(MininumTimeDurationType timeo);

bool TryBlockWait();

Expand Down
10 changes: 5 additions & 5 deletions src/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class Channel
template <typename U, typename Duration>
bool TimedPush(U && t, Duration const& dur)
{
if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>(dur)))
return false;

{
Expand All @@ -158,7 +158,7 @@ class Channel
bool TimedPop(U & t, Duration const& dur)
{
write_block_.Wakeup();
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>(dur)))
{
if (write_block_.TryBlockWait())
return false;
Expand All @@ -183,7 +183,7 @@ class Channel
bool TimedPop(nullptr_t ignore, Duration const& dur)
{
write_block_.Wakeup();
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>(dur)))
{
if (write_block_.TryBlockWait())
return false;
Expand Down Expand Up @@ -344,7 +344,7 @@ class Channel<void>
template <typename Duration>
bool TimedPush(nullptr_t ignore, Duration const& dur)
{
if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
if (!write_block_.CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>(dur)))
return false;

read_block_.Wakeup();
Expand All @@ -356,7 +356,7 @@ class Channel<void>
bool TimedPop(nullptr_t ignore, Duration const& dur)
{
write_block_.Wakeup();
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<std::chrono::nanoseconds>(dur)))
if (!read_block_.CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>(dur)))
{
if (write_block_.TryBlockWait())
return false;
Expand Down
20 changes: 20 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once
#include <chrono>

// VS2013²»Ö§³Öthread_local
#if defined(_MSC_VER) && _MSC_VER < 1900
#define co_thread_local __declspec(thread)
#else
#define co_thread_local thread_local
#endif

namespace co
{

#ifdef _WIN32
typedef std::chrono::microseconds MininumTimeDurationType;
#else
typedef std::chrono::nanoseconds MininumTimeDurationType;
#endif

} //namespace co
5 changes: 3 additions & 2 deletions src/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <functional>
#include <memory>
#include "error.h"

namespace co
{
Expand All @@ -16,9 +17,9 @@ namespace co
class impl_t;

public:
explicit Context(std::size_t stack_size);
explicit Context(std::size_t stack_size, std::function<void()> const& fn);

bool Init(std::function<void()> const& fn, char* shared_stack, uint32_t shared_stack_cap);
bool Init(char* shared_stack, uint32_t shared_stack_cap);

bool SwapIn();

Expand Down
35 changes: 24 additions & 11 deletions src/ctx_boost_coroutine/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,48 @@ namespace co
uint32_t stack_capacity_ = 0;
char* shared_stack_;
uint32_t shared_stack_cap_;
std::function<void()> fn_;
};

Context::Context(std::size_t stack_size)
Context::Context(std::size_t stack_size, std::function<void()> const& fn)
: impl_(new Context::impl_t), stack_size_(stack_size)
{ }
{
impl_->fn_ = fn;

#if !defined(ENABLE_SHARED_STACK)
decltype(impl_->ctx_) c(
[=](::boost::coroutines::symmetric_coroutine<void>::yield_type& yield){
impl_->yield_ = &yield;
fn();
}
, boost::coroutines::attributes(std::max<std::size_t>(stack_size_, boost::coroutines::stack_traits::minimum_size()))
);

bool Context::Init(std::function<void()> const& fn, char* shared_stack, uint32_t shared_stack_cap)
if (!c) {
ThrowError(eCoErrorCode::ec_makecontext_failed);
return ;
}

impl_->ctx_.swap(c);
#endif
}

bool Context::Init(char* shared_stack, uint32_t shared_stack_cap)
{
#if defined(ENABLE_SHARED_STACK)
impl_->shared_stack_ = shared_stack;
impl_->shared_stack_cap_ = shared_stack_cap;
#endif

decltype(impl_->ctx_) c(
[=](::boost::coroutines::symmetric_coroutine<void>::yield_type& yield){
impl_->yield_ = &yield;
fn();
}
#if defined(ENABLE_SHARED_STACK)
, boost::coroutines::attributes(shared_stack_cap), shared_stack_allocator(shared_stack, shared_stack_cap)
#else
, boost::coroutines::attributes(std::max<std::size_t>(stack_size_, boost::coroutines::stack_traits::minimum_size()))
#endif
);

if (!c) return false;
impl_->ctx_.swap(c);

#if defined(ENABLE_SHARED_STACK)
static const int default_base_size = 32;
// save coroutine stack first 16 bytes.
assert(!impl_->stack_);
Expand All @@ -81,7 +95,6 @@ namespace co
impl_->stack_ = (char*)malloc(impl_->stack_capacity_);
memcpy(impl_->stack_, shared_stack + shared_stack_cap - impl_->stack_size_, impl_->stack_size_);
#endif

return true;
}

Expand Down
35 changes: 21 additions & 14 deletions src/ctx_ucontext/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,33 @@ namespace co
(*pfn)();
}

Context::Context(std::size_t stack_size)
Context::Context(std::size_t stack_size, std::function<void()> const& fn)
: impl_(new Context::impl_t), stack_size_(stack_size)
{}
{
impl_->fn_ = fn;

#if !defined(ENABLE_SHARED_STACK)
if (-1 == getcontext(&impl_->ctx_)) {
ThrowError(eCoErrorCode::ec_makecontext_failed);
return ;
}

impl_->stack_size_ = this->stack_size_;
impl_->stack_ = (char*)valloc(impl_->stack_size_);

bool Context::Init(std::function<void()> const& fn, char* shared_stack, uint32_t shared_stack_cap)
impl_->ctx_.uc_stack.ss_sp = impl_->stack_;
impl_->ctx_.uc_stack.ss_size = impl_->stack_size_;
impl_->ctx_.uc_link = NULL;
makecontext(&impl_->ctx_, (void(*)(void))&ucontext_func, 1, &impl_->fn_);
#endif
}

bool Context::Init(char* shared_stack, uint32_t shared_stack_cap)
{
#if defined(ENABLE_SHARED_STACK)
if (-1 == getcontext(&impl_->ctx_))
return false;

impl_->fn_ = fn;

#if defined(ENABLE_SHARED_STACK)
impl_->shared_stack_ = shared_stack;
impl_->shared_stack_cap_ = shared_stack_cap;

Expand All @@ -67,14 +82,6 @@ namespace co
impl_->stack_capacity_ = std::max<uint32_t>(16, g_Scheduler.GetOptions().init_commit_stack_size);
impl_->stack_ = (char*)malloc(impl_->stack_capacity_);
memcpy(impl_->stack_, shared_stack + shared_stack_cap - impl_->stack_size_, impl_->stack_size_);
#else
impl_->stack_size_ = this->stack_size_;
impl_->stack_ = (char*)valloc(impl_->stack_size_);

impl_->ctx_.uc_stack.ss_sp = impl_->stack_;
impl_->ctx_.uc_stack.ss_size = impl_->stack_size_;
impl_->ctx_.uc_link = NULL;
makecontext(&impl_->ctx_, (void(*)(void))&ucontext_func, 1, &impl_->fn_);
#endif

return true;
Expand Down
15 changes: 10 additions & 5 deletions src/ctx_win_fiber/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,23 @@ namespace co
(*fn)();
};

Context::Context(std::size_t stack_size)
Context::Context(std::size_t stack_size, std::function<void()> const& fn)
: impl_(new Context::impl_t), stack_size_(stack_size)
{}

bool Context::Init(std::function<void()> const& fn, char* shared_stack, uint32_t shared_stack_cap)
{
impl_->fn_ = fn;
SIZE_T commit_size = g_Scheduler.GetOptions().init_commit_stack_size;
impl_->native_ = CreateFiberEx(commit_size,
(std::max)(stack_size_, commit_size), FIBER_FLAG_FLOAT_SWITCH,
(LPFIBER_START_ROUTINE)FiberFunc, &impl_->fn_);
return !!impl_->native_;
if (!impl_->native_) {
ThrowError(eCoErrorCode::ec_makecontext_failed);
return ;
}
}

bool Context::Init(char* shared_stack, uint32_t shared_stack_cap)
{
return true;
}

bool Context::SwapIn()
Expand Down
17 changes: 9 additions & 8 deletions src/linux/io_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,15 @@ int IoWait::WaitLoop()
// 由于epoll_wait的结果中会残留一些未计数的Task*,
// epoll的性质决定了这些Task无法计数,
// 所以这个析构的操作一定要在epoll_lock的保护中做
SList<Task> delete_list;
Task::PopDeleteList(delete_list);
for (auto it = delete_list.begin(); it != delete_list.end();)
{
Task* tk = &*it++;
DebugPrint(dbg_task, "task(%s) delete.", tk->DebugInfo());
delete tk;
}
std::vector<SList<Task>> delete_lists;
Task::PopDeleteList(delete_lists);
for (auto &delete_list : delete_lists)
for (auto it = delete_list.begin(); it != delete_list.end();)
{
Task* tk = &*it++;
DebugPrint(dbg_task, "task(%s) delete.", tk->DebugInfo());
delete tk;
}

return epoll_n + c;
}
Expand Down
Loading

0 comments on commit 738b625

Please sign in to comment.