Skip to content

Commit

Permalink
judge binlog could purge in pika_rm (OpenAtomFoundation#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep authored Jul 12, 2019
1 parent b526597 commit 1eb932f
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 104 deletions.
2 changes: 0 additions & 2 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ class InfoCmd : public Cmd {
const static std::string kCPUSection;
const static std::string kReplicationSection;
const static std::string kKeyspaceSection;
const static std::string kLogSection;
const static std::string kDataSection;
const static std::string kDebugSection;

Expand All @@ -229,7 +228,6 @@ class InfoCmd : public Cmd {
void InfoCPU(std::string& info);
void InfoReplication(std::string& info);
void InfoKeyspace(std::string& info);
void InfoLog(std::string& info);
void InfoData(std::string& info);
void InfoDebug(std::string& info);
};
Expand Down
1 change: 0 additions & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ struct DBSyncArg {
};

// rm define

enum SlaveState {
kSlaveNotSync = 0,
kSlaveDbSync = 1,
Expand Down
18 changes: 16 additions & 2 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class SyncPartition {
PartitionInfo& SyncPartitionInfo() {
return partition_info_;
}
private:
protected:
// std::shared_ptr<Binlog> binlog_;
PartitionInfo partition_info_;
};
Expand All @@ -129,6 +129,9 @@ class SyncMasterPartition : public SyncPartition {
Status SetLastRecvTime(const std::string& ip, int port, uint64_t time);
Status GetLastRecvTime(const std::string& ip, int port, uint64_t* time);

Status GetSafetyPurgeBinlog(std::string* safety_purge);
bool BinlogCloudPurge(uint32_t index);

Status WakeUpSlaveBinlogSync();
Status CheckSyncTimeout(uint64_t now);

Expand Down Expand Up @@ -231,7 +234,6 @@ class PikaReplicaManager {
void Start();
void Stop();

std::shared_ptr<SyncSlavePartition> GetSyncSlavePartitionByName(const PartitionInfo& p_info);
Status AddSyncPartition(const std::set<PartitionInfo>& p_infos);
Status RemoveSyncPartition(const std::set<PartitionInfo>& p_infos);
Status SelectLocalIp(const std::string& remote_ip,
Expand All @@ -256,6 +258,18 @@ class PikaReplicaManager {
// For Pika Repl Server Thread
Status SendSlaveBinlogChipsRequest(const std::string& ip, int port, const std::vector<WriteTask>& tasks);

// For SyncMasterPartition
std::shared_ptr<SyncMasterPartition> GetSyncMasterPartitionByName(const PartitionInfo& p_info);
Status GetSafetyPurgeBinlogFromSMP(const std::string& table_name,
uint32_t partition_id, std::string* safety_purge);
bool BinlogCloudPurgeFromSMP(const std::string& table_name,
uint32_t partition_id, uint32_t index);

// For SyncSlavePartition
std::shared_ptr<SyncSlavePartition> GetSyncSlavePartitionByName(const PartitionInfo& p_info);



Status RunSyncSlavePartitionStateMachine();

Status SetMasterLastRecvTime(const RmNode& slave, uint64_t time);
Expand Down
2 changes: 0 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ class PikaServer {
void PreparePartitionTrySync();
void PartitionSetMaxCacheStatisticKeys(uint32_t max_cache_statistic_keys);
void PartitionSetSmallCompactionThreshold(uint32_t small_compaction_threshold);
bool PartitionCouldPurge(const std::string& table_name,
uint32_t partition_id, uint32_t index);
bool GetTablePartitionBinlogOffset(const std::string& table_name,
uint32_t partition_id,
BinlogOffset* const boffset);
Expand Down
82 changes: 23 additions & 59 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ const std::string InfoCmd::kExecCountSection= "command_exec_count";
const std::string InfoCmd::kCPUSection = "cpu";
const std::string InfoCmd::kReplicationSection = "replication";
const std::string InfoCmd::kKeyspaceSection = "keyspace";
const std::string InfoCmd::kLogSection = "log";
const std::string InfoCmd::kDataSection = "data";
const std::string InfoCmd::kDebugSection = "debug";

Expand Down Expand Up @@ -551,8 +550,6 @@ void InfoCmd::DoInitial() {
}
LogCommand();
return;
} else if (!strcasecmp(argv_[1].data(), kLogSection.data())) {
info_section_ = kInfoLog;
} else if (!strcasecmp(argv_[1].data(), kDataSection.data())) {
info_section_ = kInfoData;
} else if (!strcasecmp(argv_[1].data(), kDebugSection.data())) {
Expand All @@ -573,8 +570,6 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
info.append("\r\n");
InfoData(info);
info.append("\r\n");
InfoLog(info);
info.append("\r\n");
InfoClients(info);
info.append("\r\n");
InfoStats(info);
Expand All @@ -590,8 +585,6 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
info.append("\r\n");
InfoData(info);
info.append("\r\n");
InfoLog(info);
info.append("\r\n");
InfoClients(info);
info.append("\r\n");
InfoStats(info);
Expand Down Expand Up @@ -625,9 +618,6 @@ void InfoCmd::Do(std::shared_ptr<Partition> partition) {
case kInfoKeyspace:
InfoKeyspace(info);
break;
case kInfoLog:
InfoLog(info);
break;
case kInfoData:
InfoData(info);
break;
Expand Down Expand Up @@ -737,6 +727,11 @@ void InfoCmd::InfoCPU(std::string& info) {
}

void InfoCmd::InfoReplication(std::string& info) {
if (!g_pika_conf->classic_mode()) {
// In Sharding mode, we don`t show this item
return;
}

int host_role = g_pika_server->role();
std::stringstream tmp_stream;
std::stringstream out_of_sync;
Expand Down Expand Up @@ -804,6 +799,21 @@ void InfoCmd::InfoReplication(std::string& info) {
tmp_stream << "connected_slaves:" << g_pika_server->GetSlaveListString(slaves_list_str) << "\r\n" << slaves_list_str;
}


Status s;
uint32_t filenum = 0;
uint64_t offset = 0;
std::string safety_purge;
for (const auto& t_item : g_pika_server->tables_) {
slash::RWLock partition_rwl(&t_item.second->partitions_rw_, false);
for (const auto& p_item : t_item.second->partitions_) {
p_item.second->logger()->GetProducerStatus(&filenum, &offset);
tmp_stream << p_item.second->GetPartitionName() << " binlog_offset=" << filenum << " " << offset;
s = g_pika_rm->GetSafetyPurgeBinlogFromSMP(p_item.second->GetTableName(), p_item.second->GetPartitionId(), &safety_purge);
tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n";
}
}

info.append(tmp_stream.str());
}

Expand Down Expand Up @@ -856,62 +866,16 @@ void InfoCmd::InfoKeyspace(std::string& info) {
return;
}

void InfoCmd::InfoLog(std::string& info) {
std::stringstream tmp_stream;
tmp_stream << "# Log" << "\r\n";
int64_t log_size = slash::Du(g_pika_conf->log_path());
tmp_stream << "log_size:" << log_size << "\r\n";
tmp_stream << "log_size_human:" << (log_size >> 20) << "M\r\n";
tmp_stream << "expire_logs_days:" << g_pika_conf->expire_logs_days() << "\r\n";
tmp_stream << "expire_logs_nums:" << g_pika_conf->expire_logs_nums() << "\r\n";

uint32_t filenum;
uint64_t offset;
SlaveState slave_state;
BinlogOffset sent_slave_boffset;
BinlogOffset acked_slave_boffset;
slash::RWLock table_rwl(&g_pika_server->tables_rw_, false);
for (const auto& table_item : g_pika_server->tables_) {
slash::RWLock partition_rwl(&table_item.second->partitions_rw_, false);
for (const auto& patition_item : table_item.second->partitions_) {
patition_item.second->logger()->GetProducerStatus(&filenum, &offset);
tmp_stream << patition_item.second->GetPartitionName() << " binlog_offset=" << filenum << " " << offset;

bool success = true;
uint32_t purge_max = filenum;
if (purge_max >= 10) {
purge_max -= 10; //remain some more
slash::MutexLock l(&g_pika_server->slave_mutex_);
for (const auto& slave : g_pika_server->slaves_) {
RmNode rm_node(slave.ip, slave.port,
patition_item.second->GetTableName(),
patition_item.second->GetPartitionId());
Status s = g_pika_rm->GetSyncMasterPartitionSlaveState(rm_node, &slave_state);
if (s.ok()
&& slave_state == SlaveState::kSlaveBinlogSync
&& g_pika_rm->GetSyncBinlogStatus(rm_node, &sent_slave_boffset, &acked_slave_boffset).ok()
&& sent_slave_boffset.filenum > 0) {
purge_max = (sent_slave_boffset.filenum - 1 < purge_max)
? sent_slave_boffset.filenum - 1 : purge_max;
}
}
} else {
success = false;
}
tmp_stream << ",safety_purge=" << (success ? kBinlogPrefix + std::to_string(static_cast<int32_t>(purge_max)) : "none") << "\r\n";
}
}
info.append(tmp_stream.str());
return;
}

void InfoCmd::InfoData(std::string& info) {
std::stringstream tmp_stream;

int64_t db_size = slash::Du(g_pika_conf->db_path());
tmp_stream << "# Data" << "\r\n";
tmp_stream << "db_size:" << db_size << "\r\n";
tmp_stream << "db_size_human:" << (db_size >> 20) << "M\r\n";
int64_t log_size = slash::Du(g_pika_conf->log_path());
tmp_stream << "log_size:" << log_size << "\r\n";
tmp_stream << "log_size_human:" << (log_size >> 20) << "M\r\n";
tmp_stream << "compression:" << g_pika_conf->compression() << "\r\n";

// rocksdb related memory usage
Expand Down
2 changes: 1 addition & 1 deletion src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ bool Partition::PurgeFiles(uint32_t to, bool manual) {
&& stat(((log_path_ + it->second)).c_str(), &file_stat) == 0
&& file_stat.st_mtime < time(NULL) - g_pika_conf->expire_logs_days() * 24 * 3600)) { // Expire time trigger
// We check this every time to avoid lock when we do file deletion
if (!g_pika_server->PartitionCouldPurge(table_name_, partition_id_, it->first)) {
if (!g_pika_rm->BinlogCloudPurgeFromSMP(table_name_, partition_id_, it->first)) {
LOG(WARNING) << partition_name_ << " Could not purge "<< (it->first) << ", since it is already be used";
return false;
}
Expand Down
113 changes: 103 additions & 10 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,63 @@ Status SyncMasterPartition::GetLastRecvTime(const std::string& ip, int port, uin
return Status::OK();
}

Status SyncMasterPartition::GetSafetyPurgeBinlog(std::string* safety_purge) {
BinlogOffset boffset;
std::string table_name = partition_info_.table_name_;
uint32_t partition_id = partition_info_.partition_id_;
std::shared_ptr<Partition> partition =
g_pika_server->GetTablePartitionById(table_name, partition_id);
if (!partition || !partition->GetBinlogOffset(&boffset)) {
return Status::NotFound("Partition NotFound");
} else {
bool success = false;
uint32_t purge_max = boffset.filenum;
if (purge_max >= 10) {
success = true;
purge_max -= 10;
slash::MutexLock l(&partition_mu_);
for (const auto& slave : slaves_) {
if (slave->slave_state == SlaveState::kSlaveBinlogSync
&& slave->acked_offset.filenum > 0) {
purge_max = std::min(slave->acked_offset.filenum - 1, purge_max);
} else {
success = false;
break;
}
}
}
*safety_purge = (success ? kBinlogPrefix + std::to_string(static_cast<int32_t>(purge_max)) : "none");
}
return Status::OK();
}

bool SyncMasterPartition::BinlogCloudPurge(uint32_t index) {
BinlogOffset boffset;
std::string table_name = partition_info_.table_name_;
uint32_t partition_id = partition_info_.partition_id_;
std::shared_ptr<Partition> partition =
g_pika_server->GetTablePartitionById(table_name, partition_id);
if (!partition || !partition->GetBinlogOffset(&boffset)) {
return false;
} else {
if (index > boffset.filenum - 10) { // remain some more
return false;
} else {
slash::MutexLock l(&partition_mu_);
for (const auto& slave : slaves_) {
if (slave->slave_state == SlaveState::kSlaveDbSync) {
return false;
} else if (slave->slave_state == SlaveState::kSlaveBinlogSync) {
if (index >= slave->acked_offset.filenum) {
return false;
}
}
}
}
}
return true;
}

Status SyncMasterPartition::CheckSyncTimeout(uint64_t now) {
slash::MutexLock l(&partition_mu_);

Expand Down Expand Up @@ -471,7 +528,7 @@ Status SyncMasterPartition::CheckSyncTimeout(uint64_t now) {
LOG(INFO)<< SyncPartitionInfo().ToString() << " slave " << slaves_[i]->ToString();
if (node.Ip() == slaves_[i]->Ip() && node.Port() == slaves_[i]->Port()) {
slaves_.erase(slaves_.begin() + i);
LOG(INFO) << SyncPartitionInfo().ToString() << " Master del slave success " << node.ToString();
LOG(WARNING) << SyncPartitionInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString();
break;
}
}
Expand Down Expand Up @@ -1313,6 +1370,51 @@ Status PikaReplicaManager::SendSlaveBinlogChipsRequest(const std::string& ip,
return pika_repl_server_->SendSlaveBinlogChips(ip, port, tasks);
}

std::shared_ptr<SyncMasterPartition>
PikaReplicaManager::GetSyncMasterPartitionByName(const PartitionInfo& p_info) {
slash::RWLock l(&partitions_rw_, false);
if (sync_master_partitions_.find(p_info) == sync_master_partitions_.end()) {
return nullptr;
}
return sync_master_partitions_[p_info];
}

Status PikaReplicaManager::GetSafetyPurgeBinlogFromSMP(const std::string& table_name,
uint32_t partition_id,
std::string* safety_purge) {
std::shared_ptr<SyncMasterPartition> master_partition =
GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!master_partition) {
LOG(WARNING) << "Sync Master Partition: " << table_name << ":" << partition_id
<< ", NotFound";
return Status::NotFound("SyncMasterPartition NotFound");
} else {
return master_partition->GetSafetyPurgeBinlog(safety_purge);
}
}

bool PikaReplicaManager::BinlogCloudPurgeFromSMP(const std::string& table_name,
uint32_t partition_id, uint32_t index) {
std::shared_ptr<SyncMasterPartition> master_partition =
GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!master_partition) {
LOG(WARNING) << "Sync Master Partition: " << table_name << ":" << partition_id
<< ", NotFound";
return false;
} else {
return master_partition->BinlogCloudPurge(index);
}
}

std::shared_ptr<SyncSlavePartition>
PikaReplicaManager::GetSyncSlavePartitionByName(const PartitionInfo& p_info) {
slash::RWLock l(&partitions_rw_, false);
if (sync_slave_partitions_.find(p_info) == sync_slave_partitions_.end()) {
return nullptr;
}
return sync_slave_partitions_[p_info];
}

Status PikaReplicaManager::RunSyncSlavePartitionStateMachine() {
slash::RWLock l(&partitions_rw_, false);
for (const auto& item : sync_slave_partitions_) {
Expand Down Expand Up @@ -1342,15 +1444,6 @@ Status PikaReplicaManager::RunSyncSlavePartitionStateMachine() {
return Status::OK();
}

std::shared_ptr<SyncSlavePartition>
PikaReplicaManager::GetSyncSlavePartitionByName(const PartitionInfo& p_info) {
slash::RWLock l(&partitions_rw_, false);
if (sync_slave_partitions_.find(p_info) == sync_slave_partitions_.end()) {
return nullptr;
}
return sync_slave_partitions_[p_info];
}

Status PikaReplicaManager::AddSyncPartition(
const std::set<PartitionInfo>& p_infos) {
slash::RWLock l(&partitions_rw_, true);
Expand Down
Loading

0 comments on commit 1eb932f

Please sign in to comment.