Skip to content

Commit

Permalink
[Java] add config option and uri param for setting max retransmits (r…
Browse files Browse the repository at this point in the history
…eal-logic#1640)

* [Java] add config option and uri param for setting max retransmits

* [Java] fix checkstyle

* [Java] add a max to the max_retransmits

* [C] C implementation of configurable max retransmits

* [C] use AERON_RETRANSMIT_HANDLER_MAX_RETRANSMITS_MAX instead of 256

* [Java] update ChannelUriStringBuilder for maxRetransmits

* [C++] update cpp and cpp_wrapper uri string builder for max retransmits

* [Java] updates to conform with naming standards

* [C,C++] updates to conform with naming standards

* [C] add better logging when configured max is out of valid range

* [Java] add invalid value to illegal argument exception message

* [Java] switch to max-resend

* [C,C++] switch to max-resend

* [C] fix invalid print format
  • Loading branch information
nbradac authored Aug 20, 2024
1 parent ed0fb6f commit 9fb1e8e
Show file tree
Hide file tree
Showing 28 changed files with 464 additions and 23 deletions.
1 change: 1 addition & 0 deletions aeron-client/src/main/c/uri/aeron_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ aeron_uri_params_t;
#define AERON_URI_NAK_DELAY_KEY "nak-delay"
#define AERON_URI_UNTETHERED_WINDOW_LIMIT_TIMEOUT_KEY "untethered-window-limit-timeout"
#define AERON_URI_UNTETHERED_RESTING_TIMEOUT_KEY "untethered-resting-timeout"
#define AERON_URI_MAX_RESEND_KEY "max-resend"
#define AERON_URI_INVALID_TAG (-1)

typedef struct aeron_udp_channel_params_stct
Expand Down
1 change: 1 addition & 0 deletions aeron-client/src/main/cpp/ChannelUri.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ static constexpr const char RESPONSE_CORRELATION_ID_PARAM_NAME[] = "response-cor
static constexpr const char NAK_DELAY_PARAM_NAME[] = "nak-delay";
static constexpr const char UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME[] = "untethered-window-limit-timeout";
static constexpr const char UNTETHERED_RESTING_TIMEOUT_PARAM_NAME[] = "untethered-resting-timeout";
static constexpr const char MAX_RESEND_PARAM_NAME[] = "max-resend";

using namespace aeron::util;

Expand Down
9 changes: 9 additions & 0 deletions aeron-client/src/main/cpp/ChannelUriStringBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ChannelUriStringBuilder
m_nakDelay.reset(nullptr);
m_untetheredWindowLimitTimeout.reset(nullptr);
m_untetheredRestingTimeout.reset(nullptr);
m_maxResend.reset(nullptr);

return *this;
}
Expand Down Expand Up @@ -405,6 +406,12 @@ class ChannelUriStringBuilder
return *this;
}

inline this_t &maxResend(std::int32_t maxResend)
{
m_maxResend.reset(new Value(maxResend));
return *this;
}

