Skip to content

Commit

Permalink
Revised Timer Task Thread (OpenAtomFoundation#1862)
Browse files Browse the repository at this point in the history
* add TimertaskManager, removed TimedscanThread.
  • Loading branch information
cheniujh authored Aug 4, 2023
1 parent 61aa47b commit 4e434e3
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class ServerThread : public Thread {

// process events in notify_queue
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);


const ServerHandle* handle_;
bool own_handle_ = false;
Expand Down
14 changes: 9 additions & 5 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ DispatchThread::DispatchThread(int port, int work_num, ConnFactory* conn_factory
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
}

DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, ConnFactory* conn_factory,
Expand All @@ -35,7 +34,6 @@ DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, Co
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
}

DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory,
Expand All @@ -47,7 +45,6 @@ DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int w
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
timed_scan_thread.SetTimedTask(0.3, [this]{this->ScanExpiredBlockedConnsOfBlrpop();});
}

DispatchThread::~DispatchThread() = default;
Expand All @@ -67,7 +64,13 @@ int DispatchThread::StartThread() {
return ret;
}
}
timed_scan_thread.StartThread();

// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask(
"blrpop_blocking_info_scan", 250, true, [this] { this->ScanExpiredBlockedConnsOfBlrpop();});


timerTaskThread_.StartThread();
return ServerThread::StartThread();
}

Expand All @@ -88,7 +91,7 @@ int DispatchThread::StopThread() {
worker_thread_[i]->private_data_ = nullptr;
}
}
timed_scan_thread.StopThread();
timerTaskThread_.StopThread();
return ServerThread::StopThread();
}

Expand Down Expand Up @@ -258,6 +261,7 @@ void DispatchThread::ScanExpiredBlockedConnsOfBlrpop() {

void DispatchThread::SetQueueLimit(int queue_limit) { queue_limit_ = queue_limit; }


extern ServerThread* NewDispatchThread(int port, int work_num, ConnFactory* conn_factory, int cron_interval,
int queue_limit, const ServerHandle* handle) {
return new DispatchThread(port, work_num, conn_factory, cron_interval, queue_limit, handle);
Expand Down
27 changes: 2 additions & 25 deletions src/net/src/dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,6 @@ class BlockedConnNode {
BlockKeyType block_type_;
};

class TimedScanThread : public Thread {
public:
template <class F, class... Args>
void SetTimedTask(double interval, F&& f, Args&&... args) {
time_interval_ = interval;
timed_task_ = [f = std::forward<F>(f), args = std::make_tuple(std::forward<Args>(args)...)]{
std::apply(f, args);
};
}
private:
void* ThreadMain() override{
while(!should_stop()){
timed_task_();
sleep(time_interval_);
}
return nullptr;
}
std::function<void()> timed_task_;
// unit in seconds
double time_interval_;
};

class DispatchThread : public ServerThread {
public:
Expand Down Expand Up @@ -129,9 +108,9 @@ class DispatchThread : public ServerThread {
return blocked_conn_to_keys_;
}
std::shared_mutex& GetBlockMtx() { return block_mtx_; };

// BlPop/BrPop used end


private:
/*
* Here we used auto poll to find the next work thread,
Expand Down Expand Up @@ -168,9 +147,7 @@ class DispatchThread : public ServerThread {
*/
std::shared_mutex block_mtx_;

//used for blpop/brpop currently
TimedScanThread timed_scan_thread;

TimerTaskThread timerTaskThread_;
}; // class DispatchThread

} // namespace net
Expand Down
119 changes: 119 additions & 0 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,123 @@ int Setnonblocking(int sockfd) {
return flags;
}

uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
const std::function<void()>& task) {
TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task};
id_to_task_[new_task.task_id] = new_task;

int64_t next_expired_time = NowInMs() + interval_ms;
exec_queue_.insert({next_expired_time, new_task.task_id});

if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
min_interval_ms_ = interval_ms;
}
// return the id of this task
return new_task.task_id;
}
int64_t TimerTaskManager::NowInMs() {
auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
}
int TimerTaskManager::ExecTimerTask() {
std::vector<ExecTsWithId> fired_tasks_;
int64_t now_in_ms = NowInMs();
// traverse in ascending order
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
if (pair->exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair->id);
assert(it != id_to_task_.end());
it->second.fun();
fired_tasks_.push_back({pair->exec_ts, pair->id});
now_in_ms = NowInMs();
} else {
break;
}
}
for (auto task : fired_tasks_) {
exec_queue_.erase(task);
auto it = id_to_task_.find(task.id);
assert(it != id_to_task_.end());
if (it->second.repeat_exec) {
// this task need to be repeatedly exec, register it again
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
} else {
// this task only need to be exec once, completely remove this task
int interval_del = it->second.interval_ms;
id_to_task_.erase(task.id);
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}
}
}
return min_interval_ms_;
}
bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
// remove the task
auto task_to_del = id_to_task_.find(task_id);
if (task_to_del == id_to_task_.end()) {
return false;
}
int interval_del = task_to_del->second.interval_ms;
id_to_task_.erase(task_to_del);

