Skip to content

Commit

Permalink
[C] Add recvfrom, recvmsg, recvmmsg, and sendmmsg to ping pong raw sa…
Browse files Browse the repository at this point in the history
…mple.
  • Loading branch information
mikeb01 committed Jan 25, 2022
1 parent e4661b9 commit 70f8969
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ add_definitions(-DAERON_VERSION_PATCH=${aeron_VERSION_PATCH})

# all UNIX-based platform compiler flags
if (UNIX)
add_compile_options(-Wall -Wpedantic -Wextra -Wno-unused-parameter)
add_compile_options(-Wall -Wpedantic -Wextra -Wno-unused-parameter -g)

if (CMAKE_COMPILER_IS_GNUCXX AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER "11.0")
add_compile_options(-Wno-error=maybe-uninitialized)
Expand Down
216 changes: 193 additions & 23 deletions aeron-samples/src/main/c/raw/ping_pong_raw.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
#define AERON_RAW_DEFAULT_PING_PORT (13334)
#define AERON_RAW_DEFAULT_PONG_HOST "127.0.0.1"
#define AERON_RAW_DEFAULT_PONG_PORT (13335)
#define AERON_RAW_DEFAULT_TRANSPORT_TYPE (1)
#define AERON_RAW_DEFAULT_TRANSMIT_TYPE (1)
#define AERON_RAW_DEFAULT_RECEIVE_TYPE (1)

const char usage_str[] =
"[-h][-v][-h host][-p port][-H host][-P port][-m messages][-w messages]\n"
" -? help\n"
" -s run in echo server mode\n"
" -t transport type (1=connect/sendto, 2=sendto, 3=sendmsg)\n"
" -t transmit type (1=connect/sendto, 2=sendto, 3=sendmsg, 4=sendmmsg)\n"
" -r receive type (1=recvfrom, 2=recvmsg, 3=recvmmsg)\n"
" -h host ping host (default 127.0.0.1)\n"
" -p port ping port (default 13334)\n"
" -H host pong host (default 127.0.0.1)\n"
Expand All @@ -69,6 +71,13 @@ typedef int (*aeron_ping_pong_raw_send)(
void *buffer,
size_t buffer_len);

typedef int (*aeron_ping_pong_raw_recv)(
int recv_fd,
struct sockaddr *addr,
socklen_t addr_len,
void *buffer,
size_t buffer_len);

