forked from OpenAtomFoundation/pika
-
Notifications
You must be signed in to change notification settings - Fork 2
/
pika_slave_node.cc
109 lines (94 loc) · 3.52 KB
/
pika_slave_node.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "include/pika_slave_node.h"
#include "include/pika_conf.h"
using pstd::Status;
extern std::unique_ptr<PikaConf> g_pika_conf;
/* SyncWindow */
/**
 * Appends `item` to the back of the sync window and adds its
 * `binlog_size_` to the running `total_size_` counter.
 */
void SyncWindow::Push(const SyncWinItem& item) {
  // Store a copy of the item first, then account for the bytes it carries.
  win_.emplace_back(item);
  total_size_ += item.binlog_size_;
}
/**
 * Marks every window entry in the inclusive range [start_item, end_item] as
 * acknowledged, then pops the leading run of acknowledged entries.
 *
 * @param start_item   first entry of the acknowledged range.
 * @param end_item     last entry of the acknowledged range.
 * @param acked_offset out-parameter; overwritten with the offset of each entry
 *                     popped from the front, so after the call it holds the
 *                     offset of the last popped entry. Left untouched when
 *                     nothing is popped.
 * @return false (after logging a warning) when either item cannot be located
 *         in the window with start_item at or before end_item; true otherwise.
 */
bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_item, LogOffset* acked_offset) {
  // win_.size() doubles as the "not found" sentinel for both positions.
  const size_t not_found = win_.size();
  size_t start_pos = not_found;
  size_t end_pos = not_found;

  // Scanning stops at the first match of end_item, so start_item is only
  // recognised when it sits at or before that position.
  for (size_t i = 0; i < win_.size(); ++i) {
    if (win_[i] == start_item) {
      start_pos = i;
    }
    if (win_[i] == end_item) {
      end_pos = i;
      break;
    }
  }

  if (start_pos == not_found || end_pos == not_found) {
    LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString()
                 << " not found in binlog controller window." << std::endl
                 << "window status " << std::endl
                 << ToStringStatus();
    return false;
  }

  for (size_t i = start_pos; i <= end_pos; ++i) {
    // An entry that was already acknowledged by an earlier call (and is still
    // waiting behind an un-acked entry) has had its size subtracted once
    // already; subtracting again would corrupt total_size_.
    if (win_[i].acked_) {
      continue;
    }
    win_[i].acked_ = true;
    total_size_ -= win_[i].binlog_size_;
  }

  // Release the contiguous acknowledged prefix, reporting the last offset popped.
  while (!win_.empty() && win_[0].acked_) {
    *acked_offset = win_[0].offset_;
    win_.pop_front();
  }
  return true;
}
/**
 * Returns how many more entries fit in the window before it reaches the
 * configured `sync_window_size()`.
 *
 * @return the number of free slots, or 0 when the window already holds at
 *         least as many entries as the configured size.
 */
int SyncWindow::Remaining() {
  // Do the subtraction in a signed type: computing it in std::size_t wraps
  // around to a huge value whenever the window holds more entries than the
  // configured size, which makes a "> 0" clamp ineffective.
  const long long capacity = static_cast<long long>(g_pika_conf->sync_window_size());
  const long long in_window = static_cast<long long>(win_.size());
  const long long free_slots = capacity - in_window;
  return free_slots > 0 ? static_cast<int>(free_slots) : 0;
}
/* SlaveNode */
/**
 * Constructs a slave node by forwarding every argument to the RmNode base.
 * No other initialization is performed in this constructor body.
 */
SlaveNode::SlaveNode(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int session_id)
    : RmNode(ip, port, db_name, slot_id, session_id) {
}
// Defaulted destructor defined out of line; no custom cleanup is performed here.
SlaveNode::~SlaveNode() = default;
/**
 * Replaces `binlog_reader` with a freshly created PikaBinlogReader and seeks
 * it to the position given by `offset` within `binlog`.
 *
 * @param binlog binlog handle passed through to the reader's Seek().
 * @param offset position to seek to (its `filenum` and `offset` fields are used).
 * @return Status::OK() when Seek() returns 0; otherwise Status::Corruption
 *         carrying this node's ToString() plus a failure message.
 */
Status SlaveNode::InitBinlogFileReader(const std::shared_ptr<Binlog>& binlog, const BinlogOffset& offset) {
  binlog_reader = std::make_shared<PikaBinlogReader>();

  const int seek_rc = binlog_reader->Seek(binlog, offset.filenum, offset.offset);
  if (seek_rc == 0) {
    return Status::OK();
  }
  // Note: the newly created reader stays assigned even when the seek fails.
  return Status::Corruption(ToString() + " binlog reader init failed");
}
/**
 * Builds a multi-line, "\r\n"-separated description of this node: slave state,
 * binlog sync state, the sync window's own status text, the sent and acked
 * offsets, and whether `binlog_reader` is non-null.
 *
 * @return the formatted status text.
 */
std::string SlaveNode::ToStringStatus() {
  std::stringstream report;

  // State names are looked up in the message tables by the current enum values.
  report << "    Slave_state: " << SlaveStateMsg[slave_state] << "\r\n"
         << "    Binlog_sync_state: " << BinlogSyncStateMsg[b_state] << "\r\n";

  report << "    Sync_window: "
         << "\r\n"
         << sync_win.ToStringStatus();

  report << "    Sent_offset: " << sent_offset.ToString() << "\r\n"
         << "    Acked_offset: " << acked_offset.ToString() << "\r\n";

  // The bool is streamed without boolalpha, so it is rendered as 1 or 0.
  report << "    Binlog_reader activated: " << (binlog_reader != nullptr) << "\r\n";

  return report.str();
}
/**
 * Applies an acknowledgement for the range [start, end] to the sync window and
 * refreshes this node's `acked_offset`.
 *
 * @param start          first offset of the acknowledged range.
 * @param end            last offset of the acknowledged range.
 * @param updated_offset out-parameter; receives the new acked offset, or the
 *                       current `acked_offset` when the window reported no
 *                       advance. Reset to a default LogOffset before the
 *                       window is consulted, so it holds that default when
 *                       the window update fails.
 * @return Status::Corruption when the node is not in kSlaveBinlogSync state or
 *         when the window update fails; Status::OK() otherwise.
 */
Status SlaveNode::Update(const LogOffset& start, const LogOffset& end, LogOffset* updated_offset) {
  if (slave_state != kSlaveBinlogSync) {
    return Status::Corruption(ToString() + "state not BinlogSync");
  }

  // A default-constructed offset serves as the "window did not advance" marker.
  const LogOffset unset_offset = LogOffset();
  *updated_offset = unset_offset;

  if (!sync_win.Update(SyncWinItem(start), SyncWinItem(end), updated_offset)) {
    return Status::Corruption("UpdateAckedInfo failed");
  }

  if (*updated_offset == unset_offset) {
    // Nothing was released from the window: report the current acked offset.
    *updated_offset = acked_offset;
  } else {
    // The window advanced: remember the new acked offset.
    acked_offset = *updated_offset;
  }
  return Status::OK();
}