Skip to content

Commit

Permalink
Support pkcluster info slot command (OpenAtomFoundation#670)
Browse files Browse the repository at this point in the history
* Support pkcluster info slot command

pkcluster info slot
pkcluster info slot table_name:slot_id

* Bugfix
  • Loading branch information
whoiami authored and Axlgrep committed Jul 10, 2019
1 parent 9b49228 commit 20040b4
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 8 deletions.
3 changes: 3 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ const std::string kCmdNameSlotsMgrtTagSlot = "slotsmgrttagslot";
const std::string kCmdNameSlotsMgrtOne = "slotsmgrtone";
const std::string kCmdNameSlotsMgrtTagOne = "slotsmgrttagone";

const std::string kCmdNamePkClusterInfo = "pkclusterinfo";

const std::string kClusterPrefix = "pkcluster";
typedef pink::RedisCmdArgsType PikaCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

Expand Down
12 changes: 6 additions & 6 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ enum SlaveState {

// debug only
const std::string SlaveStateMsg[] = {
"kSlaveNotSync",
"kSlaveDbSync",
"kSlaveBinlogSync"
"SlaveNotSync",
"SlaveDbSync",
"SlaveBinlogSync"
};

enum BinlogSyncState {
Expand All @@ -151,9 +151,9 @@ enum BinlogSyncState {

// debug only
const std::string BinlogSyncStateMsg[] = {
"kNotSync",
"kReadFromCache",
"kReadFromFile"
"NotSync",
"ReadFromCache",
"ReadFromFile"
};

struct BinlogChip {
Expand Down
12 changes: 12 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ class SyncMasterPartition : public SyncPartition {
bool CheckSlaveNodeExist(const std::string& ip, int port);
Status GetSlaveNodeSession(const std::string& ip, int port, int32_t* session);

// display use
Status GetInfo(std::string* info);
// debug use
std::string ToStringStatus();

int32_t GenSessionId();
Expand Down Expand Up @@ -178,6 +181,9 @@ class SyncSlavePartition : public SyncPartition {

Status CheckSyncTimeout(uint64_t now);

// For display
Status GetInfo(std::string* info);
// For debug
std::string ToStringStatus();

const std::string& MasterIp() {
Expand Down Expand Up @@ -256,6 +262,12 @@ class PikaReplicaManager {
Status SetSlaveLastRecvTime(const RmNode& slave, uint64_t time);

Status CheckSyncTimeout(uint64_t now);

// To check partition info
// For pkcluster info command
Status GetPartitionInfo(
const std::string& table, uint32_t partition_id, std::string* info);

Status CheckPartitionRole(
const std::string& table, uint32_t partition_id, int* role);

Expand Down
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class PikaServer {
friend class AddSlotsCmd;
friend class RemoveSlotsCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;

private:
/*
Expand Down
35 changes: 35 additions & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,39 @@ class SlotsMgrtTagOneCmd : public Cmd {
virtual void DoInitial() override;
};

class PkClusterInfoCmd : public Cmd {
public:
enum InfoSection {
kInfoErr = 0x0,
kInfoSlot
};
enum InfoRange {
kSingle = 0x0,
kAll
};
PkClusterInfoCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag),
info_section_(kInfoErr), info_range_(kAll), partition_id_(0) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);

private:
InfoSection info_section_;
InfoRange info_range_;

std::string table_name_;
uint32_t partition_id_;

virtual void DoInitial() override;
virtual void Clear() {
info_section_ = kInfoErr;
info_range_ = kAll;
table_name_.clear();
partition_id_ = 0;
}
const static std::string kSlotSection;
void ClusterInfoSlotAll(std::string* info);
Status GetSlotInfo(const std::string table_name, uint32_t partition_id, std::string* info);
bool ParseInfoSlotSubCmd();
};

#endif // PIKA_SLOT_H_
1 change: 1 addition & 0 deletions include/pika_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Table : public std::enable_shared_from_this<Table>{

friend class Cmd;
friend class InfoCmd;
friend class PkClusterInfoCmd;
friend class PikaServer;

std::string GetTableName();
Expand Down
5 changes: 5 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ int PikaClientConn::DealMessage(const PikaCmdArgsType& argv, std::string* respon

if (argv.empty()) return -2;
std::string opt = argv[0];
if (opt == kClusterPrefix) {
if (argv.size() >=2 ) {
opt += argv[1];
}
}
slash::StringToLower(opt);

if (response->empty()) {
Expand Down
4 changes: 4 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
Cmd* slotmgrttagoneptr = new SlotsMgrtTagOneCmd(kCmdNameSlotsMgrtTagOne, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsMgrtTagOne, slotmgrttagoneptr));

// Cluster related
Cmd* pkclusterinfoptr = new PkClusterInfoCmd(kCmdNamePkClusterInfo, -3, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePkClusterInfo, pkclusterinfoptr));

#ifdef TCMALLOC_EXTENSION
Cmd* tcmallocptr = new TcmallocCmd(kCmdNameTcmalloc, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameTcmalloc, tcmallocptr));
Expand Down
67 changes: 67 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,33 @@ std::string SyncMasterPartition::ToStringStatus() {
return tmp_stream.str();
}

Status SyncMasterPartition::GetInfo(std::string* info) {
std::stringstream tmp_stream;
slash::MutexLock l(&partition_mu_);
tmp_stream << " Role: Master" << "\r\n";
tmp_stream << " connected_slaves: " << slaves_.size() << "\r\n";
for (size_t i = 0; i < slaves_.size(); ++i) {
std::shared_ptr<SlaveNode> slave_ptr = slaves_[i];
slash::MutexLock l(&slave_ptr->slave_mu);
tmp_stream << " slave[" << i << "]: "
<< slave_ptr->Ip() << ":" << std::to_string(slave_ptr->Port()) << "\r\n";
tmp_stream << " replication_status: " << SlaveStateMsg[slave_ptr->slave_state] << "\r\n";
if (slave_ptr->slave_state == kSlaveBinlogSync) {
std::shared_ptr<Partition> partition = g_pika_server->GetTablePartitionById(slave_ptr->TableName(), slave_ptr->PartitionId());
BinlogOffset binlog_offset;
if (!partition || !partition->GetBinlogOffset(&binlog_offset)) {
return Status::Corruption("Get Info failed.");
}
uint64_t lag = (binlog_offset.filenum - slave_ptr->acked_offset.filenum) *
g_pika_conf->binlog_file_size()
+ (binlog_offset.offset - slave_ptr->acked_offset.offset);
tmp_stream << " lag: " << lag << "\r\n";
}
}
info->append(tmp_stream.str());
return Status::OK();
}

int32_t SyncMasterPartition::GenSessionId() {
slash::MutexLock ml(&session_mu_);
return session_id_++;
Expand Down Expand Up @@ -567,6 +594,13 @@ Status SyncSlavePartition::CheckSyncTimeout(uint64_t now) {
return Status::OK();
}

Status SyncSlavePartition::GetInfo(std::string* info) {
std::string tmp_str = " Role: Slave\r\n";
tmp_str += " master: " + MasterIp() + ":" + std::to_string(MasterPort()) + "\r\n";
info->append(tmp_str);
return Status::OK();
}

void SyncSlavePartition::Activate(const RmNode& master, const ReplState& repl_state) {
slash::MutexLock l(&partition_mu_);
m_info_ = master;
Expand Down Expand Up @@ -1031,6 +1065,39 @@ Status PikaReplicaManager::CheckPartitionRole(
return Status::OK();
}

Status PikaReplicaManager::GetPartitionInfo(
const std::string& table, uint32_t partition_id, std::string* info) {
int role = 0;
std::string tmp_res;
Status s = CheckPartitionRole(table, partition_id, &role);
if (!s.ok()) {
return s;
}

slash::RWLock l(&partitions_rw_, false);
PartitionInfo p_info(table, partition_id);
if (role & PIKA_ROLE_MASTER) {
if (sync_master_partitions_.find(p_info) == sync_master_partitions_.end()) {
return Status::NotFound(table + std::to_string(partition_id) + " not found");
}
Status s = sync_master_partitions_[p_info]->GetInfo(info);
if (!s.ok()) {
return s;
}
}
if (role & PIKA_ROLE_SLAVE) {
if (sync_slave_partitions_.find(p_info) == sync_slave_partitions_.end()) {
return Status::NotFound(table + std::to_string(partition_id) + " not found");
}
Status s = sync_slave_partitions_[p_info]->GetInfo(info);
if (!s.ok()) {
return s;
}
}
info->append("\r\n");
return Status::OK();
}

Status PikaReplicaManager::SelectLocalIp(const std::string& remote_ip,
const int remote_port,
std::string* const local_ip) {
Expand Down
Loading

0 comments on commit 20040b4

Please sign in to comment.