Skip to content

Commit

Permalink
merge with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
flabby committed Mar 23, 2016
2 parents 085a869 + f52979d commit 862dfce
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
3 changes: 2 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class PikaConf : public slash::BaseConf {
const std::vector<std::string>& vuser_blacklist() {
RWLock l(&rwlock_, false); return user_blacklist_;
}
std::string compression() { RWLock l(&rwlock_, false); return compression_; }
int target_file_size_base() { RWLock l(&rwlock_, false); return target_file_size_base_; }
std::string conf_path() { RWLock l(&rwlock_, false); return conf_path_; }
bool readonly() {
Expand Down Expand Up @@ -100,7 +101,7 @@ class PikaConf : public slash::BaseConf {
std::string pidfile_;

//char pidfile_[PIKA_WORD_SIZE];
//char compression_[PIKA_WORD_SIZE];
std::string compression_;
//int maxconnection_;

//int expire_logs_days_;
Expand Down
8 changes: 8 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ class PikaServer
void ClearPurge() {
purging_ = false;
}
//flushall
bool FlushAll();
void PurgeDir(std::string& path);

/*
* client related
Expand Down Expand Up @@ -215,6 +218,11 @@ class PikaServer
static void DoPurgeLogs(void* arg);
bool GetPurgeWindow(uint32_t &max);

/*
* Flushall use
*/
static void DoPurgeDir(void* arg);

PikaServer(PikaServer &ps);
void operator =(const PikaServer &ps);
};
Expand Down
6 changes: 5 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ void FlushallCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info
}
void FlushallCmd::Do() {
slash::RWLock(g_pika_server->rwlock(), true);
res_.SetRes(CmdRes::kOk);
if (g_pika_server->FlushAll()) {
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kErrOther, "There are some bgthread using db now, can not flushall");
}
}

void ReadonlyCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
Expand Down
2 changes: 2 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ int PikaConf::Load()
GetConfStr("userblacklist", &user_blacklist);
SetUserBlackList(std::string(user_blacklist));
GetConfStr("dump_path", &bgsave_path_);
GetConfStr("compression", &compression_);

GetConfBool("slave-read-only", &readonly_);

Expand Down Expand Up @@ -119,6 +120,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("userblacklist", suser_blacklist());
SetConfStr("dump_path", bgsave_path_);
SetConfInt("target_file_size_base", target_file_size_base_);
SetConfStr("compression", compression_);
SetConfBool("slave_read_only", readonly_);

return WriteBack();
Expand Down
51 changes: 50 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ PikaServer::PikaServer() :

option.write_buffer_size = g_pika_conf->write_buffer_size();
option.target_file_size_base = g_pika_conf->target_file_size_base();
if (g_pika_conf->compression() == "none") {
option.compression = false;
}
std::string db_path = g_pika_conf->db_path();
LOG(WARNING) << "Prepare DB...";
db_ = std::unique_ptr<nemo::Nemo>(new nemo::Nemo(db_path, option));
db_ = std::shared_ptr<nemo::Nemo>(new nemo::Nemo(db_path, option));
assert(db_);
LOG(WARNING) << "DB Success";

Expand Down Expand Up @@ -532,6 +535,52 @@ bool PikaServer::GetPurgeWindow(uint32_t &max) {
return false;
}

bool PikaServer::FlushAll() {
if (bgsaving_ /*|| bgscaning_*/) {
return false;
}
std::string dbpath = g_pika_conf->db_path();
if (dbpath[dbpath.length() - 1] == '/') {
dbpath.erase(dbpath.length() - 1);
}
int pos = dbpath.find_last_of('/');
dbpath = dbpath.substr(0, pos);
dbpath.append("/deleting");
slash::RenameFile(g_pika_conf->db_path(), dbpath.c_str());

LOG(WARNING) << "Delete old db...";
db_.reset();

nemo::Options option;
option.write_buffer_size = g_pika_conf->write_buffer_size();
option.target_file_size_base = g_pika_conf->target_file_size_base();
if (g_pika_conf->compression() == "none") {
option.compression = false;
}
LOG(WARNING) << "Prepare open new db...";
db_ = std::shared_ptr<nemo::Nemo>(new nemo::Nemo(g_pika_conf->db_path(), option));
LOG(WARNING) << "open new db success";
PurgeDir(dbpath);
return true;
}

void PikaServer::PurgeDir(std::string& path) {
std::string *dir_path = new std::string(path);
// Start new thread if needed
if (!purge_thread_.is_running()) {
purge_thread_.StartThread();
}
purge_thread_.Schedule(&DoPurgeDir, static_cast<void*>(dir_path));
}

void PikaServer::DoPurgeDir(void* arg) {
std::string path = *(static_cast<std::string*>(arg));
DLOG(INFO) << "Delete dir: " << path << " start";
slash::DeleteDir(path);
DLOG(INFO) << "Delete dir: " << path << " done";
delete static_cast<std::string*>(arg);
}

void PikaServer::ClientKillAll() {
for (size_t idx = 0; idx != PIKA_MAX_WORKER_THREAD_NUM; idx++) {
pika_worker_thread_[idx]->ThreadClientKill();
Expand Down

0 comments on commit 862dfce

Please sign in to comment.