int aeron_ping_pong_raw_sendto_connected(
int send_fd,
const struct sockaddr *addr,
Expand Down Expand Up @@ -137,6 +146,126 @@ int aeron_ping_pong_raw_sendmsg(
return sendmsg_result;
}

int aeron_ping_pong_raw_sendmmsg(
int send_fd,
const struct sockaddr *addr,
socklen_t addr_len,
void *buffer,
size_t buffer_len)
{
struct mmsghdr send_msghdr = { 0 };
struct iovec send_iov = { 0 };
send_msghdr.msg_hdr.msg_iov = &send_iov;
send_iov.iov_base = buffer;
send_iov.iov_len = buffer_len;
send_msghdr.msg_hdr.msg_control = NULL;
send_msghdr.msg_hdr.msg_controllen = 0;
send_msghdr.msg_hdr.msg_name = (void *)addr;
send_msghdr.msg_hdr.msg_namelen = addr_len;
send_msghdr.msg_hdr.msg_iovlen = 1;

int sendmsg_result = sendmmsg(send_fd, &send_msghdr, 1, 0);
if (sendmsg_result < 0)
{
fprintf(stderr, "failed sendmsg(send_fd, &send_msghdr, 0), %s\n", strerror(errno));
return -1;
}

return send_msghdr.msg_len;
}

int aeron_ping_pong_raw_recvmsg(
int recv_fd,
struct sockaddr *addr,
socklen_t addr_len,
void *buffer,
size_t buffer_len)
{
struct msghdr msghdr = { 0 };
struct iovec iov = { 0 };
iov.iov_base = buffer;
iov.iov_len = buffer_len;
msghdr.msg_iov = &iov;
msghdr.msg_iovlen = 1;
msghdr.msg_name = addr;
msghdr.msg_namelen = addr_len;

ssize_t recvmsg_result = recvmsg(recv_fd, &msghdr, 0);
if (recvmsg_result < 0)
{
if (errno == EAGAIN)
{
return 0;
}
else
{
fprintf(stderr, "failed recvmsg(recv_fd, &msghdr, 0), %s\n", strerror(errno));
return -1;
}
}

return recvmsg_result;
}

int aeron_ping_pong_raw_recvfrom(
int recv_fd,
struct sockaddr *addr,
socklen_t addr_len,
void *buffer,
size_t buffer_len)
{
ssize_t recvmsg_result = recvfrom(recv_fd, buffer, buffer_len, 0, addr, &addr_len);
if (recvmsg_result < 0)
{
if (errno == EAGAIN)
{
return 0;
}
else
{
fprintf(stderr, "failed recvmsg(recv_fd, &msghdr, 0), %s\n", strerror(errno));
return -1;
}
}

return recvmsg_result;
}

int aeron_ping_pong_raw_recvmmsg(
int recv_fd,
struct sockaddr *addr,
socklen_t addr_len,
void *buffer,
size_t buffer_len)
{
struct mmsghdr mmsghdr = { 0 };
struct iovec iov = { 0 };
iov.iov_base = buffer;
iov.iov_len = buffer_len;
mmsghdr.msg_hdr.msg_iov = &iov;
mmsghdr.msg_hdr.msg_iovlen = 1;
mmsghdr.msg_hdr.msg_name = addr;
mmsghdr.msg_hdr.msg_namelen = addr_len;
mmsghdr.msg_len = 0;
struct timespec tv = { .tv_nsec = 0, .tv_sec = 0 };

ssize_t recvmsg_result = recvmmsg(recv_fd, &mmsghdr, 1, 0, &tv);
if (recvmsg_result < 0)
{
if (errno == EAGAIN)
{
return 0;
}
else
{
fprintf(stderr, "failed recvmsg(recv_fd, &msghdr, 0), %s\n", strerror(errno));
return -1;
}
}

return mmsghdr.msg_len;
}

int aeron_ping_pong_raw_socket_connect(
int send_fd,
const struct sockaddr *addr,
Expand Down Expand Up @@ -168,9 +297,11 @@ struct aeron_ping_pong_config_stct
long warmup_messages;
bool show_help;
bool is_server;
int transport_type;
int transmit_type;
int receive_type;
aeron_ping_pong_raw_connect connect_func;
aeron_ping_pong_raw_send send_func;
aeron_ping_pong_raw_recv recv_func;
};
typedef struct aeron_ping_pong_config_stct aeron_ping_pong_config_t;

Expand All @@ -186,9 +317,10 @@ int aeron_ping_pong_parse_config(int argc, char **argv, aeron_ping_pong_config_t
pong_addr->sin_family = AF_INET;
inet_pton(AF_INET, AERON_RAW_DEFAULT_PONG_HOST, &pong_addr->sin_addr);
pong_addr->sin_port = htons(AERON_RAW_DEFAULT_PONG_PORT);
config->transport_type = AERON_RAW_DEFAULT_TRANSPORT_TYPE;
config->transmit_type = AERON_RAW_DEFAULT_TRANSMIT_TYPE;
config->receive_type = AERON_RAW_DEFAULT_RECEIVE_TYPE;

while ((opt = getopt(argc, argv, "?h:p:H:P:m:w:t:s")) != -1)
while ((opt = getopt(argc, argv, "?h:p:H:P:m:w:t:r:s")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -222,7 +354,13 @@ int aeron_ping_pong_parse_config(int argc, char **argv, aeron_ping_pong_config_t

case 't':
{
config->transport_type = atoi(optarg);
config->transmit_type = atoi(optarg);
break;
}

case 'r':
{
config->receive_type = atoi(optarg);
break;
}

Expand All @@ -232,28 +370,57 @@ int aeron_ping_pong_parse_config(int argc, char **argv, aeron_ping_pong_config_t
}
}

switch (config->transport_type)
switch (config->transmit_type)
{
case 1:
config->connect_func = aeron_ping_pong_raw_socket_connect;
config->send_func = aeron_ping_pong_raw_sendto_connected;
printf("Transport: connect/sendto\n");
printf("Transmit: connect/sendto\n");
break;

case 2:
config->connect_func = aeron_ping_pong_raw_socket_null_connect;
config->send_func = aeron_ping_pong_raw_sendto_unconnected;
printf("Transport: sendto\n");
printf("Transmit: sendto\n");
break;

case 3:
config->connect_func = aeron_ping_pong_raw_socket_null_connect;
config->send_func = aeron_ping_pong_raw_sendmsg;
printf("Transport: sendmsg\n");
printf("Transmit: sendmsg\n");
break;

case 4:
config->connect_func = aeron_ping_pong_raw_socket_null_connect;
config->send_func = aeron_ping_pong_raw_sendmmsg;
printf("Transmit: sendmmsg\n");
break;


default:
fprintf(stderr, "Invalid transmit type: %d\n", config->transmit_type);
return -1;
}

switch (config->receive_type)
{
case 1:
config->recv_func = aeron_ping_pong_raw_recvmsg;
printf("Receive: recvmsg\n");
break;

case 2:
config->recv_func = aeron_ping_pong_raw_recvfrom;
printf("Receive: recvfrom\n");
break;

case 3:
config->recv_func = aeron_ping_pong_raw_recvmmsg;
printf("Receive: recvmmsg\n");
break;

default:
fprintf(stderr, "Invalid transport type: %d\n", config->transport_type);
fprintf(stderr, "Invalid receive type: %d\n", config->receive_type);
return -1;
}

Expand Down Expand Up @@ -286,13 +453,14 @@ int recv_then_send(aeron_ping_pong_config_t *config, int send_fd, int recv_fd)
iov.iov_len = sizeof(buf);
msghdr.msg_iov = &iov;
msghdr.msg_iovlen = 1;
struct sockaddr_in addr = { 0 };
socklen_t addr_len = sizeof(addr);

while (true)
{
ssize_t recvmsg_result = recvmsg(recv_fd, &msghdr, 0);
if (recvmsg_result < 0 && errno != EAGAIN)
ssize_t recvmsg_result = config->recv_func(recv_fd, (struct sockaddr *)&addr, addr_len, buf, sizeof(buf));
if (recvmsg_result < 0)
{
fprintf(stderr, "failed recvmsg(recv_fd, &msghdr, 0), %s\n", strerror(errno));
return -1;
}
else if (0 < recvmsg_result)
Expand All @@ -314,14 +482,10 @@ int send_then_recv(aeron_ping_pong_config_t *config, int send_fd, int recv_fd, l
{
int8_t send_buf[32];
int8_t buf[64 * 1024];
struct msghdr msghdr = { 0 };
struct iovec iov = { 0 };
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
msghdr.msg_iov = &iov;
msghdr.msg_iovlen = 1;
struct timespec send_ts;
struct timespec recv_ts;
struct sockaddr_in addr = { 0 };
socklen_t addr_len = sizeof(addr);

for (int i = 0; i < messages; i++)
{
Expand All @@ -338,10 +502,9 @@ int send_then_recv(aeron_ping_pong_config_t *config, int send_fd, int recv_fd, l

do
{
ssize_t recvmsg_result = recvmsg(recv_fd, &msghdr, 0);
if (recvmsg_result < 0 && EAGAIN != errno)
int recvmsg_result = config->recv_func(recv_fd, (struct sockaddr*)&addr, addr_len, buf, sizeof(buf));
if (recvmsg_result < 0)
{
fprintf(stderr, "failed (send) recvmsg(recv_fd, &msghdr, 0), %s\n", strerror(errno));
return -1;
}
else if (recvmsg_result > 0)
Expand All @@ -364,6 +527,13 @@ int main(int argc, char **argv)
aeron_ping_pong_config_t config;
memset(&config, 0, sizeof(config));
aeron_ping_pong_parse_config(argc, argv, &config);

if (config.show_help)
{
printf("%s\n", usage_str);
return 1;
}

struct sockaddr_in *send_addr = config.is_server ? (struct sockaddr_in *)&config.pong_host : (struct sockaddr_in *)&config.ping_host;
struct sockaddr_in *recv_addr = config.is_server ? (struct sockaddr_in *)&config.ping_host : (struct sockaddr_in *)&config.pong_host;
struct hdr_histogram *histogram;
Expand Down

0 comments on commit 70f8969

Please sign in to comment.