Skip to content

Commit

Permalink
add info keyspace thread & bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
songzhao committed Nov 5, 2015
1 parent 02ea2f6 commit c419841
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 48 deletions.
2 changes: 1 addition & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ log_level : 0
# Pika db path
db_path : ./db/
# Pika write_buffer_size
write_buffer_size : 1000000000
write_buffer_size : 268435456
# Pika timeout
timeout : 60
# Requirepass
Expand Down
1 change: 1 addition & 0 deletions include/pika_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class PikaConn
// struct timeval lastinteraction() { return lastinteraction_; };
// void UpdateLastInteraction() { gettimeofday(&lastinteraction_, NULL); };
int wbuflen() { return sdslen(wbuf_); }
int rbuflen() { return sdslen(rbuf_); }
int querynums() { return querynums_; }
void clear_querynums() { querynums_ = 0; }
private:
Expand Down
6 changes: 3 additions & 3 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
#define PIKA_MAX_MESSAGE 10240
#define PIKA_THREAD_NUM 24

#define PIKA_IOBUF_LEN (1024 * 64)
#define PIKA_MBULK_BIG_ARG (1024 * 32)
#define PIKA_INLINE_MAX_SIZE (1024 * 64)
#define PIKA_IOBUF_LEN (1024 * 1024 * 64)
#define PIKA_MBULK_BIG_ARG (1024 * 1024 * 32)
#define PIKA_INLINE_MAX_SIZE (1024 * 1024 * 64)

#define PIKA_DEFAULT_PID_FILE "pika.pid"

Expand Down
14 changes: 14 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ class PikaServer
std::string is_bgsaving();
bool is_readonly_;

pthread_t info_keyspace_thread_id_;
bool info_keyspacing_;
time_t info_keyspace_start_time_;
time_t last_info_keyspace_start_time_;
std::vector<uint64_t> keynums_;
uint64_t last_kv_num_;
uint64_t last_hash_num_;
uint64_t last_list_num_;
uint64_t last_zset_num_;
uint64_t last_set_num_;
std::string is_scaning();
void InfoKeySpace();
static void* StartInfoKeySpace(void* arg);

private:
friend class PikaConn;
Status SetBlockType(BlockType type);
Expand Down
66 changes: 30 additions & 36 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,26 +493,27 @@ void InfoCmd::Do(std::list<std::string> &argv, std::string &ret) {
uname(&name);
call_uname = false;
}
std::string s = g_pikaServer->is_bgsaving();
char buf[512];
snprintf (buf, sizeof(buf),
"# Server\r\n"
"pika_version:%s\r\n"
"os:%s %s %s\r\n"
"process_id:%ld\r\n"
"tcp_port:%d\r\n"
"thread_num:%d\r\n"
"config_file:%s\r\n"
"is_bgsaving:%s\r\n"
"current qps:%d\r\n",
"pika_version: %s\r\n"
"os: %s %s %s\r\n"
"process_id: %ld\r\n"
"tcp_port: %d\r\n"
"thread_num: %d\r\n"
"config_file: %s\r\n"
"is_bgsaving: %s\r\n"
"current qps: %d\r\n"
"is_scaning_keyspace: %s\r\n",
PIKA_VERSION,
name.sysname, name.release, name.machine,
(long) getpid(),
g_pikaConf->port(),
g_pikaConf->thread_num(),
g_pikaConf->conf_path(),
g_pikaServer->is_bgsaving().c_str(),
g_pikaServer->CurrentQps()
g_pikaServer->CurrentQps(),
g_pikaServer->is_scaning().c_str()
);
info.append(buf);
}
Expand All @@ -525,45 +526,38 @@ void InfoCmd::Do(std::list<std::string> &argv, std::string &ret) {
char buf[128];
snprintf (buf, sizeof(buf),
"# Clients\r\n"
"connected_clients:%d\r\n",
"connected_clients: %d\r\n",
g_pikaServer->ClientList(clients));
info.append(buf);
}

