Skip to content

Commit

Permalink
Solve syncing binlog block problem (OpenAtomFoundation#555)
Browse files Browse the repository at this point in the history
* Add binlog sent offset and acked offset into info command

* Update Pink

Solve binlog sync block problem
  • Loading branch information
whoiami authored and Axlgrep committed May 15, 2019
1 parent bcea965 commit 853c208
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 13 deletions.
2 changes: 1 addition & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class PikaReplClient {
Status AddBinlogSyncCtl(const RmNode& slave, std::shared_ptr<Binlog> logger, uint32_t filenum, uint64_t offset);
Status RemoveSlave(const SlaveItem& slave);
Status RemoveBinlogSyncCtl(const RmNode& slave);
Status GetBinlogReaderStatus(const RmNode& slave, BinlogOffset* const boffset);
Status GetBinlogSyncCtlStatus(const RmNode& slave, BinlogOffset* const sent_boffset, BinlogOffset* const acked_boffset);

Status SendMetaSync();
Status SendPartitionTrySync(const std::string& table_name,
Expand Down
7 changes: 4 additions & 3 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ bool PikaReplClient::SetAckInfo(const RmNode& slave, uint32_t ack_filenum_start,
break;
}
}
//LOG(INFO) << " offset start " << ack_filenum_start << " " << ack_offset_start << " offset end " << ack_filenum_end <<" "<< ack_offset_end << " ctl offset " << ctl->ack_file_num_ << " " << ctl->ack_offset_;
ctl->active_time_ = active_time;
}
return true;
Expand All @@ -161,7 +160,7 @@ bool PikaReplClient::GetAckInfo(const RmNode& slave, uint32_t* ack_file_num, uin
return true;
}

Status PikaReplClient::GetBinlogReaderStatus(const RmNode& slave, BinlogOffset* const boffset) {
Status PikaReplClient::GetBinlogSyncCtlStatus(const RmNode& slave, BinlogOffset* const sent_boffset, BinlogOffset* const acked_boffset) {
BinlogSyncCtl* ctl = nullptr;
{
slash::RWLock l_rw(&binlog_ctl_rw_, false);
Expand All @@ -172,7 +171,9 @@ Status PikaReplClient::GetBinlogReaderStatus(const RmNode& slave, BinlogOffset*
ctl = iter->second;

slash::MutexLock l(&(ctl->ctl_mu_));
ctl->reader_->GetReaderStatus(&boffset->filenum, &boffset->offset);
ctl->reader_->GetReaderStatus(&sent_boffset->filenum, &sent_boffset->offset);
acked_boffset->filenum = ctl->ack_file_num_;
acked_boffset->offset = ctl->ack_offset_;
}
return Status::OK();
}
Expand Down
24 changes: 16 additions & 8 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,13 +541,14 @@ void PikaServer::PartitionSetSmallCompactionThreshold(uint32_t small_compaction_

bool PikaServer::PartitionCouldPurge(const std::string& table_name,
uint32_t partition_id, uint32_t index) {
BinlogOffset slave_boffset;
BinlogOffset sent_slave_boffset;
BinlogOffset acked_slave_boffset;
slash::MutexLock l(&slave_mutex_);
for (const auto& slave : slaves_) {
RmNode rm_node(table_name, partition_id, slave.ip, slave.port + kPortShiftReplServer);
Status s = pika_repl_client_->GetBinlogReaderStatus(rm_node, &slave_boffset);
Status s = pika_repl_client_->GetBinlogSyncCtlStatus(rm_node, &sent_slave_boffset, &acked_slave_boffset);
if (s.ok()) {
if (index >= slave_boffset.filenum) {
if (index >= acked_slave_boffset.filenum) {
return false;
}
} else {
Expand Down Expand Up @@ -707,8 +708,11 @@ int32_t PikaServer::CountSyncSlaves() {
int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
size_t index = 0;
BinlogOffset master_boffset;
BinlogOffset slave_boffset;
BinlogOffset sent_slave_boffset;
BinlogOffset acked_slave_boffset;
std::stringstream tmp_stream;
std::stringstream sync_status_stream;
sync_status_stream << " sync points:" << "\r\n";
slash::MutexLock l(&slave_mutex_);
for (const auto& slave : slaves_) {
tmp_stream << "slave" << index++ << ":ip=" << slave.ip << ",port=" << slave.port << ",sid=" << slave.sid << ",lag=";
Expand All @@ -719,19 +723,23 @@ int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
if (!partition || !partition->GetBinlogOffset(&master_boffset)) {
continue;
} else {
Status s = pika_repl_client_->GetBinlogReaderStatus(rm_node, &slave_boffset);
Status s = pika_repl_client_->GetBinlogSyncCtlStatus(rm_node, &sent_slave_boffset, &acked_slave_boffset);
if (s.ok()) {
uint64_t lag =
(master_boffset.filenum - slave_boffset.filenum) * g_pika_conf->binlog_file_size()
+ (master_boffset.offset - slave_boffset.offset);
tmp_stream << "(" << partition->GetPartitionName() << ":" << lag << ")";
(master_boffset.filenum - sent_slave_boffset.filenum) * g_pika_conf->binlog_file_size()
+ (master_boffset.offset - sent_slave_boffset.offset);
tmp_stream << "(" << partition->GetPartitionName() << ":" << lag << " sent " << sent_slave_boffset.filenum << " " << sent_slave_boffset.offset
<< " acked " << acked_slave_boffset.filenum << " " << acked_slave_boffset.offset << ")";
sync_status_stream << " (" << partition->GetPartitionName() << ":" << "sent " << sent_slave_boffset.filenum << " " << sent_slave_boffset.offset
<< " acked " << acked_slave_boffset.filenum << " " << acked_slave_boffset.offset<< ")" << "\r\n";
} else {
tmp_stream << "(" << partition->GetPartitionName() << ":not syncing)";
}
}
}
}
tmp_stream << "\r\n";
tmp_stream << sync_status_stream.str();
}
slave_list_str.assign(tmp_stream.str());
return index;
Expand Down
2 changes: 1 addition & 1 deletion third/pink

0 comments on commit 853c208

Please sign in to comment.