Skip to content

Commit

Permalink
[C] Add unconnected stream message.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Mar 27, 2024
1 parent f066c33 commit d7ea169
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 16 deletions.
13 changes: 12 additions & 1 deletion aeron-client/src/main/c/protocol/aeron_udp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ typedef struct aeron_rttm_header_stct
}
aeron_rttm_header_t;

typedef struct aeron_unknown_stream_header_stct
{
aeron_frame_header_t frame_header;
int32_t term_offset;
int32_t session_id;
int32_t stream_id;
int32_t term_id;
int64_t reserved_value;
}
aeron_unknown_stream_header_t;

#pragma pack(pop)

#define AERON_RES_HEADER_ADDRESS_LENGTH_IP4 (4u)
Expand Down Expand Up @@ -161,14 +172,14 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
#define AERON_HDR_TYPE_ATS_SETUP (INT16_C(0x09))
#define AERON_HDR_TYPE_ATS_SM (INT16_C(0x0A))
#define AERON_HDR_TYPE_RSP_SETUP (INT16_C(0x0B))
#define AERON_HDR_TYPE_UNCONNECTED_STREAM (INT16_C(0xC))
#define AERON_HDR_TYPE_EXT (INT16_C(-1))

#define AERON_DATA_HEADER_LENGTH (sizeof(aeron_data_header_t))

#define AERON_DATA_HEADER_BEGIN_FLAG (UINT8_C(0x80))
#define AERON_DATA_HEADER_END_FLAG (UINT8_C(0x40))
#define AERON_DATA_HEADER_EOS_FLAG (UINT8_C(0x20))
#define AERON_DATA_HEADER_UNCONNECTED_FLAG (UINT8_C(0x10))

#define AERON_DATA_HEADER_UNFRAGMENTED (UINT8_C(AERON_DATA_HEADER_BEGIN_FLAG | AERON_DATA_HEADER_END_FLAG))

Expand Down
41 changes: 26 additions & 15 deletions aeron-driver/src/main/c/aeron_data_packet_dispatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,21 +357,7 @@ int aeron_data_packet_dispatcher_on_data(
const bool found = aeron_int64_to_tagged_ptr_hash_map_get(
&stream_interest->image_by_session_id_map, header->session_id, NULL, (void **)&image);

if (0 != (AERON_DATA_HEADER_UNCONNECTED_FLAG & header->frame_header.flags))
{
if (aeron_data_packet_dispatcher_stream_interest_for_session(stream_interest, header->session_id))
{
if (aeron_data_packet_dispatcher_elicit_setup_from_source(
dispatcher, stream_interest, endpoint, destination, addr, header->stream_id, header->session_id) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}
}

return 0;
}
else if (NULL != image)
if (NULL != image)
{
return aeron_publication_image_insert_packet(
image, destination, header->term_id, header->term_offset, buffer, length, addr);
Expand Down Expand Up @@ -556,6 +542,31 @@ int aeron_data_packet_dispatcher_on_rttm(
return 0;
}

int aeron_data_packet_dispatcher_on_unconnected_stream(
aeron_data_packet_dispatcher_t *dispatcher,
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
aeron_unknown_stream_header_t *header,
uint8_t *buffer,
size_t length,
struct sockaddr_storage *addr)
{
aeron_data_packet_dispatcher_stream_interest_t *stream_interest =
aeron_int64_to_ptr_hash_map_get(&dispatcher->session_by_stream_id_map, header->stream_id);

if (aeron_data_packet_dispatcher_stream_interest_for_session(stream_interest, header->session_id))
{
if (aeron_data_packet_dispatcher_elicit_setup_from_source(
dispatcher, stream_interest, endpoint, destination, addr, header->stream_id, header->session_id) < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
}
}

return 0;
}

int aeron_data_packet_dispatcher_elicit_setup_from_source(
aeron_data_packet_dispatcher_t *dispatcher,
aeron_data_packet_dispatcher_stream_interest_t *stream_interest,
Expand Down
9 changes: 9 additions & 0 deletions aeron-driver/src/main/c/aeron_data_packet_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ int aeron_data_packet_dispatcher_on_rttm(
size_t length,
struct sockaddr_storage *addr);

int aeron_data_packet_dispatcher_on_unconnected_stream(
aeron_data_packet_dispatcher_t *dispatcher,
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
aeron_unknown_stream_header_t *header,
uint8_t *buffer,
size_t length,
struct sockaddr_storage *addr);

int aeron_data_packet_dispatcher_elicit_setup_from_source(
aeron_data_packet_dispatcher_t *dispatcher,
aeron_data_packet_dispatcher_stream_interest_t *stream_interest,
Expand Down
30 changes: 30 additions & 0 deletions aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,20 @@ void aeron_receive_channel_endpoint_dispatch(
}
break;

case AERON_HDR_TYPE_UNCONNECTED_STREAM:
if (length >= sizeof(aeron_unknown_stream_header_t))
{
if (aeron_receive_channel_endpoint_on_unconnected_stream(endpoint, destination, buffer, length, addr) < 0)
{
AERON_APPEND_ERR("%s", "recevier on_unknown_stream");
aeron_driver_receiver_log_error(receiver);
}
}
else
{
aeron_counter_increment(receiver->invalid_frames_counter, 1);
}

default:
break;
}
Expand Down Expand Up @@ -589,6 +603,22 @@ int aeron_receive_channel_endpoint_on_rttm(
return result;
}

int aeron_receive_channel_endpoint_on_unconnected_stream(
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
uint8_t *buffer,
size_t length,
struct sockaddr_storage *addr)
{
aeron_unknown_stream_header_t *unknown_stream_header = (aeron_unknown_stream_header_t *)buffer;

aeron_receive_destination_update_last_activity_ns(
destination, aeron_clock_cached_nano_time(endpoint->cached_clock));

return aeron_data_packet_dispatcher_on_unconnected_stream(
&endpoint->dispatcher, endpoint, destination, unknown_stream_header, buffer, length, addr);
}

void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_endpoint_t *endpoint)
{
if (0 == endpoint->stream_id_to_refcnt_map.size &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ int aeron_receive_channel_endpoint_on_rttm(
size_t length,
struct sockaddr_storage *addr);

int aeron_receive_channel_endpoint_on_unconnected_stream(
aeron_receive_channel_endpoint_t *endpoint,
aeron_receive_destination_t *destination,
uint8_t *buffer,
size_t length,
struct sockaddr_storage *addr);

void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_endpoint_t *endpoint);

int aeron_receive_channel_endpoint_incref_to_stream(aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id);
Expand Down

0 comments on commit d7ea169

Please sign in to comment.