// Key Space
if (section == "keyspace") {
if (sections++) info.append("\r\n");

char buf[128];
std::string key_type = "";
char buf[512];
std::string op = "";
if (!argv.empty()) {
key_type = argv.front();
transform(section.begin(), section.end(), section.begin(), ::tolower);
op = argv.front();
transform(op.begin(), op.end(), op.begin(), ::tolower);
argv.pop_front();

uint64_t num = 0;
nemo::Status s = g_pikaServer->GetHandle()->GetSpecifyKeyNum(key_type, num);
if (s.ok()) {
snprintf (buf, sizeof(buf),
"# Keyspace\r\n"
"%s keys:%lu\r\n",
key_type.c_str(), num);
} else {
ret = "-ERR \'" + key_type + "\' is not valid key type\r\n";
if (!argv.empty() || op != "readonly") {
ret = "-ERR Invalid Argument\r\n";
return;
}
} else {
std::vector<uint64_t> nums;
g_pikaServer->GetHandle()->GetKeyNum(nums);
snprintf (buf, sizeof(buf),
"# Keyspace\r\n"
"kv keys:%lu\r\n"
"hash keys:%lu\r\n"
"list keys:%lu\r\n"
"zset keys:%lu\r\n"
"set keys:%lu\r\n",
nums[0], nums[1], nums[2], nums[3], nums[4]);
g_pikaServer->InfoKeySpace();
}
char infotime[32];
strftime(infotime, sizeof(infotime), "%Y-%m-%d %H:%M:%S", localtime(&(g_pikaServer->last_info_keyspace_start_time_)));
snprintf (buf, sizeof(buf),
"# Keyspace\r\n"
"# Time: %s\r\n"
"kv keys: %lu\r\n"
"hash keys: %lu\r\n"
"list keys: %lu\r\n"
"zset keys: %lu\r\n"
"set keys: %lu\r\n",
infotime, g_pikaServer->last_kv_num_, g_pikaServer->last_hash_num_, g_pikaServer->last_list_num_, g_pikaServer->last_zset_num_, g_pikaServer->last_set_num_);
info.append(buf);
}

Expand Down
16 changes: 16 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@ PikaConf::PikaConf(const char* path) :
getConfInt("maxconnection", &maxconnection_);
getConfInt("target_file_size_base", &target_file_size_base_);

if (thread_num_ <= 0) {
thread_num_ = 24;
}
if (write_buffer_size_ <= 0 ) {
write_buffer_size_ = 4194304; // 40M
}
if (timeout_ <= 0) {
timeout_ = 60; // 60s
}
if (maxconnection_ <= 0) {
maxconnection_ = 20000;
}
if (target_file_size_base_ <= 0) {
target_file_size_base_ = 1048576; // 10M
}

char str[PIKA_WORD_SIZE];
getConfStr("daemonize", str);

Expand Down
64 changes: 56 additions & 8 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ PikaServer::PikaServer()
//
// stat_keyspace_hits = 0;
// stat_keyspace_misses = 0;
nemo::Options option;
option.write_buffer_size = g_pikaConf->write_buffer_size();
option.target_file_size_base = g_pikaConf->target_file_size_base();

LOG(WARNING) << "Prepare DB...";
db_ = new nemo::Nemo(g_pikaConf->db_path(), option);
LOG(WARNING) << "DB Success";
// init sock
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
memset(&servaddr_, 0, sizeof(servaddr_));
Expand Down Expand Up @@ -87,18 +93,12 @@ PikaServer::PikaServer()
dump_pro_offset_ = 0;
bgsaving_ = false;
is_readonly_ = false;
info_keyspacing_ = false;
// options_.create_if_missing = true;
// options_.write_buffer_size = 1500000000;
// leveldb::Status s = leveldb::DB::Open(options_, "/tmp/testdb", &db_);
// leveldb::Status s = leveldb::DB::Open(options_, "/tmp/testdb", &db_);
// db_ = new nemo::Nemo("/tmp/testdb");
nemo::Options option;
option.write_buffer_size = g_pikaConf->write_buffer_size();
option.target_file_size_base = g_pikaConf->target_file_size_base();

LOG(WARNING) << "Prepare DB...";
db_ = new nemo::Nemo(g_pikaConf->db_path(), option);
LOG(WARNING) << "DB Success";
// if (!s.ok()) {
// log_err("Open db failed");
// }
Expand Down Expand Up @@ -159,7 +159,7 @@ void PikaServer::Dump() {
bgsaving_ = true;
}
bgsaving_start_time_ = time(NULL);
strftime(dump_time_, sizeof(dump_time_), "%Y%m%H%M%S",localtime(&bgsaving_start_time_));
strftime(dump_time_, sizeof(dump_time_), "%Y%m%d%H%M%S",localtime(&bgsaving_start_time_));
// LOG(INFO) << tmp;
dump_args *arg = new dump_args;
arg->p = (void*)this;
Expand Down Expand Up @@ -195,6 +195,35 @@ void* PikaServer::StartDump(void* arg) {
return NULL;
}

void PikaServer::InfoKeySpace() {
MutexLock l(&mutex_);
if (info_keyspacing_) {
return;
}

info_keyspacing_ = true;
pthread_create(&info_keyspace_thread_id_, NULL, &(PikaServer::StartInfoKeySpace), this);
}

void* PikaServer::StartInfoKeySpace(void* arg) {
PikaServer* p = (PikaServer*)arg;
p->info_keyspace_start_time_ = time(NULL);
p->keynums_.clear();
p->GetHandle()->GetKeyNum(p->keynums_);

{
MutexLock l(p->Mutex());
p->last_info_keyspace_start_time_ = p->info_keyspace_start_time_;
p->last_kv_num_ = p->keynums_[0];
p->last_hash_num_ = p->keynums_[1];
p->last_list_num_ = p->keynums_[2];
p->last_zset_num_ = p->keynums_[3];
p->last_set_num_ = p->keynums_[4];
p->info_keyspacing_ = false;
}
return NULL;
}

void PikaServer::Slaveofnoone() {
MutexLock l(&mutex_);
if (repl_state_ == PIKA_SLAVE) {
Expand Down Expand Up @@ -230,6 +259,25 @@ std::string PikaServer::is_bgsaving() {
return s;
}

std::string PikaServer::is_scaning() {
MutexLock l(&mutex_);
std::string s;
if (info_keyspacing_) {
s = "Yes, ";
char infotime[32];
strftime(infotime, sizeof(infotime), "%Y-%m-%d %H:%M:%S", localtime(&(info_keyspace_start_time_)));
s.append(infotime);
time_t delta = time(NULL) - info_keyspace_start_time_;
char buf[32];
snprintf(buf, sizeof(buf), "%lu", delta);
s.append(", ");
s.append(buf);
} else {
s = "No";
}
return s;
}

void PikaServer::ProcessTimeEvent(struct timeval* target) {
std::string ip_port;
char buf[32];
Expand Down
3 changes: 3 additions & 0 deletions src/pika_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ int PikaThread::ProcessTimeEvent(struct timeval* target) {
if (crontimes_ == 0 && ((iter->second->role() == PIKA_MASTER && g_pikaServer->ms_state_ == PIKA_REP_CONNECTED) || (iter->second->role() == PIKA_SLAVE))) {
LOG(INFO)<<"Send Ping to " << iter->second->ip_port();
iter->second->append_wbuf("*1\r\n$4\r\nPING\r\n");
LOG(INFO) << "length of rbuf_: " << iter->second->rbuflen();
LOG(INFO) << "length of wbuf_: " << iter->second->wbuflen();
}
iter_clientlist = clients_.find(iter->second->ip_port());
Expand Down Expand Up @@ -248,6 +249,7 @@ void PikaThread::RunProcess()
role = it->second->role();
conns_.erase(it);
}
pikaEpoll_->PikaDelEvent(inConn->fd());
close(inConn->fd());

it_clientlist = clients_.find(inConn->ip_port());
Expand Down Expand Up @@ -294,6 +296,7 @@ void PikaThread::RunProcess()
role = it->second->role();
conns_.erase(it);
}
pikaEpoll_->PikaDelEvent(tfe->fd_);
close(tfe->fd_);
it_clientlist = clients_.find(inConn->ip_port());
if (it_clientlist != clients_.end()) {
Expand Down

0 comments on commit c419841

Please sign in to comment.