Skip to content

Commit

Permalink
Merge branch 'pika2.0' of github.com:baotiao/pika into pika2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
JacketWoo committed Apr 11, 2016
2 parents 45d5a73 + 883f62c commit 3bbccaa
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 55 deletions.
4 changes: 2 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class PikaServer
* Master use
*/
int64_t GenSid() {
slash::MutexLock l(&slave_mutex_);
// slash::MutexLock l(&slave_mutex_);
int64_t sid = sid_;
sid_++;
return sid;
Expand Down Expand Up @@ -200,7 +200,7 @@ class PikaServer
std::string s_start_time;
std::vector<uint64_t> key_nums_v; //the order is kv, hash, list, zset, set
bool key_scaning_;
KeyScanInfo() : start_time(0), key_nums_v({0, 0, 0, 0, 0}), key_scaning_(false) {
KeyScanInfo() : start_time(0), s_start_time("1970-01-01 08:00:00"), key_nums_v({0, 0, 0, 0, 0}), key_scaning_(false) {
}
};
bool key_scaning() {
Expand Down
89 changes: 46 additions & 43 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ void SlaveofCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
} else if (cur_size == 2) {
have_offset_ = true;
std::string str_filenum = *it++;
if (!slash::string2l(str_filenum.data(), str_filenum.size(), &filenum_) && filenum_ < 0) {
if (!slash::string2l(str_filenum.data(), str_filenum.size(), &filenum_) || filenum_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
std::string str_pro_offset = *it++;
if (!slash::string2l(str_pro_offset.data(), str_pro_offset.size(), &pro_offset_) && pro_offset_ < 0) {
if (!slash::string2l(str_pro_offset.data(), str_pro_offset.size(), &pro_offset_) || pro_offset_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
Expand Down Expand Up @@ -111,6 +111,7 @@ void TrysyncCmd::Do() {
ip_port.append(":");
ip_port.append(buf);
DLOG(INFO) << "Trysync, Slave ip_port: " << ip_port << " filenum: " << filenum_ << " pro_offset: " << pro_offset_;
slash::MutexLock l(&(g_pika_server->slave_mutex_));
if (!g_pika_server->FindSlave(ip_port)) {
SlaveItem s;
s.sid = g_pika_server->GenSid();
Expand Down Expand Up @@ -441,51 +442,52 @@ void InfoCmd::InfoServer(std::string &info) {
std::stringstream tmp_stream;
uint32_t purge_max;
tmp_stream << "# Server\r\n";
tmp_stream << "pika_version: " << kPikaVersion << "\r\n";
tmp_stream << "os: " << host_info.sysname << " " << host_info.release << " " << host_info.machine << "\r\n";
tmp_stream << "arch_bits: " << (reinterpret_cast<char*>(&host_info.machine) + strlen(host_info.machine) - 2) << "\r\n";
tmp_stream << "process_id: " << getpid() << "\r\n";
tmp_stream << "tcp_port: " << g_pika_conf->port() << "\r\n";
tmp_stream << "thread_num: " << g_pika_conf->thread_num() << "\r\n";
tmp_stream << "uptime_in_seconds: " << (current_time_s - g_pika_server->start_time_s()) << "\r\n";
tmp_stream << "uptime_in_days: " << (current_time_s / (24*3600) - g_pika_server->start_time_s() / (24*3600) + 1) << "\r\n";
tmp_stream << "config_file: " << g_pika_conf->conf_path() << "\r\n";
tmp_stream << "pika_version:" << kPikaVersion << "\r\n";
tmp_stream << "os:" << host_info.sysname << " " << host_info.release << " " << host_info.machine << "\r\n";
tmp_stream << "arch_bits:" << (reinterpret_cast<char*>(&host_info.machine) + strlen(host_info.machine) - 2) << "\r\n";
tmp_stream << "process_id:" << getpid() << "\r\n";
tmp_stream << "tcp_port:" << g_pika_conf->port() << "\r\n";
tmp_stream << "thread_num:" << g_pika_conf->thread_num() << "\r\n";
tmp_stream << "uptime_in_seconds:" << (current_time_s - g_pika_server->start_time_s()) << "\r\n";
tmp_stream << "uptime_in_days:" << (current_time_s / (24*3600) - g_pika_server->start_time_s() / (24*3600) + 1) << "\r\n";
tmp_stream << "config_file:" << g_pika_conf->conf_path() << "\r\n";
PikaServer::BGSaveInfo bgsave_info = g_pika_server->bgsave_info();
bool is_bgsaving = g_pika_server->bgsaving();
tmp_stream << "is_bgsaving: " << (is_bgsaving ? "Yes, " : "No, ") << bgsave_info.s_start_time << ", "
tmp_stream << "is_bgsaving:" << (is_bgsaving ? "Yes, " : "No, ") << bgsave_info.s_start_time << ", "
<< (is_bgsaving ? (current_time_s - bgsave_info.start_time) : 0) << "\r\n";
PikaServer::KeyScanInfo key_scan_info = g_pika_server->key_scan_info();
bool is_scaning = g_pika_server->key_scaning();
tmp_stream << "is_scaning_keyspace: " << (is_scaning ? ("Yes, " + key_scan_info.s_start_time) + "," : "No");
tmp_stream << "is_scaning_keyspace:" << (is_scaning ? ("Yes, " + key_scan_info.s_start_time) + "," : "No");
if (is_scaning) {
tmp_stream << current_time_s - key_scan_info.start_time;
}
tmp_stream << "\r\n";
tmp_stream << "is_compact: " << g_pika_server->db()->GetCurrentTaskType() << "\r\n";
tmp_stream << "db_size: " << (slash::Du(g_pika_conf->db_path()) >> 20) << "M\r\n";
tmp_stream << "log_size: " << (slash::Du(g_pika_conf->log_path()) >> 20) << "M\r\n";
tmp_stream << "compression: " << g_pika_conf->compression() << "\r\n";
tmp_stream << "safety purge: " << (g_pika_server->GetPurgeWindow(purge_max) ? static_cast<int32_t>(purge_max) : -1) << "\r\n";
tmp_stream << "expire_logs_days: " << g_pika_conf->expire_logs_days() << "\r\n";
tmp_stream << "expire_logs_nums: " << g_pika_conf->expire_logs_nums() << "\r\n";
tmp_stream << "is_compact:" << g_pika_server->db()->GetCurrentTaskType() << "\r\n";
tmp_stream << "db_size:" << (slash::Du(g_pika_conf->db_path()) >> 20) << "M\r\n";
tmp_stream << "log_size:" << (slash::Du(g_pika_conf->log_path()) >> 20) << "M\r\n";
tmp_stream << "compression:" << g_pika_conf->compression() << "\r\n";
tmp_stream << "safety_purge:" << (g_pika_server->GetPurgeWindow(purge_max) ? std::to_string(static_cast<int32_t>(purge_max)) : "none") << "\r\n";
tmp_stream << "expire_logs_days:" << g_pika_conf->expire_logs_days() << "\r\n";
tmp_stream << "expire_logs_nums:" << g_pika_conf->expire_logs_nums() << "\r\n";

info.append(tmp_stream.str());
}

void InfoCmd::InfoClients(std::string &info) {
std::stringstream tmp_stream;
tmp_stream << "# Clients\r\n";
tmp_stream << "connected_clients: " << g_pika_server->ClientList() << "\r\n";
tmp_stream << "connected_clients:" << g_pika_server->ClientList() << "\r\n";

info.append(tmp_stream.str());
}

void InfoCmd::InfoStats(std::string &info) {
std::stringstream tmp_stream;
tmp_stream << "# Stats\r\n";
tmp_stream << "total_connections_received: " << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instances_ops_per_sec: " << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "accumulative_query_nums: " << g_pika_server->ServerQueryNum() << "\r\n";

tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "accumulative_query_nums:" << g_pika_server->ServerQueryNum() << "\r\n";

info.append(tmp_stream.str());
}
Expand All @@ -495,28 +497,29 @@ void InfoCmd::InfoReplication(std::string &info) {
std::stringstream tmp_stream;
tmp_stream << "# replication(";
switch (host_role) {
case PIKA_ROLE_MASTER :
case PIKA_ROLE_SINGLE : tmp_stream << "MASTER)\r\nrole: master\r\n"; break;
case PIKA_ROLE_SLAVE : tmp_stream << "SLAVE)\r\nrole: slave\r\n"; break;
case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : tmp_stream << "MASTER/SLAVE)\r\nrole: slave\r\n"; break;
case PIKA_ROLE_MASTER : tmp_stream << "MASTER)\r\nrole:master\r\n"; break;
case PIKA_ROLE_SINGLE : tmp_stream << "MASTER)\r\nrole:single\r\n"; break;
case PIKA_ROLE_SLAVE : tmp_stream << "SLAVE)\r\nrole:slave\r\n"; break;
case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE : tmp_stream << "MASTER/SLAVE)\r\nrole:slave\r\n"; break;
default: info.append("ERR: server role is error\r\n"); return;
}

std::string slaves_list_str;
//int32_t slaves_num = g_pika_server->GetSlaveListString(slaves_list_str);
switch (host_role) {
case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE :
case PIKA_ROLE_SLAVE :
tmp_stream << "master_host: " << g_pika_server->master_ip() << "\r\n";
tmp_stream << "master_port: " << g_pika_server->master_port() << "\r\n";
tmp_stream << "master_link_status: " << (g_pika_server->repl_state() == PIKA_REPL_CONNECTED ? "up" : "down") << "\r\n";
tmp_stream << "slave_read_only: " << g_pika_conf->readonly() << "\r\n";
if (host_role == PIKA_ROLE_SLAVE) {
break;
}
tmp_stream << "master_host:" << g_pika_server->master_ip() << "\r\n";
tmp_stream << "master_port:" << g_pika_server->master_port() << "\r\n";
tmp_stream << "master_link_status:" << (g_pika_server->repl_state() == PIKA_REPL_CONNECTED ? "up" : "down") << "\r\n";
tmp_stream << "slave_read_only:" << g_pika_conf->readonly() << "\r\n";
case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE :
tmp_stream << "master_host:" << g_pika_server->master_ip() << "\r\n";
tmp_stream << "master_port:" << g_pika_server->master_port() << "\r\n";
tmp_stream << "master_link_status:" << (g_pika_server->repl_state() == PIKA_REPL_CONNECTED ? "connected" : "down") << "\r\n";
tmp_stream << "slave_read_only:" << g_pika_conf->readonly() << "\r\n";
case PIKA_ROLE_SINGLE :
case PIKA_ROLE_MASTER :
tmp_stream << "connected_slaves: " << g_pika_server->GetSlaveListString(slaves_list_str) << "\r\n" << slaves_list_str;
tmp_stream << "connected_slaves:" << g_pika_server->GetSlaveListString(slaves_list_str) << "\r\n" << slaves_list_str;
}

info.append(tmp_stream.str());
Expand All @@ -531,12 +534,12 @@ void InfoCmd::InfoKeyspace(std::string &info) {
}
std::stringstream tmp_stream;
tmp_stream << "# Keyspace\r\n";
tmp_stream << "# Time: " << key_scan_info.s_start_time << "\r\n";
tmp_stream << "kv keys: " << key_nums_v[0] << "\r\n";
tmp_stream << "hash keys: " << key_nums_v[1] << "\r\n";
tmp_stream << "list keys: " << key_nums_v[2] << "\r\n";
tmp_stream << "zset keys: " << key_nums_v[3] << "\r\n";
tmp_stream << "set keys: " << key_nums_v[4] << "\r\n";
tmp_stream << "# Time:" << key_scan_info.s_start_time << "\r\n";
tmp_stream << "kv keys:" << key_nums_v[0] << "\r\n";
tmp_stream << "hash keys:" << key_nums_v[1] << "\r\n";
tmp_stream << "list keys:" << key_nums_v[2] << "\r\n";
tmp_stream << "zset keys:" << key_nums_v[3] << "\r\n";
tmp_stream << "set keys:" << key_nums_v[4] << "\r\n";
info.append(tmp_stream.str());

if (rescan_) {
Expand Down
8 changes: 7 additions & 1 deletion src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,13 @@ Status Binlog::AppendBlank(slash::WritableFile *file, uint64_t len) {
}

// Append a msg which occupy the remain part of the last block
uint32_t n = (uint32_t) ((len % kBlockSize) - kHeaderSize);
// We simply increase the remain length to kHeaderSize when remain part < kHeaderSize
uint32_t n;
if (len % kBlockSize < kHeaderSize) {
n = 0;
} else {
n = (uint32_t) ((len % kBlockSize) - kHeaderSize);
}

char buf[kBlockSize];
buf[0] = static_cast<char>(n & 0xff);
Expand Down
9 changes: 9 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ int PikaClientConn::DealMessage() {
slash::StringToLower(opt);
std::string res = DoCmd(opt);

while ((wbuf_size_ - wbuf_len_ <= res.size())) {
if (!ExpandWbuf()) {
LOG(WARNING) << "wbuf is too large";
memcpy(wbuf_, "-ERR buf is too large\r\n", 23);
wbuf_len_ = 23;
set_is_reply(true);
return 0;
}
}
memcpy(wbuf_ + wbuf_len_, res.data(), res.size());
wbuf_len_ += res.size();
set_is_reply(true);
Expand Down
36 changes: 28 additions & 8 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ PikaServer::~PikaServer() {
delete pika_worker_thread_[i];
}

{
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();

while (iter != slaves_.end()) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
iter = slaves_.erase(iter);
DLOG(INFO) << "Delete slave success";
}
}
delete ping_thread_;
delete pika_binlog_receiver_thread_;
delete pika_trysync_thread_;
Expand All @@ -86,17 +96,27 @@ PikaServer::~PikaServer() {
}

bool PikaServer::ServerInit() {
char hname[128];
struct hostent *hent;

gethostname(hname, sizeof(hname));
hent = gethostbyname(hname);
int fd;
struct ifreq ifr;

host_ = inet_ntoa(*(struct in_addr*)(hent->h_addr_list[0]));
fd = socket(AF_INET, SOCK_DGRAM, 0);

/* I want to get an IPv4 IP address */
ifr.ifr_addr.sa_family = AF_INET;

/* I want IP address attached to "eth0" */
strncpy(ifr.ifr_name, "eth0", IFNAMSIZ-1);

ioctl(fd, SIOCGIFADDR, &ifr);

close(fd);
host_ = inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr);

port_ = g_pika_conf->port();
DLOG(INFO) << "host: " << host_ << " port: " << port_;
return true;

}

void PikaServer::Cleanup() {
Expand Down Expand Up @@ -184,7 +204,7 @@ void PikaServer::MayUpdateSlavesMap(int64_t sid, int32_t hb_fd) {
}

bool PikaServer::FindSlave(std::string& ip_port) {
slash::MutexLock l(&slave_mutex_);
// slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();

while (iter != slaves_.end()) {
Expand Down Expand Up @@ -340,7 +360,7 @@ Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t

DLOG(INFO) << "AddBinlogSender ok, tid is " << slave.sender_tid << " hd_fd: " << slave.hb_fd << " stage: " << slave.stage;
// Add sender
slash::MutexLock l(&slave_mutex_);
// slash::MutexLock l(&slave_mutex_);
slaves_.push_back(slave);

return Status::OK();
Expand Down Expand Up @@ -718,7 +738,7 @@ void PikaServer::KeyScan() {
void PikaServer::InitKeyScan() {
key_scan_info_.start_time = time(NULL);
char s_time[32];
int len = strftime(s_time, sizeof(s_time), "%Y%m%d%H%M%S", localtime(&key_scan_info_.start_time));
int len = strftime(s_time, sizeof(s_time), "%Y-%m-%d %H:%M:%S", localtime(&key_scan_info_.start_time));
key_scan_info_.s_start_time.assign(s_time, len);
}

Expand Down
2 changes: 2 additions & 0 deletions src/pika_slaveping_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pink::Status PikaSlavepingThread::RecvProc() {
} else {
s = pink::Status::Corruption("");
}
} else {
DLOG(INFO) << "RecvProc, recv error: " << s.ToString();
}
return s;
}
Expand Down
2 changes: 1 addition & 1 deletion third/pink
Submodule pink updated from 76b786 to 2a06e3

0 comments on commit 3bbccaa

Please sign in to comment.