Skip to content

Commit

Permalink
Merge branch 'gdq_dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Nov 17, 2017
2 parents 6a8e0bf + 3aec8b7 commit 9a7b8c9
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 111 deletions.
175 changes: 90 additions & 85 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
* Admin
*/
class SlaveofCmd : public Cmd {
public:
SlaveofCmd() : is_noone_(false), have_offset_(false),
filenum_(0), pro_offset_(0) {
public:
SlaveofCmd()
: is_noone_(false),
have_offset_(false),
filenum_(0),
pro_offset_(0) {
}
virtual void Do();
private:

private:
std::string master_ip_;
int64_t master_port_;
bool is_noone_;
Expand All @@ -32,11 +36,11 @@ class SlaveofCmd : public Cmd {
};

class TrysyncCmd : public Cmd {
public:
TrysyncCmd() {
}
public:
TrysyncCmd() {}
virtual void Do();
private:

private:
std::string slave_ip_;
int64_t slave_port_;
int64_t filenum_;
Expand All @@ -45,116 +49,117 @@ class TrysyncCmd : public Cmd {
};

class InternalTrysyncCmd : public Cmd {
public:
InternalTrysyncCmd() {
}
public:
InternalTrysyncCmd() {}
virtual void Do();
private:

private:
std::string hub_ip_;
int64_t hub_port_;
int64_t filenum_;
int64_t pro_offset_;
bool send_most_recently_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class AuthCmd : public Cmd {
public:
AuthCmd() {
}
public:
AuthCmd() {}
virtual void Do();
private:

private:
std::string pwd_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class BgsaveCmd : public Cmd {
public:
BgsaveCmd() {
}
public:
BgsaveCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class BgsaveoffCmd : public Cmd {
public:
BgsaveoffCmd() {
}
public:
BgsaveoffCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class CompactCmd : public Cmd {
public:
CompactCmd() {
}
public:
CompactCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class PurgelogstoCmd : public Cmd {
public:
PurgelogstoCmd() : num_(0){
}
public:
PurgelogstoCmd() : num_(0) {}
virtual void Do();
private:

private:
uint32_t num_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class PingCmd : public Cmd {
public:
PingCmd() {
}
public:
PingCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class SelectCmd : public Cmd {
public:
SelectCmd() {
}
public:
SelectCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class FlushallCmd : public Cmd {
public:
FlushallCmd() {
}
public:
FlushallCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class ReadonlyCmd : public Cmd {
public:
ReadonlyCmd() : is_open_(false) {
}
public:
ReadonlyCmd() : is_open_(false) {}
virtual void Do();
private:

private:
bool is_open_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class ClientCmd : public Cmd {
public:
ClientCmd() {
}
public:
ClientCmd() {}
virtual void Do();
const static std::string CLIENT_LIST_S;
const static std::string CLIENT_KILL_S;
private:

private:
std::string operation_, ip_port_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class InfoCmd : public Cmd {
public:
public:
enum InfoSection {
kInfoErr = 0x0,
kInfoServer,
Expand All @@ -170,10 +175,10 @@ class InfoCmd : public Cmd {
kInfoDoubleMaster
};

InfoCmd() : rescan_(false), off_(false) {
}
InfoCmd() : rescan_(false), off_(false) {}
virtual void Do();
private:

private:
InfoSection info_section_;
bool rescan_; //whether to rescan the keyspace
bool off_;
Expand Down Expand Up @@ -207,20 +212,20 @@ class InfoCmd : public Cmd {
};

class ShutdownCmd : public Cmd {
public:
ShutdownCmd() {
}
public:
ShutdownCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class ConfigCmd : public Cmd {
public:
ConfigCmd() {
}
public:
ConfigCmd() {}
virtual void Do();
private:

private:
std::vector<std::string> config_args_v_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
void ConfigGet(std::string &ret);
Expand All @@ -230,51 +235,51 @@ class ConfigCmd : public Cmd {
};

class MonitorCmd : public Cmd {
public:
MonitorCmd() {
}
public:
MonitorCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class DbsizeCmd : public Cmd {
public:
DbsizeCmd() {
}
public:
DbsizeCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class TimeCmd : public Cmd {
public:
TimeCmd() {
}
public:
TimeCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class DelbackupCmd : public Cmd {
public:
DelbackupCmd() {
}
public:
DelbackupCmd() {}
virtual void Do();
private:

private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

#ifdef TCMALLOC_EXTENSION
class TcmallocCmd : public Cmd {
public:
TcmallocCmd() {
}
public:
TcmallocCmd() {}
virtual void Do();
private:

private:
int64_t type_;
int64_t rate_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};
#endif
#endif
#endif // PIKA_ADMIN_H_
15 changes: 9 additions & 6 deletions include/pika_hub_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,24 @@ class PikaHubManager {
}

Status AddHub(const std::string hub_ip, int hub_port,
uint32_t filenum, uint64_t con_offset);
uint32_t filenum, uint64_t con_offset,
bool send_most_recently);

std::string hub_ip() {
return hub_ip_;
std::string hub_ip() { return hub_ip_; }

bool CouldPurge(int file_tobe_purged) {
slash::MutexLock l(&sending_window_protector_);
return file_tobe_purged < sending_window_.left;
}

std::string StatusToString();

void HubConnected() { hub_stage_ = STARTED; }
void StopHub(int connnection_num);

slash::Mutex sending_window_protector_;

private:
friend class PikaHubSenderThread;
Status ResetSenders();
Status ResetSenders(bool send_most_recently);
bool GetNextFilenum(PikaHubSenderThread* thread,
uint32_t* filenum, uint64_t* con_offset);

Expand All @@ -182,6 +184,7 @@ class PikaHubManager {
uint32_t hub_filenum_;
uint64_t hub_con_offset_;

slash::Mutex sending_window_protector_;
struct {
int64_t left;
int64_t right;
Expand Down
6 changes: 5 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,18 @@ void InternalTrysyncCmd::DoInitial(PikaCmdArgsType &argv,
res_.SetRes(CmdRes::kInvalidInt);
return;
}

send_most_recently_ = (*it == "1");
}

void InternalTrysyncCmd::Do() {
LOG(INFO) << "InternalTrysync, Hub ip: " << hub_ip_ << "Hub port:" << hub_port_
<< " filenum: " << filenum_ << " pro_offset: " << pro_offset_;

Status status = g_pika_server->pika_hub_manager_->AddHub(
hub_ip_, hub_port_ + 1000, filenum_, pro_offset_);
hub_ip_, hub_port_ + 1000,
filenum_, pro_offset_,
send_most_recently_);

if (!status.ok()) {
LOG(WARNING) << "hub offset is larger than mine, slave ip: " << hub_ip_
Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void InitCmdInfoTable() {
CmdInfo* trysyncptr = new CmdInfo(kCmdNameTrysync, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameTrysync, trysyncptr));
////InternalTrysync for Pika HUB
CmdInfo* internal_trysyncptr = new CmdInfo(kCmdNameInternalTrysync, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire);
CmdInfo* internal_trysyncptr = new CmdInfo(kCmdNameInternalTrysync, 6, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameInternalTrysync, internal_trysyncptr ));
CmdInfo* authptr = new CmdInfo(kCmdNameAuth, 2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameAuth, authptr));
Expand Down
Loading

0 comments on commit 9a7b8c9

Please sign in to comment.