forked from krareT/pika
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpika_slaveping_thread.cc
102 lines (96 loc) · 3.14 KB
/
pika_slaveping_thread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <glog/logging.h>
#include <poll.h>
#include "pika_slaveping_thread.h"
#include "pika_server.h"
extern PikaServer* g_pika_server;
pink::Status PikaSlavepingThread::Send() {
std::string wbuf_str;
if (!is_first_send_) {
pink::RedisCli::SerializeCommand(&wbuf_str, "ping");
} else {
pink::RedisCmdArgsType argv;
argv.push_back("spci");
argv.push_back(std::to_string(sid_));
pink::RedisCli::SerializeCommand(argv, &wbuf_str);
is_first_send_ = false;
LOG(INFO) << wbuf_str;
}
return cli_->Send(&wbuf_str);
}
pink::Status PikaSlavepingThread::RecvProc() {
pink::Status s = cli_->Recv(NULL);
if (s.ok()) {
slash::StringToLower(cli_->argv_[0]);
DLOG(INFO) << "Reply from master after ping: " << cli_->argv_[0];
if (cli_->argv_[0] == "pong" || cli_->argv_[0] == "ok") {
} else {
s = pink::Status::Corruption("Reply is not pong or ok");
}
} else {
LOG(WARNING) << "RecvProc, recv error: " << s.ToString();
}
return s;
}
void* PikaSlavepingThread::ThreadMain() {
struct timeval last_interaction;
struct timeval now;
gettimeofday(&now, NULL);
last_interaction = now;
pink::Status s;
int connect_retry_times = 0;
while (!should_exit_ && g_pika_server->ShouldStartPingMaster()) {
if (!should_exit_ && (cli_->Connect(g_pika_server->master_ip(), g_pika_server->master_port() + 2000, g_pika_server->host())).ok()) {
cli_->set_send_timeout(1000);
cli_->set_recv_timeout(1000);
connect_retry_times = 0;
g_pika_server->PlusMasterConnection();
while (true) {
if (should_exit_) {
LOG(INFO) << "Close Slaveping Thread now";
cli_->Close();
g_pika_server->KillBinlogSenderConn();
break;
}
s = Send();
if (s.ok()) {
s = RecvProc();
}
if (s.ok()) {
DLOG(INFO) << "Ping master success";
gettimeofday(&last_interaction, NULL);
} else if (s.IsTimeout()) {
LOG(WARNING) << "Slaveping timeout once";
gettimeofday(&now, NULL);
if (now.tv_sec - last_interaction.tv_sec > 30) {
//timeout;
LOG(WARNING) << "Ping master timeout";
cli_->Close();
g_pika_server->KillBinlogSenderConn();
break;
}
} else {
LOG(WARNING) << "Ping master error";
cli_->Close();
g_pika_server->KillBinlogSenderConn();
break;
}
sleep(1);
}
sleep(2);
g_pika_server->MinusMasterConnection();
} else if (!should_exit_) {
LOG(WARNING) << "Slaveping, Connect timeout";
if ((++connect_retry_times) >= 30) {
LOG(WARNING) << "Slaveping, Connect timeout 10 times, disconnect with master";
cli_->Close();
g_pika_server->KillBinlogSenderConn();
connect_retry_times = 0;
}
}
}
return NULL;
}