Skip to content

Commit

Permalink
add SyncError stage for slave
Browse files Browse the repository at this point in the history
  • Loading branch information
KernelMaker committed Jul 4, 2016
1 parent 5255704 commit 2d7380c
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 8 deletions.
1 change: 1 addition & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct SlaveItem {
#define PIKA_REPL_CONNECTING 2
#define PIKA_REPL_CONNECTED 3
#define PIKA_REPL_WAIT_DBSYNC 4
#define PIKA_REPL_ERROR 5

//role
#define PIKA_ROLE_SINGLE 0
Expand Down
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class PikaServer
void MinusMasterConnection();
void PlusMasterConnection();
bool ShouldAccessConnAsMaster(const std::string& ip);
void SyncError();
void RemoveMaster();
bool WaitingDBSync();
void NeedWaitDBSync();
Expand Down
8 changes: 6 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,12 @@ void TrysyncCmd::Do() {
} else if (status.IsIncomplete()) {
res_.AppendString(kInnerReplWait);
} else {
res_.SetRes(CmdRes::kErrOther, status.ToString());
LOG(WARNING) << "slave offset is larger than mine, slave ip: " << ip_port << " filenum: " << filenum_ << " pro_offset_: " << pro_offset_;
res_.SetRes(CmdRes::kErrOther, "InvalidOffset");
}
} else {
res_.SetRes(CmdRes::kErrOther, "Already Exist");
LOG(WARNING) << "slave already exist, slave ip: " << ip_port;
res_.SetRes(CmdRes::kErrOther, "AlreadyExist");
}
}

Expand Down Expand Up @@ -544,12 +546,14 @@ void InfoCmd::InfoReplication(std::string &info) {
tmp_stream << "master_port:" << g_pika_server->master_port() << "\r\n";
tmp_stream << "master_link_status:" << (g_pika_server->repl_state() == PIKA_REPL_CONNECTED ? "up" : "down") << "\r\n";
tmp_stream << "slave_read_only:" << g_pika_conf->readonly() << "\r\n";
tmp_stream << "repl_state: " << (g_pika_server->repl_state()) << "\r\n";
break;
case PIKA_ROLE_MASTER | PIKA_ROLE_SLAVE :
tmp_stream << "master_host:" << g_pika_server->master_ip() << "\r\n";
tmp_stream << "master_port:" << g_pika_server->master_port() << "\r\n";
tmp_stream << "master_link_status:" << (g_pika_server->repl_state() == PIKA_REPL_CONNECTED ? "up" : "down") << "\r\n";
tmp_stream << "slave_read_only:" << g_pika_conf->readonly() << "\r\n";
tmp_stream << "repl_state: " << (g_pika_server->repl_state()) << "\r\n";
case PIKA_ROLE_SINGLE :
case PIKA_ROLE_MASTER :
tmp_stream << "connected_slaves:" << g_pika_server->GetSlaveListString(slaves_list_str) << "\r\n" << slaves_list_str;
Expand Down
19 changes: 19 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,25 @@ bool PikaServer::ShouldAccessConnAsMaster(const std::string& ip) {
return false;
}

void PikaServer::SyncError() {

{
slash::RWLock l(&state_protector_, true);
repl_state_ = PIKA_REPL_ERROR;
}
if (ping_thread_ != NULL) {
ping_thread_->should_exit_ = true;
int err = pthread_join(ping_thread_->thread_id(), NULL);
if (err != 0) {
std::string msg = "can't join thread " + std::string(strerror(err));
LOG(WARNING) << msg;
}
delete ping_thread_;
ping_thread_ = NULL;
}
LOG(WARNING) << "Sync error, set repl_state to PIKA_REPL_ERROR";
}

void PikaServer::RemoveMaster() {

{
Expand Down
12 changes: 6 additions & 6 deletions src/pika_trysync_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ bool PikaTrysyncThread::RecvProc() {
}

reply = cli_->argv_[0];
LOG(INFO) << "Reply from master after trysync: " << reply;
LOG(WARNING) << "Reply from master after trysync: " << reply;
if (!is_authed && should_auth) {
if (kInnerReplOk != slash::StringToLower(reply)) {
LOG(WARNING) << "remove master";
g_pika_server->RemoveMaster();
LOG(WARNING) << "auth with master, error, come in SyncError stage";
g_pika_server->SyncError();
return false;
}
is_authed = true;
Expand All @@ -93,9 +93,9 @@ bool PikaTrysyncThread::RecvProc() {
// 3, Master do dbsyncing
LOG(INFO) << "Need wait to sync";
g_pika_server->NeedWaitDBSync();
// } else {
// LOG(WARNING) << "remove master";
// g_pika_server->RemoveMaster();
} else {
LOG(WARNING) << "something wrong with sync, come in SyncError stage";
g_pika_server->SyncError();
}
return false;
}
Expand Down
6 changes: 6 additions & 0 deletions tools/binlog_tools/binlog_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ int main(int argc, char *argv[]) {
exit(-1);
case 'i':
input_path = optarg;
if (input_path[input_path.length() - 1] != '/' ) {
input_path.append("/");
}
default_input_path = false;
break;
case 'c':
Expand All @@ -87,6 +90,9 @@ int main(int argc, char *argv[]) {
break;
case 'o':
output_path = optarg;
if (output_path[output_path.length() - 1] != '/' ) {
output_path.append("/");
}
default_output_path = false;
break;
case 'f':
Expand Down

0 comments on commit 2d7380c

Please sign in to comment.