Skip to content

Commit

Permalink
[C] Deal with message loss on the recv_func callback used by the unde…
Browse files Browse the repository at this point in the history
…rlying transport and update test. Implement partial configuration for the transport binding to support configuration via environment/command line.
  • Loading branch information
mikeb01 committed Dec 3, 2019
1 parent bc70428 commit 189fec7
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

#include <errno.h>
#include <string.h>
#include "aeron_alloc.h"
#include "util/aeron_error.h"
#include "util/aeron_dlopen.h"

#include "protocol/aeron_udp_protocol.h"
#include "aeron_udp_channel_transport_bindings.h"
#include "aeron_udp_channel_transport.h"
#include "aeron_udp_channel_transport_loss.h"
#include "aeron_udp_transport_poller.h"

aeron_udp_channel_transport_bindings_t aeron_udp_channel_transport_bindings_default =
Expand All @@ -44,6 +47,22 @@ aeron_udp_channel_transport_bindings_t aeron_udp_channel_transport_bindings_defa
aeron_udp_transport_poller_poll
};

aeron_udp_channel_transport_bindings_t aeron_udp_channel_transport_bindings_loss =
{
aeron_udp_channel_transport_init,
aeron_udp_channel_transport_close,
aeron_udp_channel_transport_loss_recvmmsg,
aeron_udp_channel_transport_loss_sendmmsg,
aeron_udp_channel_transport_loss_sendmsg,
aeron_udp_channel_transport_get_so_rcvbuf,
aeron_udp_channel_transport_bind_addr_and_port,
aeron_udp_transport_poller_init,
aeron_udp_transport_poller_close,
aeron_udp_transport_poller_add,
aeron_udp_transport_poller_remove,
aeron_udp_transport_poller_poll
};

