Skip to content

Commit

Permalink
nghttpx: Receive ECN
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuhiro-t committed Nov 5, 2021
1 parent 94372fb commit 47c33b8
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 42 deletions.
5 changes: 3 additions & 2 deletions src/shrpx_client_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,12 @@ int ClientHandler::write_tls() {
#ifdef ENABLE_HTTP3
int ClientHandler::read_quic(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr, const uint8_t *data,
const Address &local_addr,
const ngtcp2_pkt_info &pi, const uint8_t *data,
size_t datalen) {
auto upstream = static_cast<Http3Upstream *>(upstream_.get());

return upstream->on_read(faddr, remote_addr, local_addr, data, datalen);
return upstream->on_read(faddr, remote_addr, local_addr, pi, data, datalen);
}

int ClientHandler::write_quic() { return upstream_->on_write(); }
Expand Down
3 changes: 2 additions & 1 deletion src/shrpx_client_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class ClientHandler {
#ifdef ENABLE_HTTP3
void setup_http3_upstream(std::unique_ptr<Http3Upstream> &&upstream);
int read_quic(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen);
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen);
int write_quic();
#endif // ENABLE_HTTP3

Expand Down
33 changes: 18 additions & 15 deletions src/shrpx_connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1017,12 +1017,10 @@ void ConnectionHandler::set_enable_acceptor_on_ocsp_completion(bool f) {
}

#ifdef ENABLE_HTTP3
int ConnectionHandler::forward_quic_packet(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr,
const uint8_t *cid_prefix,
const uint8_t *data,
size_t datalen) {
int ConnectionHandler::forward_quic_packet(
const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *cid_prefix, const uint8_t *data, size_t datalen) {
assert(!get_config()->single_thread);

for (auto &worker : workers_) {
Expand All @@ -1034,7 +1032,7 @@ int ConnectionHandler::forward_quic_packet(const UpstreamAddr *faddr,
WorkerEvent wev{};
wev.type = WorkerEventType::QUIC_PKT_FORWARD;
wev.quic_pkt = std::make_unique<QUICPacket>(faddr->index, remote_addr,
local_addr, data, datalen);
local_addr, pi, data, datalen);

worker->send(std::move(wev));

Expand Down Expand Up @@ -1111,10 +1109,11 @@ void ConnectionHandler::set_quic_lingering_worker_processes(

int ConnectionHandler::forward_quic_packet_to_lingering_worker_process(
QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen) {
const Address &local_addr, const ngtcp2_pkt_info &pi, const uint8_t *data,
size_t datalen) {
std::array<uint8_t, 512> header;

assert(header.size() >= 1 + 1 + 1 + sizeof(sockaddr_storage) * 2);
assert(header.size() >= 1 + 1 + 1 + 1 + sizeof(sockaddr_storage) * 2);
assert(remote_addr.len > 0);
assert(local_addr.len > 0);

Expand All @@ -1127,6 +1126,7 @@ int ConnectionHandler::forward_quic_packet_to_lingering_worker_process(
*p++ = static_cast<uint8_t>(local_addr.len - 1);
p = std::copy_n(reinterpret_cast<const uint8_t *>(&local_addr.su),
local_addr.len, p);
*p++ = pi.ecn;

iovec msg_iov[] = {
{
Expand Down Expand Up @@ -1185,14 +1185,14 @@ int ConnectionHandler::quic_ipc_read() {
return 0;
}

size_t len = 1 + 1 + 1;
size_t len = 1 + 1 + 1 + 1;

// Wire format:
// TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) REMOTE_ADDR(N)
// DGRAM_PAYLAOD(N)
// TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) LOCAL_ADDR(N)
// ECN(1) DGRAM_PAYLOAD(N)
//
// When encoding, REMOTE_ADDRLEN and LOCAL_ADDRLEN is decremented by
// 1.
// When encoding, REMOTE_ADDRLEN and LOCAL_ADDRLEN are decremented
// by 1.
if (static_cast<size_t>(nread) < len) {
return 0;
}
Expand Down Expand Up @@ -1249,6 +1249,8 @@ int ConnectionHandler::quic_ipc_read() {

p += local_addrlen;

pkt->pi.ecn = *p++;

auto datalen = nread - (p - buf.data());

pkt->data.assign(p, p + datalen);
Expand Down Expand Up @@ -1288,7 +1290,8 @@ int ConnectionHandler::quic_ipc_read() {

// Ignore return value
quic_conn_handler->handle_packet(faddr, pkt->remote_addr, pkt->local_addr,
pkt->data.data(), pkt->data.size());
pkt->pi, pkt->data.data(),
pkt->data.size());

return 0;
}
Expand Down
8 changes: 5 additions & 3 deletions src/shrpx_connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ class ConnectionHandler {
const std::vector<SSL_CTX *> &get_quic_indexed_ssl_ctx(size_t idx) const;

int forward_quic_packet(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *cid_prefix,
const uint8_t *data, size_t datalen);
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *cid_prefix, const uint8_t *data,
size_t datalen);

void set_quic_keying_materials(std::shared_ptr<QUICKeyingMaterials> qkms);
const std::shared_ptr<QUICKeyingMaterials> &get_quic_keying_materials() const;
Expand All @@ -218,7 +219,8 @@ class ConnectionHandler {

int forward_quic_packet_to_lingering_worker_process(
QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen);
const Address &local_addr, const ngtcp2_pkt_info &pi, const uint8_t *data,
size_t datalen);

void set_quic_ipc_fd(int fd);

Expand Down
5 changes: 2 additions & 3 deletions src/shrpx_http3_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1632,10 +1632,9 @@ void Http3Upstream::cancel_premature_downstream(

int Http3Upstream::on_read(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr, const uint8_t *data,
size_t datalen) {
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen) {
int rv;
ngtcp2_pkt_info pi{};

auto path = ngtcp2_path{
{
Expand Down
3 changes: 2 additions & 1 deletion src/shrpx_http3_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class Http3Upstream : public Upstream {
const ngtcp2_cid *odcid, const uint8_t *token, size_t tokenlen);

int on_read(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen);
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen);

int write_streams();

Expand Down
15 changes: 9 additions & 6 deletions src/shrpx_quic_connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ QUICConnectionHandler::~QUICConnectionHandler() {
int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr,
const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen) {
int rv;
uint32_t version;
Expand Down Expand Up @@ -98,7 +99,7 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
if (cwit != std::end(close_waits_)) {
auto cw = (*cwit).second;

cw->handle_packet(faddr, remote_addr, local_addr, data, datalen);
cw->handle_packet(faddr, remote_addr, local_addr, pi, data, datalen);

return 0;
}
Expand All @@ -115,7 +116,7 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
if (cwit != std::end(close_waits_)) {
auto cw = (*cwit).second;

cw->handle_packet(faddr, remote_addr, local_addr, data, datalen);
cw->handle_packet(faddr, remote_addr, local_addr, pi, data, datalen);

return 0;
}
Expand Down Expand Up @@ -147,7 +148,7 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
decrypted_dcid.data(), decrypted_dcid.size());
if (quic_lwp) {
if (conn_handler->forward_quic_packet_to_lingering_worker_process(
quic_lwp, remote_addr, local_addr, data, datalen) == 0) {
quic_lwp, remote_addr, local_addr, pi, data, datalen) == 0) {
return 0;
}

Expand Down Expand Up @@ -310,7 +311,7 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
std::begin(decrypted_dcid) + SHRPX_QUIC_CID_PREFIXLEN,
worker_->get_cid_prefix())) {
if (conn_handler->forward_quic_packet(faddr, remote_addr, local_addr,
decrypted_dcid.data(), data,
pi, decrypted_dcid.data(), data,
datalen) == 0) {
return 0;
}
Expand All @@ -333,7 +334,8 @@ int QUICConnectionHandler::handle_packet(const UpstreamAddr *faddr,
handler = (*it).second;
}

if (handler->read_quic(faddr, remote_addr, local_addr, data, datalen) != 0) {
if (handler->read_quic(faddr, remote_addr, local_addr, pi, data, datalen) !=
0) {
delete handler;
return 0;
}
Expand Down Expand Up @@ -708,7 +710,8 @@ CloseWait::~CloseWait() {

int CloseWait::handle_packet(const UpstreamAddr *faddr,
const Address &remote_addr,
const Address &local_addr, const uint8_t *data,
const Address &local_addr,
const ngtcp2_pkt_info &pi, const uint8_t *data,
size_t datalen) {
if (pkt.empty()) {
return 0;
Expand Down
8 changes: 4 additions & 4 deletions src/shrpx_quic_connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ struct CloseWait {
~CloseWait();

int handle_packet(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *data,
size_t datalen);
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen);

Worker *worker;
// Source Connection IDs of the connection.
Expand All @@ -82,8 +82,8 @@ class QUICConnectionHandler {
QUICConnectionHandler(Worker *worker);
~QUICConnectionHandler();
int handle_packet(const UpstreamAddr *faddr, const Address &remote_addr,
const Address &local_addr, const uint8_t *data,
size_t datalen);
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen);
// Send Retry packet. |ini_dcid| is the destination Connection ID
// which appeared in Client Initial packet and its length is
// |dcidlen|. |ini_scid| is the source Connection ID which appeared
Expand Down
12 changes: 9 additions & 3 deletions src/shrpx_quic_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ void QUICListener::on_read() {
msg.msg_iov = &msg_iov;
msg.msg_iovlen = 1;

uint8_t msg_ctrl[CMSG_SPACE(sizeof(in6_pktinfo))];
uint8_t
msg_ctrl[CMSG_SPACE(sizeof(uint8_t)) + CMSG_SPACE(sizeof(in6_pktinfo))];
msg.msg_control = msg_ctrl;

auto quic_conn_handler = worker_->get_quic_connection_handler();
Expand All @@ -83,11 +84,16 @@ void QUICListener::on_read() {

util::set_port(local_addr, faddr_->port);

ngtcp2_pkt_info pi{
.ecn = util::msghdr_get_ecn(&msg, su.storage.ss_family),
};

if (LOG_ENABLED(INFO)) {
LOG(INFO) << "QUIC received packet: local="
<< util::to_numeric_addr(&local_addr)
<< " remote=" << util::to_numeric_addr(&su.sa, msg.msg_namelen)
<< " " << nread << " bytes";
<< " ecn=" << log::hex << pi.ecn << log::dec << " " << nread
<< " bytes";
}

if (nread == 0) {
Expand All @@ -98,7 +104,7 @@ void QUICListener::on_read() {
remote_addr.su = su;
remote_addr.len = msg.msg_namelen;

quic_conn_handler->handle_packet(faddr_, remote_addr, local_addr,
quic_conn_handler->handle_packet(faddr_, remote_addr, local_addr, pi,
buf.data(), nread);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/shrpx_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ void Worker::process_events() {

quic_conn_handler_.handle_packet(
faddr, wev.quic_pkt->remote_addr, wev.quic_pkt->local_addr,
wev.quic_pkt->data.data(), wev.quic_pkt->data.size());
wev.quic_pkt->pi, wev.quic_pkt->data.data(), wev.quic_pkt->data.size());

break;
}
Expand Down Expand Up @@ -844,7 +844,7 @@ int Worker::create_quic_server_socket(UpstreamAddr &faddr) {
}
}

// TODO Enable ECN
util::fd_set_recv_ecn(fd, faddr.family);

if (bind(fd, rp->ai_addr, rp->ai_addrlen) == -1) {
auto error = errno;
Expand Down
7 changes: 5 additions & 2 deletions src/shrpx_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,18 @@ struct WorkerStat {
#ifdef ENABLE_HTTP3
struct QUICPacket {
QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
const Address &local_addr, const uint8_t *data, size_t datalen)
const Address &local_addr, const ngtcp2_pkt_info &pi,
const uint8_t *data, size_t datalen)
: upstream_addr_index{upstream_addr_index},
remote_addr{remote_addr},
local_addr{local_addr},
pi{pi},
data{data, data + datalen} {}
QUICPacket() {}
QUICPacket() : upstream_addr_index{}, remote_addr{}, local_addr{}, pi{} {}
size_t upstream_addr_index;
Address remote_addr;
Address local_addr;
ngtcp2_pkt_info pi;
std::vector<uint8_t> data;
};
#endif // ENABLE_HTTP3
Expand Down
48 changes: 48 additions & 0 deletions src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,54 @@ int msghdr_get_local_addr(Address &dest, msghdr *msg, int family) {

return -1;
}

unsigned int msghdr_get_ecn(msghdr *msg, int family) {
switch (family) {
case AF_INET:
for (auto cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_TOS &&
cmsg->cmsg_len) {
return *reinterpret_cast<uint8_t *>(CMSG_DATA(cmsg));
}
}

return 0;
case AF_INET6:
for (auto cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_TCLASS &&
cmsg->cmsg_len) {
return *reinterpret_cast<uint8_t *>(CMSG_DATA(cmsg));
}
}

return 0;
}

return 0;
}

int fd_set_recv_ecn(int fd, int family) {
unsigned int tos = 1;

switch (family) {
case AF_INET:
if (setsockopt(fd, IPPROTO_IP, IP_RECVTOS, &tos,
static_cast<socklen_t>(sizeof(tos))) == -1) {
return -1;
}

return 0;
case AF_INET6:
if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVTCLASS, &tos,
static_cast<socklen_t>(sizeof(tos))) == -1) {
return -1;
}

return 0;
}

return -1;
}
#endif // ENABLE_HTTP3

} // namespace util
Expand Down
4 changes: 4 additions & 0 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,10 @@ int daemonize(int nochdir, int noclose);

#ifdef ENABLE_HTTP3
int msghdr_get_local_addr(Address &dest, msghdr *msg, int family);

unsigned int msghdr_get_ecn(msghdr *msg, int family);

int fd_set_recv_ecn(int fd, int family);
#endif // ENABLE_HTTP3

} // namespace util
Expand Down

0 comments on commit 47c33b8

Please sign in to comment.