Skip to content

Commit

Permalink
[C] Free counters when cubic congestion control is freed. (aeron-io#1204
Browse files Browse the repository at this point in the history
)
  • Loading branch information
vyazelenko authored Jul 29, 2021
1 parent efffce1 commit 30d117e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
36 changes: 27 additions & 9 deletions aeron-driver/src/main/c/aeron_congestion_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ struct aeron_cubic_congestion_control_strategy_state_stct
int64_t last_update_timestamp_ns;
int64_t last_rtt_timestamp_ns;

int64_t *rtt_indicator;
int64_t *window_indicator;
aeron_position_t rtt_indicator;
aeron_position_t window_indicator;

aeron_counters_manager_t *counters_manager;
};

typedef struct aeron_cubic_congestion_control_strategy_state_stct aeron_cubic_congestion_control_strategy_state_t;
Expand Down Expand Up @@ -260,7 +262,7 @@ void aeron_cubic_congestion_control_strategy_on_rttm(

cubic_state->last_rtt_timestamp_ns = now_ns;
cubic_state->rtt_ns = rtt_ns;
aeron_counter_set_ordered(cubic_state->rtt_indicator, rtt_ns);
aeron_counter_set_ordered(cubic_state->rtt_indicator.value_addr, rtt_ns);
cubic_state->rtt_timeout_ns =
(rtt_ns > (int64_t)cubic_state->initial_rtt_ns ? rtt_ns : (int64_t)cubic_state->initial_rtt_ns) *
AERON_CUBICCONGESTIONCONTROL_RTT_TIMEOUT_MULTIPLE;
Expand Down Expand Up @@ -326,7 +328,7 @@ int32_t aeron_cubic_congestion_control_strategy_on_track_rebuild(
}

const int32_t window = cubic_state->cwnd * cubic_state->mtu;
aeron_counter_set_ordered(cubic_state->window_indicator, window);
aeron_counter_set_ordered(cubic_state->window_indicator.value_addr, window);

return window;
}
Expand All @@ -341,6 +343,18 @@ int32_t aeron_cubic_congestion_control_strategy_max_window_length(void *state)
return ((aeron_cubic_congestion_control_strategy_state_t *)state)->max_window_length;
}

int aeron_cubic_congestion_control_strategy_fini(aeron_congestion_control_strategy_t *strategy)
{
aeron_cubic_congestion_control_strategy_state_t *state = strategy->state;
aeron_counters_manager_free(state->counters_manager, state->rtt_indicator.counter_id);
aeron_counters_manager_free(state->counters_manager, state->window_indicator.counter_id);

aeron_free(strategy->state);
aeron_free(strategy);

return 0;
}

int aeron_cubic_congestion_control_strategy_supplier(
aeron_congestion_control_strategy_t **strategy,
aeron_udp_channel_t *channel,
Expand Down Expand Up @@ -373,7 +387,7 @@ int aeron_cubic_congestion_control_strategy_supplier(
_strategy->on_track_rebuild = aeron_cubic_congestion_control_strategy_on_track_rebuild;
_strategy->initial_window_length = aeron_cubic_congestion_control_strategy_initial_window_length;
_strategy->max_window_length = aeron_cubic_congestion_control_strategy_max_window_length;
_strategy->fini = aeron_congestion_control_strategy_fini;
_strategy->fini = aeron_cubic_congestion_control_strategy_fini;

aeron_cubic_congestion_control_strategy_state_t *state = _strategy->state;

Expand Down Expand Up @@ -440,11 +454,15 @@ int aeron_cubic_congestion_control_strategy_supplier(
goto error_cleanup;
}

state->rtt_indicator = aeron_counters_manager_addr(counters_manager, rtt_indicator_counter_id);
aeron_counter_set_ordered(state->rtt_indicator, 0);
state->counters_manager = counters_manager;

state->rtt_indicator.counter_id = rtt_indicator_counter_id;
state->rtt_indicator.value_addr = aeron_counters_manager_addr(counters_manager, rtt_indicator_counter_id);
aeron_counter_set_ordered(state->rtt_indicator.value_addr, 0);

state->window_indicator = aeron_counters_manager_addr(counters_manager, window_indicator_counter_id);
aeron_counter_set_ordered(state->window_indicator, state->initial_window_length);
state->window_indicator.counter_id = window_indicator_counter_id;
state->window_indicator.value_addr = aeron_counters_manager_addr(counters_manager, window_indicator_counter_id);
aeron_counter_set_ordered(state->window_indicator.value_addr, state->initial_window_length);

state->last_rtt_timestamp_ns = 0;

Expand Down
21 changes: 17 additions & 4 deletions aeron-driver/src/test/c/aeron_congestion_control_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class CongestionControlTest : public testing::Test

EXPECT_EQ(result, 0);
EXPECT_NE(nullptr, congestion_control_strategy);
void * const state = congestion_control_strategy->state;
void *const state = congestion_control_strategy->state;
EXPECT_NE(nullptr, state);
EXPECT_NE(nullptr, congestion_control_strategy->on_rttm_sent);
EXPECT_NE(nullptr, congestion_control_strategy->on_rttm);
Expand Down Expand Up @@ -166,6 +166,13 @@ class CongestionControlTest : public testing::Test
return udp_channel;
}

int32_t get_counter_state(int32_t counter_id) const
{
const aeron_counter_metadata_descriptor_t *metadata = (aeron_counter_metadata_descriptor_t *)
(m_counters_manager.metadata + (counter_id * AERON_COUNTERS_MANAGER_METADATA_LENGTH));
return metadata->state;
}

protected:
void TearDown() override
{
Expand All @@ -177,7 +184,7 @@ class CongestionControlTest : public testing::Test
it = m_udp_channels.erase(it);
}
}

