Skip to content

Commit

Permalink
Dev:set limit of total binlog size in sync window (OpenAtomFoundation…
Browse files Browse the repository at this point in the history
  • Loading branch information
kernelai authored and whoiami committed Mar 20, 2020
1 parent 0bcb64b commit 596281b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
21 changes: 13 additions & 8 deletions include/pika_slave_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@

struct SyncWinItem {
BinlogOffset offset_;
std::size_t binlog_size_;
bool acked_;
bool operator==(const SyncWinItem& other) const {
if (offset_.filenum == other.offset_.filenum && offset_.offset == other.offset_.offset) {
return true;
}
return false;
return offset_.filenum == other.offset_.filenum && offset_.offset == other.offset_.offset;
}
explicit SyncWinItem(const BinlogOffset& offset) : offset_(offset), acked_(false) {
explicit SyncWinItem(const BinlogOffset& offset)
: offset_(offset), binlog_size_(0), acked_(false) {
}
SyncWinItem(uint32_t filenum, uint64_t offset) : offset_(filenum, offset), acked_(false) {
SyncWinItem(uint32_t filenum, uint64_t offset, std::size_t binglog_size)
: offset_(filenum, offset), binlog_size_(binglog_size), acked_(false) {
}
std::string ToString() const {
return offset_.ToString() + " acked: " + std::to_string(acked_);
return offset_.ToString() + " binglog size: " + std::to_string(binlog_size_) +
" acked: " + std::to_string(acked_);
}
};


class SyncWindow {
public:
SyncWindow() {
SyncWindow() :total_size_(0) {
}
void Push(const SyncWinItem& item);
bool Update(const SyncWinItem& start_item, const SyncWinItem& end_item, BinlogOffset* acked_offset);
Expand All @@ -49,9 +50,13 @@ class SyncWindow {
return res;
}
}
std::size_t GetTotalBinglogSize() {
return total_size_;
}
private:
// TODO(whoiami) ring buffer maybe
std::deque<SyncWinItem> win_;
std::size_t total_size_;
};

// role master use
Expand Down
9 changes: 8 additions & 1 deletion src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>&
std::string msg;
uint32_t filenum;
uint64_t offset;
if (slave_ptr->sync_win.GetTotalBinglogSize() > PIKA_MAX_CONN_RBUF_HB * 2) {
LOG(INFO) << slave_ptr->ToString() << " total binlog size in sync window is :"
<< slave_ptr->sync_win.GetTotalBinglogSize();
break;
}
Status s = reader->Get(&msg, &filenum, &offset);
if (s.IsEndFile()) {
break;
Expand All @@ -160,7 +165,7 @@ Status SyncMasterPartition::ReadBinlogFileToWq(const std::shared_ptr<SlaveNode>&
<< " Read Binlog error : " << s.ToString();
return s;
}
slave_ptr->sync_win.Push(SyncWinItem(filenum, offset));
slave_ptr->sync_win.Push(SyncWinItem(filenum, offset, msg.size()));

BinlogOffset sent_offset = BinlogOffset(filenum, offset);
slave_ptr->sent_offset = sent_offset;
Expand Down Expand Up @@ -578,6 +583,7 @@ std::string SyncSlavePartition::LocalIp() {

void SyncWindow::Push(const SyncWinItem& item) {
win_.push_back(item);
total_size_ += item.binlog_size_;
}

bool SyncWindow::Update(const SyncWinItem& start_item,
Expand All @@ -601,6 +607,7 @@ bool SyncWindow::Update(const SyncWinItem& start_item,
}
for (size_t i = start_pos; i <= end_pos; ++i) {
win_[i].acked_ = true;
total_size_ -= win_[i].binlog_size_;
}
while (!win_.empty()) {
if (win_[0].acked_) {
Expand Down

0 comments on commit 596281b

Please sign in to comment.