-
Notifications
You must be signed in to change notification settings - Fork 389
/
Copy pathClientSocket.cpp
208 lines (191 loc) · 6.35 KB
/
ClientSocket.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include "ClientSocket.h"
#include "Reporting.h"
#include "SocketUtils.h"
#include "WdtOptions.h"
#include <glog/logging.h>
#include <sys/socket.h>
#include <poll.h>
#include <fcntl.h>
namespace facebook {
namespace wdt {
using std::string;
ClientSocket::ClientSocket(const string &dest, const string &port,
IAbortChecker const *abortChecker)
: dest_(dest), port_(port), fd_(-1), abortChecker_(abortChecker) {
memset(&sa_, 0, sizeof(sa_));
const auto &options = WdtOptions::get();
if (options.ipv6) {
sa_.ai_family = AF_INET6;
}
if (options.ipv4) {
sa_.ai_family = AF_INET;
}
sa_.ai_socktype = SOCK_STREAM;
}
ErrorCode ClientSocket::connect() {
WDT_CHECK(fd_ < 0) << "Previous connection not closed " << fd_ << " "
<< port_;
// Lookup
struct addrinfo *infoList = nullptr;
auto guard = folly::makeGuard([&] {
if (infoList) {
freeaddrinfo(infoList);
}
});
int res = getaddrinfo(dest_.c_str(), port_.c_str(), &sa_, &infoList);
if (res) {
// not errno, can't use PLOG (perror)
LOG(ERROR) << "Failed getaddrinfo " << dest_ << " , " << port_ << " : "
<< res << " : " << gai_strerror(res);
return CONN_ERROR;
}
int count = 0;
for (struct addrinfo *info = infoList; info != nullptr;
info = info->ai_next) {
++count;
std::string host, port;
SocketUtils::getNameInfo(info->ai_addr, info->ai_addrlen, host, port);
VLOG(2) << "will connect to " << host << " " << port;
fd_ = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
if (fd_ == -1) {
PLOG(WARNING) << "Error making socket for port " << port_;
continue;
}
VLOG(1) << "new socket " << fd_ << " for port " << port_;
// make the socket non blocking
int sockArg = fcntl(fd_, F_GETFL, nullptr);
sockArg |= O_NONBLOCK;
int retValue = fcntl(fd_, F_SETFL, sockArg);
if (retValue == -1) {
PLOG(ERROR) << "Could not make the socket non-blocking " << port_;
this->close();
continue;
}
if (::connect(fd_, info->ai_addr, info->ai_addrlen) != 0) {
if (errno != EINPROGRESS) {
PLOG(INFO) << "Error connecting on " << host << " " << port;
this->close();
continue;
}
auto startTime = Clock::now();
int connectTimeout = WdtOptions::get().connect_timeout_millis;
while (true) {
// check for abort
if (abortChecker_->shouldAbort()) {
LOG(ERROR) << "Transfer aborted during connect " << port_ << " "
<< fd_;
this->close();
return ABORT;
}
// we need this loop because poll() can return before any file handles
// have changes or before timing out. In that case, we check whether it
// is because of EINTR or not. If true, we have to try poll with
// reduced timeout. Also we set the poll timeout to be at max equal to
// abort check interval. This allows us to check for abort regularly.
int timeElapsed = durationMillis(Clock::now() - startTime);
if (timeElapsed >= connectTimeout) {
VLOG(1) << "connect() timed out" << host << " " << port;
this->close();
return CONN_ERROR_RETRYABLE;
}
int pollTimeout =
std::min(connectTimeout - timeElapsed,
WdtOptions::get().abort_check_interval_millis);
struct pollfd pollFds[] = {{fd_, POLLOUT, 0}};
int retValue;
if ((retValue = poll(pollFds, 1, pollTimeout)) <= 0) {
if (errno == EINTR) {
VLOG(1) << "poll() call interrupted. retrying... " << port_;
continue;
}
if (retValue == 0) {
VLOG(1) << "poll() timed out " << host << " " << port;
continue;
}
PLOG(ERROR) << "poll() failed " << host << " " << port << " " << fd_;
this->close();
return CONN_ERROR;
}
break;
}
// have to check whether the connection attempt succeeded
int connectResult;
socklen_t len = sizeof(connectResult);
if (getsockopt(fd_, SOL_SOCKET, SO_ERROR, &connectResult, &len) < 0) {
PLOG(WARNING) << "getsockopt() failed";
this->close();
continue;
}
if (connectResult != 0) {
LOG(WARNING) << "connect did not succeed on " << host << " " << port
<< " : " << strerrorStr(connectResult);
this->close();
continue;
}
}
// Set to blocking mode again
sockArg = fcntl(fd_, F_GETFL, nullptr);
sockArg &= (~O_NONBLOCK);
retValue = fcntl(fd_, F_SETFL, sockArg);
if (retValue == -1) {
PLOG(ERROR) << "Could not make the socket blocking " << port_;
this->close();
continue;
}
VLOG(1) << "Successful connect on " << fd_;
sa_ = *info;
break;
}
if (fd_ < 0) {
if (count > 1) {
// Only log this if not redundant with log above (ie --ipv6=false)
LOG(INFO) << "Unable to connect to either of the " << count << " addrs";
}
return CONN_ERROR_RETRYABLE;
}
SocketUtils::setReadTimeout(fd_);
SocketUtils::setWriteTimeout(fd_);
return OK;
}
int ClientSocket::getFd() const {
VLOG(1) << "fd is " << fd_;
return fd_;
}
std::string ClientSocket::getPort() const {
return port_;
}
int ClientSocket::read(char *buf, int nbyte, bool tryFull) {
return SocketUtils::readWithAbortCheck(fd_, buf, nbyte, abortChecker_,
tryFull);
}
int ClientSocket::write(const char *buf, int nbyte, bool tryFull) {
return SocketUtils::writeWithAbortCheck(fd_, buf, nbyte, abortChecker_,
tryFull);
}
void ClientSocket::close() {
if (fd_ >= 0) {
VLOG(1) << "Closing socket : " << fd_;
if (::close(fd_) < 0) {
VLOG(1) << "Socket close failed for fd " << fd_;
}
fd_ = -1;
}
}
void ClientSocket::shutdown() {
if (::shutdown(fd_, SHUT_WR) < 0) {
VLOG(1) << "Socket shutdown failed for fd " << fd_;
}
}
ClientSocket::~ClientSocket() {
this->close();
}
}
} // end namespace facebook::wtd