Skip to content

Commit

Permalink
add epoll thread, not finished, can't build
Browse files Browse the repository at this point in the history
  • Loading branch information
ingangi committed Mar 27, 2020
1 parent f5c488f commit a1fb109
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 25 deletions.
20 changes: 15 additions & 5 deletions engin/chroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ chroutine_id_t chroutine_thread_t::create_chroutine(func_t & func, void *arg)
m_schedule.chroutines_sched.push_back(c);
}

SPDLOG(TRACE, "create_chroutine {} over, is in main: {}", id, m_is_main_thread);
SPDLOG(TRACE, "create_chroutine {} over, thread type: {}", id, m_type);
return id;
}

Expand Down Expand Up @@ -355,9 +355,14 @@ int chroutine_thread_t::schedule()
{
set_state(thread_state_t_running);
m_is_running = true;
SPDLOG(INFO, "chroutine_thread_t {:p} schedule is_running {}, is main:{}", (void*)(this), m_is_running, m_is_main_thread);
if (!m_is_main_thread) {
engine_t::instance().on_thread_ready(m_creating_index, std::this_thread::get_id());
m_std_thread_id = std::this_thread::get_id();
SPDLOG(INFO, "chroutine_thread_t {:p} schedule is_running {}, m_type:{} ({})", (void*)(this)
, m_is_running
, m_type
, readable_thread_id(m_std_thread_id));

if (m_type == thread_type_t::worker) {
engine_t::instance().on_thread_ready(m_creating_index, m_std_thread_id);
}
while (!m_need_stop) {
int processed = 0;
Expand All @@ -370,7 +375,12 @@ int chroutine_thread_t::schedule()
m_is_running = false;
set_state(thread_state_t_finished);
clear_all_chroutine();
SPDLOG(INFO, "chroutine_thread_t {:p} schedule is_running {}, is main:{}", (void*)(this), m_is_running, m_is_main_thread);

SPDLOG(INFO, "chroutine_thread_t {:p} schedule is_running {}, m_type:{} ({})"
, (void*)(this)
, m_is_running
, m_type
, readable_thread_id(m_std_thread_id));
return 0;
}

Expand Down
34 changes: 22 additions & 12 deletions engin/chroutine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ typedef enum {
thread_state_t_finished,
} thread_state_t;

enum class thread_type_t {
worker = 0,
main,
epoll,
};

// chroutine_thread_t hold a os thread
// and a list of chroutines run in the thread.
class chroutine_thread_t
Expand Down Expand Up @@ -203,8 +209,8 @@ class chroutine_thread_t
// awake waiting chroutine
int awake_chroutine(chroutine_id_t id);

void set_main_thread_flag(bool flag) {
m_is_main_thread = flag;
void set_type(thread_type_t type) {
m_type = type;
}

// the while loop of the thread
Expand All @@ -230,6 +236,10 @@ class chroutine_thread_t
return m_load.load();
}

const std::thread::id & thread_id() {
return m_std_thread_id;
}

private:
chroutine_thread_t();

Expand Down Expand Up @@ -272,18 +282,18 @@ class chroutine_thread_t
void clear_all_chroutine();

private:
schedule_t m_schedule;
bool m_is_running = false;
bool m_need_stop = false;
size_t m_creating_index = 0;
selectable_object_list_t m_selector_list;
chutex_t m_chroutine_lock;
bool m_is_main_thread = false;
std::atomic<std::time_t> m_entry_time; // for thread alive check
std::atomic<thread_state_t> m_state;

schedule_t m_schedule;
bool m_is_running = false;
bool m_need_stop = false;
size_t m_creating_index = 0;
selectable_object_list_t m_selector_list;
chutex_t m_chroutine_lock;
std::atomic<std::time_t> m_entry_time; // for thread alive check
std::atomic<thread_state_t> m_state;
static std::atomic<chroutine_id_t> ms_chroutine_id;
load_t m_load;
thread_type_t m_type = thread_type_t::worker;
std::thread::id m_std_thread_id;
};

}
Expand Down
22 changes: 16 additions & 6 deletions engin/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,26 @@ void engine_t::init(size_t init_pool_size)
if (!m_creating.empty())
return;

m_main_thread_id = std::this_thread::get_id();
for (size_t i = 0; i < init_pool_size; i++) {
std::shared_ptr<chroutine_thread_t> thrd = chroutine_thread_t::new_thread();
m_creating.push_back(thrd);
thrd.get()->start(i);
}

SPDLOG(INFO, "{}: init_pool_size = {}, m_main_thread_id = {}", __FUNCTION__, init_pool_size, readable_thread_id(m_main_thread_id));
SPDLOG(INFO, "{}: init_pool_size = {}", __FUNCTION__, init_pool_size);

while (!m_init_over) {
thread_ms_sleep(10);
}

// main thread do not need start()
m_main_thread = chroutine_thread_t::new_thread();
m_main_thread = chroutine_thread_t::new_thread();
m_main_thread->set_type(thread_type_t::main);
#ifdef ENABLE_EPOLL
m_epoll_thread = chroutine_thread_t::new_thread();
m_epoll_thread->start(0);
m_epoll_thread->set_type(thread_type_t::epoll);
#endif
SPDLOG(INFO, "{}: OVER", __FUNCTION__);
}

Expand Down Expand Up @@ -158,9 +163,12 @@ chroutine_thread_t *engine_t::get_current_thread()
}

std::thread::id cur_id = std::this_thread::get_id();
if (cur_id == m_main_thread_id) {
if (m_main_thread && cur_id == m_main_thread->thread_id()) {
return m_main_thread.get();
}
if (m_epoll_thread && cur_id == m_epoll_thread->thread_id()) {
return m_epoll_thread.get();
}

const auto& iter = m_pool.find(cur_id);
if (iter == m_pool.end())
Expand Down Expand Up @@ -359,6 +367,9 @@ void engine_t::stop_main()
if (m_main_thread) {
m_main_thread->stop();
}
if (m_epoll_thread) {
m_epoll_thread->stop();
}
}, nullptr);
}

Expand Down Expand Up @@ -392,8 +403,7 @@ void engine_t::run()
signal(SIGINT, signal_handle);
signal(SIGQUIT, signal_handle);
signal(SIGTERM, signal_handle);
m_main_thread->set_main_thread_flag(true);
SPDLOG(DEBUG, "main thread is about to run, check the id:{}=={}", readable_thread_id(m_main_thread_id), readable_thread_id(std::this_thread::get_id()));
SPDLOG(DEBUG, "main thread is about to run, check the id:{}", readable_thread_id(std::this_thread::get_id()));

create_chroutine_in_mainthread([this](void *){
while (true) {
Expand Down
4 changes: 2 additions & 2 deletions engin/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ class engine_t final
std::mutex m_pool_lock; // only used during m_init_over is false
thread_pool_t m_pool; // is readonly after m_init_over become true
thread_vector_t m_creating; // is readonly after m_init_over become true
bool m_init_over = false; // if all threads ready
std::thread::id m_main_thread_id = NULL_THREAD_ID;
bool m_init_over = false; // if all threads ready
int m_dispatch_seed = 0;
#ifdef ENABLE_HTTP_PLUGIN
http_stub_pool_t m_http_stubs;
#endif
std::shared_ptr<chroutine_thread_t> m_main_thread = nullptr;
std::shared_ptr<chroutine_thread_t> m_epoll_thread = nullptr;
chr_timer_t* m_flush_timer;
};

Expand Down

0 comments on commit a1fb109

Please sign in to comment.