std::string build()
{
std::ostringstream sb;
Expand Down Expand Up @@ -498,6 +505,7 @@ class ChannelUriStringBuilder
append(sb, NAK_DELAY_PARAM_NAME, m_nakDelay);
append(sb, UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME, m_untetheredWindowLimitTimeout);
append(sb, UNTETHERED_RESTING_TIMEOUT_PARAM_NAME, m_untetheredRestingTimeout);
append(sb, MAX_RESEND_PARAM_NAME, m_maxResend);

std::string result = sb.str();
const char lastChar = result.back();
Expand Down Expand Up @@ -554,6 +562,7 @@ class ChannelUriStringBuilder
std::unique_ptr<Value> m_nakDelay;
std::unique_ptr<Value> m_untetheredWindowLimitTimeout;
std::unique_ptr<Value> m_untetheredRestingTimeout;
std::unique_ptr<Value> m_maxResend;
std::unique_ptr<std::string> m_mediaReceiveTimestampOffset;
std::unique_ptr<std::string> m_channelReceiveTimestampOffset;
std::unique_ptr<std::string> m_channelSendTimestampOffset;
Expand Down
3 changes: 2 additions & 1 deletion aeron-client/src/main/cpp_wrapper/ChannelUri.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ static constexpr const char RESPONSE_CORRELATION_ID_PARAM_NAME[] = "response-cor
static constexpr const char NAK_DELAY_PARAM_NAME[] = "nak-delay";
static constexpr const char UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME[] = "untethered-window-limit-timeout";
static constexpr const char UNTETHERED_RESTING_TIMEOUT_PARAM_NAME[] = "untethered-resting-timeout";
static constexpr const char MAX_RESEND_PARAM_NAME[] = "max-resend";

using namespace aeron::util;

Expand Down Expand Up @@ -174,7 +175,7 @@ class ChannelUri
{
return m_params->find(key) != m_params->end();
}

inline bool hasControlModeResponse()
{
const std::string &controlMode = get(MDC_CONTROL_MODE_PARAM_NAME);
Expand Down
9 changes: 9 additions & 0 deletions aeron-client/src/main/cpp_wrapper/ChannelUriStringBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ChannelUriStringBuilder
m_nakDelay.reset(nullptr);
m_untetheredWindowLimitTimeout.reset(nullptr);
m_untetheredRestingTimeout.reset(nullptr);
m_maxResend.reset(nullptr);

return *this;
}
Expand Down Expand Up @@ -405,6 +406,12 @@ class ChannelUriStringBuilder
return *this;
}

inline this_t &maxResend(std::int32_t maxResend)
{
m_maxResend.reset(new Value(maxResend));
return *this;
}