// renew the min_interval_ms_
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}

// remove from exec queue
ExecTsWithId target_key = {-1, 0};
for (auto pair : exec_queue_) {
if (pair.id == task_id) {
target_key = {pair.exec_ts, pair.id};
break;
}
}
if (target_key.exec_ts != -1) {
exec_queue_.erase(target_key);
}
return true;
}

void TimerTaskManager::RenewMinIntervalMs() {
min_interval_ms_ = -1;
for (auto pair : id_to_task_) {
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
min_interval_ms_ = pair.second.interval_ms;
}
}
}

TimerTaskThread::~TimerTaskThread() {
if (!timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread exit !!!";
}
}
int TimerTaskThread::StartThread() {
if (timer_task_manager_.Empty()) {
LOG(INFO) << "No Timer task registered, TimerTaskThread won't be created.";
// if there is no timer task registered, no need of start the thread
return -1;
}
LOG(INFO) << "TimerTaskThread Starting...";
return Thread::StartThread();
}
int TimerTaskThread::StopThread() {
if (timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread::StopThread : TimerTaskThread didn't create, no need to stop it.";
// if there is no timer task registered, the thread didn't even start
return -1;
}
return Thread::StopThread();
}

void* TimerTaskThread::ThreadMain() {
int timeout;
while (!should_stop()) {
timeout = timer_task_manager_.ExecTimerTask();
net_multiplexer_->NetPoll(timeout);
}
return nullptr;
}
} // namespace net
81 changes: 80 additions & 1 deletion src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,95 @@
#ifndef NET_SRC_NET_UTIL_H_
#define NET_SRC_NET_UTIL_H_
#include <unistd.h>
#include <cassert>
#include <chrono>
#include <functional>
#include <memory>
#include<memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
#include "net/src/net_multiplexer.h"
#include "net/include/net_thread.h"

namespace net {

int Setnonblocking(int sockfd);

struct TimedTask{
uint32_t task_id;
std::string task_name;
int interval_ms;
bool repeat_exec;
std::function<void()> fun;
};

struct ExecTsWithId {
//the next exec time of the task, unit in ms
int64_t exec_ts;
//id of the task to be exec
uint32_t id;

bool operator<(const ExecTsWithId& other) const{
if(exec_ts == other.exec_ts){
return id < other.id;
}
return exec_ts < other.exec_ts;
}
bool operator==(const ExecTsWithId& other) const {
return exec_ts == other.exec_ts && id == other.id;
}
};

class TimerTaskManager {
public:
TimerTaskManager() = default;
~TimerTaskManager() = default;

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the newest min_minterval_ms
int ExecTimerTask();
bool DelTimerTaskByTaskId(uint32_t task_id);
int GetMinIntervalMs() const { return min_interval_ms_; }
int64_t NowInMs();
void RenewMinIntervalMs();
bool Empty(){ return 0 == last_task_id_; }

private:
//items stored in std::set are ascending ordered, we regard it as an auto sorted queue
std::set<ExecTsWithId> exec_queue_;
std::unordered_map<uint32_t, TimedTask> id_to_task_;
uint32_t last_task_id_{0};
int min_interval_ms_{-1};
};



class TimerTaskThread : public Thread {
public:
TimerTaskThread(){
net_multiplexer_.reset(CreateNetMultiplexer());
net_multiplexer_->Initialize();
}
~TimerTaskThread() override;
int StartThread() override;
int StopThread() override;

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
};

bool DelTimerTaskByTaskId(uint32_t task_id){
return timer_task_manager_.DelTimerTaskByTaskId(task_id);
};

private:
void* ThreadMain() override;

TimerTaskManager timer_task_manager_;
std::unique_ptr<NetMultiplexer> net_multiplexer_;
};

} // namespace net

Expand Down
1 change: 0 additions & 1 deletion src/net/src/server_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ void* ServerThread::ThreadMain() {
char port_buf[32];
char ip_addr[INET_ADDRSTRLEN] = "";


while (!should_stop()) {
if (cron_interval_ > 0) {
gettimeofday(&now, nullptr);
Expand Down

0 comments on commit 4e434e3

Please sign in to comment.