aeron_driver_context_t *m_context = nullptr;
aeron_counters_manager_t m_counters_manager = {};
std::vector<aeron_udp_channel_t *> m_udp_channels;
Expand Down Expand Up @@ -288,7 +295,7 @@ TEST_F(CongestionControlTest, defaultStrategySupplierShouldChooseCubicCongestion

EXPECT_EQ(result, 0);
EXPECT_NE(nullptr, congestion_control_strategy);
void * const state = congestion_control_strategy->state;
void *const state = congestion_control_strategy->state;
EXPECT_NE(nullptr, state);
EXPECT_NE(nullptr, congestion_control_strategy->on_rttm_sent);
EXPECT_NE(nullptr, congestion_control_strategy->on_rttm);
Expand Down Expand Up @@ -398,7 +405,7 @@ TEST_F(CongestionControlTest, cubicCongestionControlStrategyConfiguration)

EXPECT_EQ(result, 0);
EXPECT_NE(nullptr, congestion_control_strategy);
void * const state = congestion_control_strategy->state;
void *const state = congestion_control_strategy->state;
EXPECT_NE(nullptr, state);
EXPECT_NE(nullptr, congestion_control_strategy->on_rttm_sent);
EXPECT_NE(nullptr, congestion_control_strategy->on_rttm);
Expand All @@ -411,12 +418,14 @@ TEST_F(CongestionControlTest, cubicCongestionControlStrategyConfiguration)
&m_counters_manager,
AERON_COUNTER_PER_IMAGE_TYPE_ID,
AERON_CUBICCONGESTIONCONTROL_RTT_INDICATOR_COUNTER_NAME);
EXPECT_EQ(AERON_COUNTER_RECORD_ALLOCATED, get_counter_state(rtt_indicator_counter_id));
EXPECT_EQ(0, aeron_counter_get(aeron_counters_manager_addr(&m_counters_manager, rtt_indicator_counter_id)));

const int32_t window_counter_id = find_counter_by_label_prefix(
&m_counters_manager,
AERON_COUNTER_PER_IMAGE_TYPE_ID,
AERON_CUBICCONGESTIONCONTROL_WINDOW_INDICATOR_COUNTER_NAME);
EXPECT_EQ(AERON_COUNTER_RECORD_ALLOCATED, get_counter_state(window_counter_id));
EXPECT_EQ(
sender_mtu_length * 2, aeron_counter_get(aeron_counters_manager_addr(&m_counters_manager, window_counter_id)));

Expand All @@ -431,4 +440,8 @@ TEST_F(CongestionControlTest, cubicCongestionControlStrategyConfiguration)
EXPECT_TRUE(congestion_control_strategy->should_measure_rtt(state, 30000000000LL));

congestion_control_strategy->fini(congestion_control_strategy);

// assert that counters were freed
EXPECT_EQ(AERON_COUNTER_RECORD_RECLAIMED, get_counter_state(rtt_indicator_counter_id));
EXPECT_EQ(AERON_COUNTER_RECORD_RECLAIMED, get_counter_state(window_counter_id));
}

0 comments on commit 30d117e

Please sign in to comment.