std::string build()
{
std::ostringstream sb;
Expand Down Expand Up @@ -498,6 +505,7 @@ class ChannelUriStringBuilder
append(sb, NAK_DELAY_PARAM_NAME, m_nakDelay);
append(sb, UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME, m_untetheredWindowLimitTimeout);
append(sb, UNTETHERED_RESTING_TIMEOUT_PARAM_NAME, m_untetheredRestingTimeout);
append(sb, MAX_RESEND_PARAM_NAME, m_maxResend);

std::string result = sb.str();
const char lastChar = result.back();
Expand Down Expand Up @@ -554,6 +562,7 @@ class ChannelUriStringBuilder
std::unique_ptr<Value> m_nakDelay;
std::unique_ptr<Value> m_untetheredWindowLimitTimeout;
std::unique_ptr<Value> m_untetheredRestingTimeout;
std::unique_ptr<Value> m_maxResend;
std::unique_ptr<std::string> m_mediaReceiveTimestampOffset;
std::unique_ptr<std::string> m_channelReceiveTimestampOffset;
std::unique_ptr<std::string> m_channelSendTimestampOffset;
Expand Down
54 changes: 54 additions & 0 deletions aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class ChannelUriStringBuilder
private Long nakDelay;
private Long untetheredWindowLimitTimeoutNs;
private Long untetheredRestingTimeoutNs;
private Integer maxResend;

/**
* Default constructor
Expand Down Expand Up @@ -144,6 +145,7 @@ public ChannelUriStringBuilder(final ChannelUri channelUri)
nakDelay(channelUri);
untetheredWindowLimitTimeout(channelUri);
untetheredRestingTimeout(channelUri);
maxResend(channelUri);
}

/**
Expand Down Expand Up @@ -187,6 +189,7 @@ public ChannelUriStringBuilder clear()
channelSendTimestampOffset = null;
responseEndpoint = null;
responseCorrelationId = null;
maxResend = null;

return this;
}
Expand Down Expand Up @@ -2156,6 +2159,56 @@ public Long untetheredRestingTimeoutNs()
return untetheredRestingTimeoutNs;
}

/**
* The max number of retransmit actions.
*
* @param maxResend the max number of retransmit actions.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder maxResend(final Integer maxResend)
{
this.maxResend = maxResend;
return this;
}

/**
* The max number of retransmit actions.
*
* @param channelUri the existing URI to extract the maxResend from.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder maxResend(final ChannelUri channelUri)
{
final String valueStr = channelUri.get(MAX_RESEND_PARAM_NAME);
if (null == valueStr)
{
this.maxResend = null;
return this;
}
else
{
try
{
return maxResend(Integer.parseInt(valueStr));
}
catch (final NumberFormatException ex)
{
throw new IllegalArgumentException(
MAX_RESEND_PARAM_NAME + " must be a number", ex);
}
}
}

/**
* The max number of retransmit actions.
*
* @return the max number of outstanding retransmit actions
*/
public Integer maxResend()
{
return maxResend;
}

/**
* Build a channel URI String for the given parameters.
*
Expand Down Expand Up @@ -2213,6 +2266,7 @@ public String build()
appendParameter(sb, NAK_DELAY_PARAM_NAME, nakDelay);
appendParameter(sb, UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME, untetheredWindowLimitTimeoutNs);
appendParameter(sb, UNTETHERED_RESTING_TIMEOUT_PARAM_NAME, untetheredRestingTimeoutNs);
appendParameter(sb, MAX_RESEND_PARAM_NAME, maxResend);

final char lastChar = sb.charAt(sb.length() - 1);
if (lastChar == '|' || lastChar == '?')
Expand Down
7 changes: 7 additions & 0 deletions aeron-client/src/main/java/io/aeron/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ public static InferableBoolean parse(final String value)
*/
public static final String UNTETHERED_RESTING_TIMEOUT_PARAM_NAME = "untethered-resting-timeout";

/**
* Parameter name to set the max number of outstanding active retransmits for a publication
*
* @since 1.45.0
*/
public static final String MAX_RESEND_PARAM_NAME = "max-resend";

/**
* Get the current fallback logger based on the supplied property.
*
Expand Down
15 changes: 15 additions & 0 deletions aeron-client/src/test/cpp/ChannelUriStringBuilderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,18 @@ TEST(ChannelUriStringBuilderTest, shouldGenerateControlMode)
builder.clear().media("ipc");
ASSERT_EQ(builder.build(), "aeron:ipc");
}

TEST(ChannelUriStringBuilderTest, shouldHandleMaxRetransmits)
{
ChannelUriStringBuilder builder;

builder
.media(UDP_MEDIA)
.endpoint("224.10.9.8:777")
.maxResend(123);

const std::string uriString = builder.build();

std::shared_ptr<ChannelUri> channelUri = ChannelUri::parse(uriString);
ASSERT_NE(std::string::npos, channelUri->toString().find("max-resend=123"));
}
14 changes: 14 additions & 0 deletions aeron-client/src/test/cpp_wrapper/ChannelUriStringBuilderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,17 @@ TEST(ChannelUriStringBuilderTest, shouldGenerateRxTimestampOffset)
"aeron:udp?endpoint=localhost:9999|media-rcv-ts-offset=reserved");
}

TEST(ChannelUriStringBuilderTest, shouldHandleMaxRetransmits)
{
ChannelUriStringBuilder builder;

builder
.media(UDP_MEDIA)
.endpoint("224.10.9.8:777")
.maxResend(123);

const std::string uriString = builder.build();

std::shared_ptr<ChannelUri> channelUri = ChannelUri::parse(uriString);
ASSERT_NE(std::string::npos, channelUri->toString().find("max-resend=123"));
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,17 @@ void shouldHandleUntetheredRestingTimeoutWithUnits()
assertEquals(1000000L, new ChannelUriStringBuilder()
.untetheredRestingTimeout("1ms").untetheredRestingTimeoutNs());
}

@Test
void shouldHandleMaxRetransmits()
{
assertEquals(20, new ChannelUriStringBuilder()
.maxResend(20)
.maxResend());
assertTrue(new ChannelUriStringBuilder().maxResend(20).build()
.contains(CommonContext.MAX_RESEND_PARAM_NAME + "=20"));
assertEquals(30, new ChannelUriStringBuilder()
.maxResend(ChannelUri.parse(new ChannelUriStringBuilder().maxResend(30).build()))
.maxResend());
}
}
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeron_driver.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ void aeron_driver_context_print_configuration(aeron_driver_context_t *context)
fprintf(fpout, "\n publication_linger_timeout_ns=%" PRIu64, context->publication_linger_timeout_ns);
fprintf(fpout, "\n untethered_window_limit_timeout_ns=%" PRIu64, context->untethered_window_limit_timeout_ns);
fprintf(fpout, "\n untethered_resting_timeout_ns=%" PRIu64, context->untethered_resting_timeout_ns);
fprintf(fpout, "\n max_resend=%" PRIu32, context->max_resend);
fprintf(fpout, "\n retransmit_unicast_delay_ns=%" PRIu64, context->retransmit_unicast_delay_ns);
fprintf(fpout, "\n retransmit_unicast_linger_ns=%" PRIu64, context->retransmit_unicast_linger_ns);
fprintf(fpout, "\n nak_unicast_delay_ns=%" PRIu64, context->nak_unicast_delay_ns);
Expand Down
21 changes: 21 additions & 0 deletions aeron-driver/src/main/c/aeron_driver_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
_context->counter_free_to_reuse_ns = AERON_COUNTERS_FREE_TO_REUSE_TIMEOUT_NS_DEFAULT;
_context->untethered_window_limit_timeout_ns = AERON_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_DEFAULT;
_context->untethered_resting_timeout_ns = AERON_UNTETHERED_RESTING_TIMEOUT_NS_DEFAULT;
_context->max_resend = AERON_RETRANSMIT_HANDLER_MAX_RESEND;
_context->retransmit_unicast_delay_ns = AERON_RETRANSMIT_UNICAST_DELAY_NS_DEFAULT;
_context->retransmit_unicast_linger_ns = AERON_RETRANSMIT_UNICAST_LINGER_NS_DEFAULT;
_context->nak_multicast_group_size = AERON_NAK_MULTICAST_GROUP_SIZE_DEFAULT;
Expand Down Expand Up @@ -895,6 +896,13 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
1000,
INT64_MAX);

_context->max_resend = aeron_config_parse_uint32(
AERON_MAX_RESEND_ENV_VAR,
getenv(AERON_MAX_RESEND_ENV_VAR),
_context->max_resend,
1,
AERON_RETRANSMIT_HANDLER_MAX_RESEND_MAX);

_context->retransmit_unicast_delay_ns = aeron_config_parse_duration_ns(
AERON_RETRANSMIT_UNICAST_DELAY_ENV_VAR,
getenv(AERON_RETRANSMIT_UNICAST_DELAY_ENV_VAR),
Expand Down Expand Up @@ -2558,6 +2566,19 @@ uint64_t aeron_driver_context_get_driver_timeout_ms(aeron_driver_context_t *cont
return NULL != context ? context->driver_timeout_ms : AERON_DRIVER_TIMEOUT_MS_DEFAULT;
}

int aeron_driver_context_set_max_resend(aeron_driver_context_t *context, uint32_t value)
{
AERON_DRIVER_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);

context->max_resend = value;
return 0;
}

uint32_t aeron_driver_context_get_max_resend(aeron_driver_context_t *context)
{
return NULL != context ? context->max_resend : AERON_RETRANSMIT_HANDLER_MAX_RESEND;
}

int aeron_driver_context_set_retransmit_unicast_delay_ns(aeron_driver_context_t *context, uint64_t value)
{
AERON_DRIVER_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context);
Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeron_driver_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ typedef struct aeron_driver_context_stct
uint32_t network_publication_max_messages_per_send; /* aeron.network.publication.max.messages.per.send = 2 */
uint32_t resource_free_limit; /* aeron.driver.resource.free.limit = 10 */
uint32_t async_executor_threads; /* aeron.driver.async.executor.threads = 1 */
uint32_t max_resend; /* aeron.max.resend = 16 */

int32_t conductor_cpu_affinity_no; /* aeron.conductor.cpu.affinity = -1 */
int32_t receiver_cpu_affinity_no; /* aeron.receiver.cpu.affinity = -1 */
Expand Down
1 change: 1 addition & 0 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ int aeron_network_publication_create(
context->retransmit_unicast_delay_ns,
context->retransmit_unicast_linger_ns,
aeron_udp_channel_has_group_semantics(endpoint->conductor_fields.udp_channel),
params->has_max_resend ? params->max_resend : context->max_resend,
retransmit_overflow_counter) < 0)
{
aeron_free(_pub->log_file_name);
Expand Down
3 changes: 2 additions & 1 deletion aeron-driver/src/main/c/aeron_retransmit_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ int aeron_retransmit_handler_init(
uint64_t delay_timeout_ns,
uint64_t linger_timeout_ns,
bool has_group_semantics,
uint32_t max_retransmits,
int64_t *retransmit_overflow_counter)
{
handler->invalid_packets_counter = invalid_packets_counter;
handler->delay_timeout_ns = delay_timeout_ns;
handler->linger_timeout_ns = linger_timeout_ns;
handler->has_group_semantics = has_group_semantics;
handler->max_retransmits = has_group_semantics ? AERON_RETRANSMIT_HANDLER_MAX_RETRANSMITS : 1;
handler->max_retransmits = has_group_semantics ? max_retransmits : 1;
handler->retransmit_overflow_counter = retransmit_overflow_counter;

assert(NULL != retransmit_overflow_counter);
Expand Down
4 changes: 3 additions & 1 deletion aeron-driver/src/main/c/aeron_retransmit_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ typedef struct aeron_retransmit_action_stct
}
aeron_retransmit_action_t;

#define AERON_RETRANSMIT_HANDLER_MAX_RETRANSMITS (16)
#define AERON_RETRANSMIT_HANDLER_MAX_RESEND (16)
#define AERON_RETRANSMIT_HANDLER_MAX_RESEND_MAX (256)

typedef int (*aeron_retransmit_handler_resend_func_t)(
void *clientd, int32_t term_id, int32_t term_offset, size_t length);
Expand All @@ -66,6 +67,7 @@ int aeron_retransmit_handler_init(
uint64_t delay_timeout_ns,
uint64_t linger_timeout_ns,
bool has_group_semantics,
uint32_t max_retransmits,
int64_t *retransmit_overflow_counter);

void aeron_retransmit_handler_close(aeron_retransmit_handler_t *handler);
Expand Down
8 changes: 8 additions & 0 deletions aeron-driver/src/main/c/aeronmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,14 @@ uint64_t aeron_driver_context_get_nak_unicast_delay_ns(aeron_driver_context_t *c
int aeron_driver_context_set_nak_unicast_retry_delay_ratio(aeron_driver_context_t *context, uint64_t value);
uint64_t aeron_driver_context_get_nak_unicast_retry_delay_ratio(aeron_driver_context_t *context);

/**
* Max number of active retransmissions tracked for udp streams with group semantics.
*/
#define AERON_MAX_RESEND_ENV_VAR "AERON_MAX_RESEND"

int aeron_driver_context_set_max_resend(aeron_driver_context_t *context, uint32_t value);
uint32_t aeron_driver_context_get_max_resend(aeron_driver_context_t *context);

/**
* How long to delay before sending a retransmit following a NAK.
*/
Expand Down
Loading

0 comments on commit 9fb1e8e

Please sign in to comment.