Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
tmontgomery committed Jul 8, 2022
2 parents 8146305 + 7a6080d commit 2ce48ae
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 309 deletions.
63 changes: 30 additions & 33 deletions aeron-client/src/main/cpp/ControlledFragmentAssembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,58 +94,55 @@ class ControlledFragmentAssembler
{
action = m_delegate(buffer, offset, length, header);
}
else if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
BufferBuilder &builder = getBuffer(header.sessionId());
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);

builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else
{
if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
BufferBuilder &builder = getBuffer(header.sessionId());
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
auto result = m_builderBySessionIdMap.find(header.sessionId());

builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else
if (result != m_builderBySessionIdMap.end())
{
auto result = m_builderBySessionIdMap.find(header.sessionId());
BufferBuilder &builder = result->second;
const std::uint32_t limit = builder.limit();

if (result != m_builderBySessionIdMap.end())
if (offset == builder.nextTermOffset())
{
BufferBuilder &builder = result->second;
const std::uint32_t limit = builder.limit();
builder.append(buffer, offset, length, header);

if (offset == builder.nextTermOffset())
if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
builder.append(buffer, offset, length, header);
util::index_t msgLength =
static_cast<util::index_t>(builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(builder.buffer(), builder.limit());

action = m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
if (ControlledPollAction::ABORT == action)
{
util::index_t msgLength =
static_cast<util::index_t>(builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(builder.buffer(), builder.limit());

action = m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

if (ControlledPollAction::ABORT == action)
{
builder.limit(limit);
}
else
{
builder.reset();
}
builder.limit(limit);
}
else
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
builder.nextTermOffset(nextOffset);
builder.reset();
}
}
else
{
builder.reset();
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
builder.nextTermOffset(nextOffset);
}
}
else
{
builder.reset();
}
}
}

Expand Down
59 changes: 28 additions & 31 deletions aeron-client/src/main/cpp/FragmentAssembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,50 +93,47 @@ class FragmentAssembler
{
m_delegate(buffer, offset, length, header);
}
else if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
BufferBuilder &builder = getBuffer(header.sessionId());
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);

builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else
{
if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
BufferBuilder &builder = getBuffer(header.sessionId());
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
auto result = m_builderBySessionIdMap.find(header.sessionId());

builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else
if (result != m_builderBySessionIdMap.end())
{
auto result = m_builderBySessionIdMap.find(header.sessionId());
BufferBuilder &builder = result->second;

if (result != m_builderBySessionIdMap.end())
if (offset == builder.nextTermOffset())
{
BufferBuilder &builder = result->second;
builder.append(buffer, offset, length, header);

if (offset == builder.nextTermOffset())
if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
builder.append(buffer, offset, length, header);

if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
util::index_t msgLength =
static_cast<util::index_t>(builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(builder.buffer(), builder.limit());

m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

builder.reset();
}
else
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
builder.nextTermOffset(nextOffset);
}
util::index_t msgLength =
static_cast<util::index_t>(builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(builder.buffer(), builder.limit());

m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

builder.reset();
}
else
{
builder.reset();
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
builder.nextTermOffset(nextOffset);
}
}
else
{
builder.reset();
}
}
}
}
Expand Down
63 changes: 28 additions & 35 deletions aeron-client/src/main/cpp/ImageControlledFragmentAssembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,51 +79,44 @@ class ImageControlledFragmentAssembler
{
action = m_delegate(buffer, offset, length, header);
}
else
else if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else
{
const std::uint32_t limit = m_builder.limit();

if (offset == m_builder.nextTermOffset())
{
m_builder.append(buffer, offset, length, header);
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else if (offset == m_builder.nextTermOffset())
{
const std::uint32_t limit = m_builder.limit();
m_builder.append(buffer, offset, length, header);

if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
util::index_t msgLength =
static_cast<util::index_t>(m_builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(m_builder.buffer(), m_builder.limit());
if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
util::index_t msgLength =
static_cast<util::index_t>(m_builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(m_builder.buffer(), m_builder.limit());

action = m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);
action = m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

if (ControlledPollAction::ABORT == action)
{
m_builder.limit(limit);
}
else
{
m_builder.reset();
}
}
else
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.nextTermOffset(nextOffset);
}
if (ControlledPollAction::ABORT == action)
{
m_builder.limit(limit);
}
else
{
m_builder.reset();
}
}
else
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.nextTermOffset(nextOffset);
}
}
else
{
m_builder.reset();
}

return action;
Expand Down
54 changes: 24 additions & 30 deletions aeron-client/src/main/cpp/ImageFragmentAssembler.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,43 +79,37 @@ class ImageFragmentAssembler
{
m_delegate(buffer, offset, length, header);
}
else
else if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
}
else if (m_builder.nextTermOffset() == offset)
{
if ((flags & FrameDescriptor::BEGIN_FRAG) == FrameDescriptor::BEGIN_FRAG)
m_builder.append(buffer, offset, length, header);

if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.reset().append(buffer, offset, length, header).nextTermOffset(nextOffset);
util::index_t msgLength =
static_cast<util::index_t>(m_builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(m_builder.buffer(), m_builder.limit());

m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

m_builder.reset();
}
else
{
if (m_builder.nextTermOffset() == offset)
{
m_builder.append(buffer, offset, length, header);

if ((flags & FrameDescriptor::END_FRAG) == FrameDescriptor::END_FRAG)
{
util::index_t msgLength =
static_cast<util::index_t>(m_builder.limit()) - DataFrameHeader::LENGTH;
AtomicBuffer msgBuffer(m_builder.buffer(), m_builder.limit());

m_delegate(msgBuffer, DataFrameHeader::LENGTH, msgLength, header);

m_builder.reset();
}
else
{
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.nextTermOffset(nextOffset);
}
}
else
{
m_builder.reset();
}
auto nextOffset = BitUtil::align(
offset + length + DataFrameHeader::LENGTH, FrameDescriptor::FRAME_ALIGNMENT);
m_builder.nextTermOffset(nextOffset);
}
}
else
{
m_builder.reset();
}
}
};

Expand Down
Loading

0 comments on commit 2ce48ae

Please sign in to comment.