Skip to content

Commit

Permalink
Merge pull request grpc#13975 from danzh2010/adjustrcvbuf
Browse files Browse the repository at this point in the history
change udp_server receive/send buffer size and set SO_RXQ_OVFL
  • Loading branch information
yang-g authored Jan 12, 2018
2 parents 81c0e38 + a06720b commit a01e040
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
37 changes: 26 additions & 11 deletions src/core/lib/iomgr/udp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#define _GNU_SOURCE
#endif

#ifndef SO_RXQ_OVFL
#define SO_RXQ_OVFL 40
#endif

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET
Expand Down Expand Up @@ -280,11 +284,10 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,

/* Prepare a recently-created socket for listening. */
static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
const grpc_resolved_address* addr) {
const grpc_resolved_address* addr, int rcv_buf_size,
int snd_buf_size) {
grpc_resolved_address sockname_temp;
struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr;
/* Set send/receive socket buffers to 1 MB */
int buffer_size_bytes = 1024 * 1024;

if (fd < 0) {
goto error;
Expand Down Expand Up @@ -325,18 +328,25 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
goto error;
}

if (grpc_set_socket_sndbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) {
if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes",
buffer_size_bytes);
snd_buf_size);
goto error;
}

if (grpc_set_socket_rcvbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) {
if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes",
buffer_size_bytes);
rcv_buf_size);
goto error;
}

{
int get_overflow = 1;
if (0 != setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow,
sizeof(get_overflow))) {
gpr_log(GPR_INFO, "Failed to set socket overflow support");
}
}
return grpc_sockaddr_get_port(&sockname_temp);

error:
Expand Down Expand Up @@ -451,6 +461,7 @@ static void on_write(void* arg, grpc_error* error) {

static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
Expand All @@ -460,7 +471,8 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
char* addr_str;
char* name;

port = prepare_socket(s->socket_factory, fd, addr);
port =
prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size);
if (port >= 0) {
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
Expand Down Expand Up @@ -495,6 +507,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,

int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
Expand Down Expand Up @@ -545,8 +558,9 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb,
write_cb, orphan_cb);
allocated_port1 =
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb,
read_cb, write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
Expand All @@ -569,7 +583,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb);
add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb,
read_cb, write_cb, orphan_cb);

done:
gpr_free(allocated_addr);
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/iomgr/udp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
Expand Down
23 changes: 15 additions & 8 deletions test/core/iomgr/udp_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
static int g_number_of_starts = 0;

int rcv_buf_size = 1024;
int snd_buf_size = 1024;

static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; }

static bool on_read(grpc_fd* emfd) {
Expand Down Expand Up @@ -177,8 +180,9 @@ static void test_no_op_with_port(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
on_write, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));

grpc_udp_server_destroy(s, nullptr);

Expand Down Expand Up @@ -207,8 +211,9 @@ static void test_no_op_with_port_and_socket_factory(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
on_write, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));
GPR_ASSERT(socket_factory->number_of_socket_calls == 1);
GPR_ASSERT(socket_factory->number_of_bind_calls == 1);

Expand All @@ -233,8 +238,9 @@ static void test_no_op_with_port_and_start(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
on_write, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));

grpc_udp_server_start(s, nullptr, 0, nullptr);
GPR_ASSERT(g_number_of_starts == 1);
Expand Down Expand Up @@ -265,8 +271,9 @@ static void test_receive(int number_of_clients) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_storage);
addr->ss_family = AF_INET;
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read,
on_write, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size,
snd_buf_size, on_start, on_read, on_write,
on_fd_orphaned));

svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0);
Expand Down

0 comments on commit a01e040

Please sign in to comment.