Skip to content

Commit

Permalink
Merge pull request aeron-io#778 from mikeb01/master
Browse files Browse the repository at this point in the history
[C] remove old loss code from aeron_driver_agent, replaced with loss interceptor
  • Loading branch information
mjpt777 authored Dec 10, 2019
2 parents 75324ed + 8718568 commit 9f2952b
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 56 deletions.
55 changes: 2 additions & 53 deletions aeron-driver/src/main/c/agent/aeron_driver_agent.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ static AERON_INIT_ONCE agent_is_initialized = AERON_INIT_ONCE_VALUE;
static aeron_mpsc_rb_t logging_mpsc_rb;
static uint8_t *rb_buffer = NULL;
static uint64_t mask = 0;
static double receive_data_loss_rate = 0.0;
static unsigned short receive_data_loss_xsubi[3];
static aeron_thread_t log_reader_thread;

int64_t aeron_agent_epoch_clock()
Expand Down Expand Up @@ -84,18 +82,6 @@ void aeron_agent_format_date(char *str, size_t count, int64_t timestamp)
snprintf(str, count, "%s%s%s", time_buffer, msec_buffer, tz_buffer);
}

bool aeron_agent_should_drop_frame(struct msghdr *message)
{
aeron_frame_header_t *frame_header = (aeron_frame_header_t *)(message->msg_iov->iov_base);

if (frame_header->type == AERON_HDR_TYPE_DATA || frame_header->type == AERON_HDR_TYPE_PAD)
{
return aeron_erand48(receive_data_loss_xsubi) <= receive_data_loss_rate;
}

return false;
}

static void *aeron_driver_agent_log_reader(void *arg)
{
while (true)
Expand All @@ -110,7 +96,6 @@ static void *aeron_driver_agent_log_reader(void *arg)
static void initialize_agent_logging()
{
char *mask_str = getenv(AERON_AGENT_MASK_ENV_VAR);
char *receive_loss_rate_str = getenv(AERON_AGENT_RECEIVE_DATA_LOSS_RATE_ENV_VAR);

if (mask_str)
{
Expand Down Expand Up @@ -139,30 +124,6 @@ static void initialize_agent_logging()
exit(EXIT_FAILURE);
}
}

if (receive_loss_rate_str)
{
receive_data_loss_rate = strtod(receive_loss_rate_str, NULL);
}

if (0.0 != receive_data_loss_rate)
{
char *receive_loss_seed_str = getenv(AERON_AGENT_RECEIVE_DATA_LOSS_SEED_ENV_VAR);
long long seed_value;

if (receive_loss_seed_str)
{
seed_value = strtoll(receive_loss_seed_str, NULL, 0);
}
else
{
seed_value = aeron_agent_epoch_clock();
}

receive_data_loss_xsubi[2] = (unsigned short)(seed_value & 0xFFFF);
receive_data_loss_xsubi[1] = (unsigned short)((seed_value >> 16) & 0xFFFF);
receive_data_loss_xsubi[0] = (unsigned short)((seed_value >> 32) & 0xFFFF);
}
}

void aeron_driver_agent_conductor_to_driver_interceptor(
Expand Down Expand Up @@ -376,12 +337,7 @@ ssize_t recvmsg(int socket, struct msghdr *message, int flags)

if (result > 0)
{
if (receive_data_loss_rate > 0.0 && aeron_agent_should_drop_frame(message))
{
aeron_driver_agent_log_frame(AERON_FRAME_IN_DROPPED, socket, message, flags, (int)result, (int32_t)result);
result = 0;
}
else if (mask & AERON_FRAME_IN)
if (mask & AERON_FRAME_IN)
{
aeron_driver_agent_log_frame(AERON_FRAME_IN, socket, message, flags, (int)result, (int32_t)result);
}
Expand Down Expand Up @@ -471,14 +427,7 @@ int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, int flags, r

for (int i = 0; i < result; i++)
{
if (receive_data_loss_rate > 0.0 && aeron_agent_should_drop_frame(&msgvec[i].msg_hdr))
{
aeron_driver_agent_log_frame(
AERON_FRAME_IN_DROPPED, sockfd, &msgvec[i].msg_hdr, flags, msgvec[i].msg_len, msgvec[i].msg_len);
result = i;
break;
}
else if (mask & AERON_FRAME_IN)
if (mask & AERON_FRAME_IN)
{
aeron_driver_agent_log_frame(
AERON_FRAME_IN, sockfd, &msgvec[i].msg_hdr, flags, msgvec[i].msg_len, msgvec[i].msg_len);
Expand Down
3 changes: 0 additions & 3 deletions aeron-driver/src/main/c/agent/aeron_driver_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@

#define AERON_MAP_RAW_LOG_OP_CLOSE (0x11)

#define AERON_AGENT_RECEIVE_DATA_LOSS_RATE_ENV_VAR "AERON_DEBUG_RECEIVE_DATA_LOSS_RATE"
#define AERON_AGENT_RECEIVE_DATA_LOSS_SEED_ENV_VAR "AERON_DEBUG_RECEIVE_DATA_LOSS_SEED"

typedef struct aeron_driver_agent_cmd_log_header_stct
{
int64_t time_ms;
Expand Down

0 comments on commit 9f2952b

Please sign in to comment.