aeron_udp_channel_transport_bindings_t *aeron_udp_channel_transport_bindings_load(const char *bindings_name)
{
aeron_udp_channel_transport_bindings_t *bindings = NULL;
Expand All @@ -58,6 +77,22 @@ aeron_udp_channel_transport_bindings_t *aeron_udp_channel_transport_bindings_loa
{
return aeron_udp_channel_transport_bindings_load("aeron_udp_channel_transport_bindings_default");
}
else if (strncmp(bindings_name, "loss", sizeof("loss")) == 0)
{
bindings = aeron_udp_channel_transport_bindings_load("aeron_udp_channel_transport_bindings_loss");
const aeron_udp_channel_transport_bindings_t *delegate_bindings =
aeron_udp_channel_transport_bindings_load("default");
aeron_udp_channel_transport_loss_params_t* loss_params;
aeron_alloc((void **)&loss_params, sizeof(aeron_udp_channel_transport_loss_params_t));

// TODO: Default until configuration is implemented.
loss_params->rate = 0.2;
loss_params->seed = 123123;
loss_params->recv_msg_type_mask = 1U << (unsigned int)AERON_HDR_TYPE_DATA;
loss_params->send_msg_type_mask = 0;

aeron_udp_channel_transport_loss_init(delegate_bindings, loss_params);
}
else
{
if ((bindings = (aeron_udp_channel_transport_bindings_t *)aeron_dlsym(RTLD_DEFAULT, bindings_name)) == NULL)
Expand Down
95 changes: 61 additions & 34 deletions aeron-driver/src/main/c/media/aeron_udp_channel_transport_loss.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

#include <stdlib.h>
#include <string.h>
#include <protocol/aeron_udp_protocol.h>
#include "aeron_windows.h"
#include <stdio.h>

#include "concurrent/aeron_atomic.h"
#include "protocol/aeron_udp_protocol.h"
#include "aeron_windows.h"
#include "aeron_udp_channel_transport_loss.h"

#if !defined(HAVE_STRUCT_MMSGHDR)
Expand All @@ -34,25 +36,22 @@ struct mmsghdr
};
#endif

static aeron_udp_channel_transport_bindings_t* delegate = NULL;
static aeron_udp_channel_transport_loss_params_t *params = NULL;
static const aeron_udp_channel_transport_bindings_t* delegate = NULL;
static const aeron_udp_channel_transport_loss_params_t *params = NULL;
static unsigned short data_loss_xsubi[3];

static bool aeron_udp_channel_transport_loss_should_drop_frame(
const struct mmsghdr *mmsg,
const double rate,
const unsigned int msg_type_mask)
typedef struct aeron_udp_channel_transport_loss_clientd_stct
{
const aeron_frame_header_t *frame_header = (aeron_frame_header_t *)(mmsg->msg_hdr.msg_iov[0].iov_base);
const unsigned int msg_type_bit = 1U << (unsigned int)frame_header->type;
const bool msg_type_matches_mask = (msg_type_bit & msg_type_mask) != 0;

return 0.0 < rate && msg_type_matches_mask && (aeron_erand48(data_loss_xsubi) <= rate);
void *original_clientd;
aeron_udp_transport_recv_func_t original_recv_func;
int64_t bytes_dropped;
int messages_dropped;
}
aeron_udp_channel_transport_loss_clientd_t;

int aeron_udp_channel_transport_loss_init(
aeron_udp_channel_transport_bindings_t *delegate_bindings,
aeron_udp_channel_transport_loss_params_t *loss_params)
const aeron_udp_channel_transport_bindings_t *delegate_bindings,
const aeron_udp_channel_transport_loss_params_t *loss_params)
{
delegate = delegate_bindings;
params = loss_params;
Expand All @@ -64,6 +63,38 @@ int aeron_udp_channel_transport_loss_init(
return 0;
}

static bool aeron_udp_channel_transport_loss_should_drop_frame(
const uint8_t *buffer,
const double rate,
const unsigned int msg_type_mask)
{
const aeron_frame_header_t *frame_header = (aeron_frame_header_t *)buffer;
const unsigned int msg_type_bit = 1U << (unsigned int)frame_header->type;
const bool msg_type_matches_mask = (msg_type_bit & msg_type_mask) != 0;

return 0.0 < rate && msg_type_matches_mask && (aeron_erand48(data_loss_xsubi) <= rate);
}

static void aeron_udp_channel_transport_loss_recv_callback(
void *clientd,
void *transport_clientd,
uint8_t *buffer,
size_t length,
struct sockaddr_storage *addr)
{
aeron_udp_channel_transport_loss_clientd_t* loss_clientd = clientd;

if (aeron_udp_channel_transport_loss_should_drop_frame(buffer, params->rate, params->recv_msg_type_mask))
{
loss_clientd->bytes_dropped += length;
loss_clientd->messages_dropped++;
}
else
{
loss_clientd->original_recv_func(loss_clientd->original_clientd, transport_clientd, buffer, length, addr);
}
}

int aeron_udp_channel_transport_loss_recvmmsg(
aeron_udp_channel_transport_t *transport,
struct mmsghdr *msgvec,
Expand All @@ -72,28 +103,24 @@ int aeron_udp_channel_transport_loss_recvmmsg(
aeron_udp_transport_recv_func_t recv_func,
void *clientd)
{
struct mmsghdr temp;

const int messages_received = delegate->recvmmsg_func(transport, msgvec, vlen, bytes_rcved, recv_func, clientd);
int messages_after_loss = messages_received;

for (int i = messages_received; --i > -1;)
aeron_udp_channel_transport_loss_clientd_t loss_clientd;
loss_clientd.original_recv_func = recv_func;
loss_clientd.original_clientd = clientd;
loss_clientd.bytes_dropped = 0;
loss_clientd.messages_dropped = 0;

// TODO: Do we actually need to change the contents of the msgvec??
// At the moment the aeron_driver_receiver doesn't use the msgvec.
// All of the data is pushed back through the recv_func.
const int messages_received = delegate->recvmmsg_func(
transport, msgvec, vlen, bytes_rcved, aeron_udp_channel_transport_loss_recv_callback, &loss_clientd);

if (NULL != bytes_rcved)
{
if (aeron_udp_channel_transport_loss_should_drop_frame(&msgvec[i], params->rate, params->recv_msg_type_mask))
{
if (i != (messages_after_loss - 1))
{
const size_t to_copy = (size_t)(messages_received - (i + 1));
memcpy(&temp, &msgvec[i], sizeof(temp));
memcpy(&msgvec[i], &msgvec[i + 1], sizeof(struct mmsghdr) * to_copy);
memcpy(&msgvec[messages_received - 1], &temp, sizeof(temp));
}

messages_after_loss--;
}
*bytes_rcved -= loss_clientd.bytes_dropped;
}

return messages_after_loss;
return messages_received - loss_clientd.messages_dropped;
}

int aeron_udp_channel_transport_loss_sendmmsg(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ aeron_udp_channel_transport_loss_params_t;


int aeron_udp_channel_transport_loss_init(
aeron_udp_channel_transport_bindings_t *delegate_bindings,
aeron_udp_channel_transport_loss_params_t *params);
const aeron_udp_channel_transport_bindings_t *delegate_bindings,
const aeron_udp_channel_transport_loss_params_t *params);

int aeron_udp_channel_transport_loss_recvmmsg(
aeron_udp_channel_transport_t *transport,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <gtest/gtest.h>
#include <media/aeron_udp_channel_transport.h>

extern "C"
{
Expand All @@ -30,6 +31,15 @@ class UdpChannelTransportLossTest : public testing::Test
}
};

void test_recv_callback(
void *clientd,
void *transport_clientd,
uint8_t *buffer,
size_t length,
struct sockaddr_storage *addr)
{
}

static int delegate_return_packets_recvmmsg(
aeron_udp_channel_transport_t *transport,
struct mmsghdr *msgvec,
Expand All @@ -38,13 +48,20 @@ static int delegate_return_packets_recvmmsg(
aeron_udp_transport_recv_func_t recv_func,
void *clientd)
{
const int16_t *msg_type = static_cast<int16_t *>(clientd);
const int16_t *msg_type = static_cast<int16_t *>(transport->dispatch_clientd);
for (size_t i = 0; i < vlen; i++)
{
iovec *iovec = msgvec[i].msg_hdr.msg_iov;
aeron_frame_header_t *frame_header = static_cast<aeron_frame_header_t *>(iovec[0].iov_base);
frame_header->type = *msg_type;
msgvec[i].msg_len = static_cast<unsigned int>(iovec[0].iov_len);

recv_func(clientd, NULL, static_cast<uint8_t *>(iovec[0].iov_base), msgvec[i].msg_len, NULL);

if (NULL != bytes_rcved)
{
*bytes_rcved += msgvec[i].msg_len;
}
}

return static_cast<int>(vlen);
Expand All @@ -53,8 +70,11 @@ static int delegate_return_packets_recvmmsg(
TEST_F(UdpChannelTransportLossTest, shouldDiscardAllPacketsWithRateOfOne)
{
uint16_t msg_type = AERON_HDR_TYPE_DATA;
aeron_udp_channel_transport_t transport;
aeron_udp_channel_transport_bindings_t bindings;
aeron_udp_channel_transport_loss_params_t params;
transport.dispatch_clientd = reinterpret_cast<void *>(&msg_type);

struct mmsghdr msgvec[2];
const size_t vlen = 2;
uint8_t data_0[1024];
Expand All @@ -79,7 +99,7 @@ TEST_F(UdpChannelTransportLossTest, shouldDiscardAllPacketsWithRateOfOne)
aeron_udp_channel_transport_loss_init(&bindings, &params);

int messages_received = aeron_udp_channel_transport_loss_recvmmsg(
NULL, msgvec, vlen, NULL, NULL, reinterpret_cast<void *>(&msg_type));
&transport, msgvec, vlen, NULL, test_recv_callback, NULL);

EXPECT_EQ(messages_received, 0);
}
Expand All @@ -90,6 +110,9 @@ TEST_F(UdpChannelTransportLossTest, shouldNotDiscardAllPacketsWithRateOfOneWithD
uint16_t data_msg_type = AERON_HDR_TYPE_SETUP;
aeron_udp_channel_transport_bindings_t bindings;
aeron_udp_channel_transport_loss_params_t params;
aeron_udp_channel_transport_t transport;
transport.dispatch_clientd = reinterpret_cast<void *>(&data_msg_type);

struct mmsghdr msgvec[2];
const size_t vlen = 2;
uint8_t data_0[1024];
Expand All @@ -114,7 +137,7 @@ TEST_F(UdpChannelTransportLossTest, shouldNotDiscardAllPacketsWithRateOfOneWithD
aeron_udp_channel_transport_loss_init(&bindings, &params);

int messages_received = aeron_udp_channel_transport_loss_recvmmsg(
NULL, msgvec, vlen, NULL, NULL, reinterpret_cast<void *>(&data_msg_type));
&transport, msgvec, vlen, NULL, test_recv_callback, NULL);

EXPECT_EQ(messages_received, 2);
}
Expand All @@ -124,6 +147,9 @@ TEST_F(UdpChannelTransportLossTest, shouldNotDiscardAllPacketsWithRateOfZero)
uint16_t loss_msg_type = AERON_HDR_TYPE_DATA;
aeron_udp_channel_transport_bindings_t bindings;
aeron_udp_channel_transport_loss_params_t params;
aeron_udp_channel_transport_t transport;
transport.dispatch_clientd = reinterpret_cast<void *>(&loss_msg_type);

struct mmsghdr msgvec[2];
const size_t vlen = 2;
uint8_t data_0[1024];
Expand All @@ -148,7 +174,7 @@ TEST_F(UdpChannelTransportLossTest, shouldNotDiscardAllPacketsWithRateOfZero)
aeron_udp_channel_transport_loss_init(&bindings, &params);

int messages_received = aeron_udp_channel_transport_loss_recvmmsg(
NULL, msgvec, vlen, NULL, NULL, reinterpret_cast<void *>(&loss_msg_type));
&transport, msgvec, vlen, NULL, test_recv_callback, NULL);

EXPECT_EQ(messages_received, 2);
}
Expand All @@ -158,6 +184,9 @@ TEST_F(UdpChannelTransportLossTest, shouldDiscardRoughlyHalfTheMessages)
uint16_t msg_type = AERON_HDR_TYPE_DATA;
aeron_udp_channel_transport_bindings_t bindings;
aeron_udp_channel_transport_loss_params_t params;
aeron_udp_channel_transport_t transport;
transport.dispatch_clientd = reinterpret_cast<void *>(&msg_type);
int64_t bytes_received = 0;

const size_t vlen = 10;
struct mmsghdr msgvec[vlen];
Expand All @@ -182,9 +211,11 @@ TEST_F(UdpChannelTransportLossTest, shouldDiscardRoughlyHalfTheMessages)
aeron_udp_channel_transport_loss_init(&bindings, &params);

int messages_received = aeron_udp_channel_transport_loss_recvmmsg(
NULL, msgvec, vlen, NULL, NULL, reinterpret_cast<void *>(&msg_type));
&transport, msgvec, vlen, &bytes_received, test_recv_callback, NULL);

EXPECT_NE(messages_received, 10);
EXPECT_NE(messages_received, 0);
EXPECT_LT(messages_received, static_cast<int>(vlen));
EXPECT_GT(messages_received, 0);
EXPECT_LT(bytes_received, static_cast<int64_t>(vlen * bytes_received));
EXPECT_GT(bytes_received, 0);
EXPECT_EQ(messages_received, 6);
}

0 comments on commit 189fec7

Please sign in to comment.