Skip to content

Commit

Permalink
proxy protocol v2
Browse files Browse the repository at this point in the history
  • Loading branch information
wodesuck committed Nov 24, 2016
1 parent 63f81b1 commit 024ce65
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 13 deletions.
1 change: 1 addition & 0 deletions phxrpc_package_config/tools/etc_template/phxsqlproxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SlaveForkProcCnt=1
SlaveWorkerThread=30
SlaveIORoutineCnt=100
MasterEnableReadPort=1
ProxyProtocol=2

IP = $InnerIP
Port = 54321
Expand Down
218 changes: 208 additions & 10 deletions phxsqlproxy/io_routine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ void IORoutine::ClearVariablesAndStatus() {
connect_dest_ = "";
connect_port_ = 0;
client_ip_ = "";
client_port_ = 0;
server_ip_ = "";
server_port_ = 0;
req_uniq_id_ = 0;
last_sent_request_timestamp_ = 0;
last_received_request_timestamp_ = 0;
Expand Down Expand Up @@ -233,7 +236,7 @@ void IORoutine::SetClientFD(int fd) {
ClearAll();
req_uniq_id_ = ((uint64_t)(random_engine()) << 32) | random_engine();
client_fd_ = fd;
int ret1 = GetSockName(fd, listen_ip_, listen_port_);
int ret1 = GetSockName(fd, server_ip_, server_port_);
int ret2 = GetPeerName(fd, client_ip_, client_port_);
if (ret1 == 0 && ret2 == 0) {
LogVerbose("requniqid %llu receive connect from [%s:%d]",
Expand Down Expand Up @@ -349,20 +352,37 @@ int IORoutine::run() {
while (true) {
//first connect to master mysql to finish auth
if (sqlsvr_fd_ == -1) {
SetNoDelay(client_fd_);
if (server_port_ == GetWorkerConfig()->proxy_port_) {
if (!GetGroupStatusCache()->IsMember(client_ip_)) {
break;
}

int ret = ProcessProxyHeader(client_fd_);
if (ret < 0) {
LogError("process proxy header err: %d", ret);
break;
}
LogVerbose("process proxy header succ: %s:%d %s:%d",
client_ip_.c_str(), client_port_, server_ip_.c_str(), server_port_);
}

int fd = ConnectDest();
if (fd <= 0) {
break;
}
sqlsvr_fd_ = fd;
SetNoDelay(client_fd_);
SetNoDelay(sqlsvr_fd_);

if (listen_port_ == GetWorkerConfig()->port_) {
string proxy_line = "PROXY TCP4 " + client_ip_ + " " + listen_ip_ + " " +
UIntToStr(client_port_) + " " + UIntToStr(listen_port_) + "\r\n";
int ret = WriteToDest(sqlsvr_fd_, proxy_line.c_str(), proxy_line.size());
if (config_->ProxyProtocol()) {
string proxy_header;
int ret = MakeProxyHeader(proxy_header);
if (ret < 0) {
LogError("make proxy header err: %d", ret);
break;
}
ret = WriteToDest(sqlsvr_fd_, proxy_header.c_str(), proxy_header.size());
if (ret < 0) {
LogError("send proxy line error: %s", proxy_line.c_str());
break;
}
}
Expand Down Expand Up @@ -416,7 +436,7 @@ int IORoutine::run() {

if (byte_size_tot == 0) {
// MasterEnableReadPort=0
if (connect_port_ == GetWorkerConfig()->proxy_port_ && !GetWorkerConfig()->is_master_port_) {
if (connect_port_ != config_->GetMysqlPort() && !GetWorkerConfig()->is_master_port_) {
LogVerbose("%s:%d requniqid %llu mark %s failure", __func__, __LINE__, req_uniq_id_,
connect_dest_.c_str());
GetGroupStatusCache()->MarkFailure(connect_dest_);
Expand Down Expand Up @@ -503,7 +523,7 @@ int MasterIORoutine::GetDestEndpoint(std::string & dest_ip, int & dest_port) {
dest_port = config_->GetMysqlPort();
} else {
dest_ip = master_ip;
dest_port = GetWorkerConfig()->proxy_port_;
dest_port = config_->ProxyProtocol() ? GetWorkerConfig()->proxy_port_ : GetWorkerConfig()->port_;
}

LogVerbose("%s:%d requniqid %llu ret ip [%s] port [%d]", __func__, __LINE__, req_uniq_id_, dest_ip.c_str(),
Expand Down Expand Up @@ -575,7 +595,7 @@ int SlaveIORoutine::GetDestEndpoint(std::string & dest_ip, int & dest_port) {
req_uniq_id_, master_ip.c_str());
return -__LINE__;
}
dest_port = GetWorkerConfig()->proxy_port_;
dest_port = config_->ProxyProtocol() ? GetWorkerConfig()->proxy_port_ : GetWorkerConfig()->port_;
} else {
dest_ip = "127.0.0.1";
dest_port = config_->GetMysqlPort();
Expand Down Expand Up @@ -622,4 +642,182 @@ WorkerConfig_t * SlaveIORoutine::GetWorkerConfig() {
return config_->GetSlaveWorkerConfig();
}

int IORoutine::ProcessProxyHeader(int fd) {
union {
ProxyHdrV1_t v1;
ProxyHdrV2_t v2;
} hdr;

int ret = RoutinePeekWithTimeout(fd, (char *)&hdr, sizeof(hdr), config_->ProxyProtocolTimeoutMs());
if (ret < 0) {
return ret;
}

if (ret >= 16 && memcmp(&hdr.v2, PP2_SIGNATURE, 12) == 0 && (hdr.v2.ver_cmd & 0xF0) == PP2_VERSION) {
ret = ProcessProxyHeaderV2(hdr.v2, ret);
} else if (ret >= 8 && memcmp(hdr.v1.line, "PROXY ", 6) == 0) {
ret = ProcessProxyHeaderV1(hdr.v1.line, ret);
} else {
return -__LINE__;
}

if (ret > 0) {
ret = RoutineReadWithTimeout(fd, (char *)&hdr, ret, config_->ProxyProtocolTimeoutMs());
}
return ret;
}

int IORoutine::ProcessProxyHeaderV1(char * hdr, int read_count) {
char * end = (char *)memchr(hdr, '\r', read_count - 1);
int size = end - hdr + 2;
if (!end || *(end + 1) != '\n') {
return -__LINE__;
}

vector<string> tokens = SplitStr(string(hdr, end), " ");
if (tokens[1] == "UNKNOWN") {
return size;
}
if (tokens.size() != 6) {
return -__LINE__;
}

const string & src_ip = tokens[2];
const string & dst_ip = tokens[3];
if (tokens[1] == "TCP4") {
struct in_addr addr;
if (inet_pton(AF_INET, src_ip.c_str(), &addr) <= 0) {
return -__LINE__;
}
if (inet_pton(AF_INET, dst_ip.c_str(), &addr) <= 0) {
return -__LINE__;
}
} else if (tokens[1] == "TCP6") {
struct in6_addr addr;
if (inet_pton(AF_INET6, src_ip.c_str(), &addr) <= 0) {
return -__LINE__;
}
if (inet_pton(AF_INET6, dst_ip.c_str(), &addr) <= 0) {
return -__LINE__;
}
}

int src_port = atoi(tokens[4].c_str());
int dst_port = atoi(tokens[5].c_str());
if (src_port < 0 || src_port > 65535) {
return -__LINE__;
}
if (dst_port < 0 || dst_port > 65535) {
return -__LINE__;
}

client_ip_ = src_ip;
server_ip_ = dst_ip;
client_port_ = src_port;
server_port_ = dst_port;

return size;
}

int IORoutine::ProcessProxyHeaderV2(const ProxyHdrV2_t & hdr, int read_count) {
int size = 16 + ntohs(hdr.len);
if (read_count < size) { // truncated or too large header
return -__LINE__;
}
char src_ip[INET6_ADDRSTRLEN] = { 0 };
char dst_ip[INET6_ADDRSTRLEN] = { 0 };
switch (hdr.ver_cmd & 0xF) {
case PP2_CMD_PROXY:
switch (hdr.fam) {
case PP2_FAM_TCP4:
if (inet_ntop(AF_INET, &hdr.addr.ip4.src_addr, src_ip, sizeof(src_ip)) == NULL) {
return -__LINE__;
}
if (inet_ntop(AF_INET, &hdr.addr.ip4.dst_addr, dst_ip, sizeof(dst_ip)) == NULL) {
return -__LINE__;
}
client_ip_ = string(src_ip);
server_ip_ = string(dst_ip);
client_port_ = ntohs(hdr.addr.ip4.src_port);
server_port_ = ntohs(hdr.addr.ip4.dst_port);
return size;
case PP2_FAM_TCP6:
if (inet_ntop(AF_INET6, &hdr.addr.ip6.src_addr, src_ip, sizeof(src_ip)) == NULL) {
return -__LINE__;
}
if (inet_ntop(AF_INET6, &hdr.addr.ip6.dst_addr, dst_ip, sizeof(dst_ip)) == NULL) {
return -__LINE__;
}
client_ip_ = string(src_ip);
server_ip_ = string(dst_ip);
client_port_ = ntohs(hdr.addr.ip6.src_port);
server_port_ = ntohs(hdr.addr.ip6.dst_port);
return size;
case PP2_FAM_UNSPEC:
return size; // unknown protocol, keep local connection address
default:
return -__LINE__; // unsupport protocol
}
case PP2_CMD_LOCAL:
return size; // keep local connection address for LOCAL
default:
return -__LINE__; // unsupport command
}
return -__LINE__;
}

int IORoutine::MakeProxyHeader(string & header) {
switch (config_->ProxyProtocol()) {
case 1:
return MakeProxyHeaderV1(header);
case 2:
return MakeProxyHeaderV2(header);
}
return -__LINE__;
}

int IORoutine::MakeProxyHeaderV1(string & header) {
header = "PROXY ";
struct in_addr addr;
struct in6_addr addr6;
if (inet_pton(AF_INET, client_ip_.c_str(), &addr) == 1 &&
inet_pton(AF_INET, server_ip_.c_str(), &addr) == 1) {
header += "TCP4 ";
} else if (inet_pton(AF_INET6, client_ip_.c_str(), &addr6) == 1 &&
inet_pton(AF_INET6, server_ip_.c_str(), &addr6) == 1) {
header += "TCP6 ";
} else {
return -__LINE__;
}
header += client_ip_ + " " + server_ip_ + " " + UIntToStr(client_port_) + " " + UIntToStr(server_port_) + "\r\n";
return 0;
}

int IORoutine::MakeProxyHeaderV2(string & header) {
ProxyHdrV2_t hdr;
memcpy(hdr.sig, PP2_SIGNATURE, sizeof(hdr.sig));
hdr.ver_cmd = PP2_VERSION | PP2_CMD_PROXY;
if (inet_pton(AF_INET, client_ip_.c_str(), &hdr.addr.ip4.src_addr) == 1 &&
inet_pton(AF_INET, server_ip_.c_str(), &hdr.addr.ip4.dst_addr) == 1)
{
hdr.addr.ip4.src_port = htons(client_port_);
hdr.addr.ip4.dst_port = htons(server_port_);
hdr.fam = PP2_FAM_TCP4;
hdr.len = htons(sizeof(hdr.addr.ip4));
header = string((char *)&hdr, 16 + sizeof(hdr.addr.ip4));
return 0;
}
if (inet_pton(AF_INET6, client_ip_.c_str(), &hdr.addr.ip6.src_addr) == 1 &&
inet_pton(AF_INET6, server_ip_.c_str(), &hdr.addr.ip6.dst_addr) == 1)
{
hdr.addr.ip6.src_port = htons(client_port_);
hdr.addr.ip6.dst_port = htons(server_port_);
hdr.fam = PP2_FAM_TCP6;
hdr.len = htons(sizeof(hdr.addr.ip6));
header = string((char *)&hdr, 16 + sizeof(hdr.addr.ip6));
return 0;
}
return -__LINE__;
}

}
14 changes: 12 additions & 2 deletions phxsqlproxy/io_routine.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "co_routine.h"
#include "phxcoroutine.h"
#include "proxy_protocol.h"
#include <string>
#include <stack>

Expand Down Expand Up @@ -72,6 +73,15 @@ class IORoutine : public Coroutine {
void GetDBNameFromAuthBuf(const char * buf, int buf_size);

void GetDBNameFromReqBuf(const char * buf, int buf_size);

private:
int ProcessProxyHeader(int fd);
int ProcessProxyHeaderV1(char * hdr, int read_count);
int ProcessProxyHeaderV2(const ProxyHdrV2_t & hdr, int read_count);
int MakeProxyHeader(std::string & header);
int MakeProxyHeaderV1(std::string & header);
int MakeProxyHeaderV2(std::string & header);

protected:
uint64_t req_uniq_id_;

Expand All @@ -88,10 +98,10 @@ class IORoutine : public Coroutine {

int client_fd_;
int sqlsvr_fd_;
std::string listen_ip_;
int listen_port_;
std::string client_ip_;
int client_port_;
std::string server_ip_;
int server_port_;
uint64_t last_received_request_timestamp_;
uint64_t last_sent_request_timestamp_;
int last_read_fd_;
Expand Down
24 changes: 24 additions & 0 deletions phxsqlproxy/phxcoroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,29 @@ int Coroutine::RoutineWriteWithTimeout(int dest_fd, const char * buf, int write_
return written_once;
}

int Coroutine::RoutinePeekWithTimeout(int source_fd, char * buf, int buf_size, int timeout_ms) {
assert(IsNonBlock(source_fd));
struct pollfd pf[1];
int nfds = 0;
memset(pf, 0, sizeof(pf));
pf[0].fd = source_fd;
pf[0].events = (POLLIN | POLLERR | POLLHUP);
nfds++;

int return_fd_count = co_poll(co_get_epoll_ct(), pf, nfds, timeout_ms);
if (return_fd_count < 0) {
return return_fd_count;
}

if (pf[0].revents & POLLIN) {
return recv(source_fd, buf, buf_size, MSG_PEEK);
} else if (pf[0].revents & POLLHUP) {
return return_fd_count;
} else if (pf[0].revents & POLLERR) {
return return_fd_count;
}
return return_fd_count;
}

}

4 changes: 3 additions & 1 deletion phxsqlproxy/phxcoroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class Coroutine {

static int RoutineWriteWithTimeout(int dest_fd, const char * buf, int write_size, int timeout_ms);

static int RoutineReadWithTimeout(int source_fd, char * buf, int write_size, int timeout_ms);
static int RoutineReadWithTimeout(int source_fd, char * buf, int buf_size, int timeout_ms);

static int RoutinePeekWithTimeout(int source_fd, char * buf, int buf_size, int timeout_ms);

private:
stCoRoutine_t * routine_;
Expand Down
10 changes: 10 additions & 0 deletions phxsqlproxy/phxsqlproxyconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ void PHXSqlProxyConfig::ReadConfig() {
sleep_ = GetInteger("Server", "Sleep", 0);
connect_timeout_ms_ = GetInteger("Server", "ConnectTimeoutMs", 200);
write_timeout_ms_ = GetInteger("Server", "WriteTimeoutMs", 1000);
proxy_protocol_ = GetInteger("Server", "ProxyProtocol", 0);
proxy_protocol_timeout_ms_ = GetInteger("Server", "ProxyProtocolTimeoutMs", 1000);

/*
phxsql_config_ = PhxMySqlConfig :: GetDefault();
Expand Down Expand Up @@ -154,4 +156,12 @@ uint32_t PHXSqlProxyConfig::WriteTimeoutMs() {
return write_timeout_ms_;
}

int PHXSqlProxyConfig::ProxyProtocol() {
return proxy_protocol_;
}

uint32_t PHXSqlProxyConfig::ProxyProtocolTimeoutMs() {
return proxy_protocol_timeout_ms_;
}

}
Loading

0 comments on commit 024ce65

Please sign in to comment.