Skip to content

Commit

Permalink
[C++] Remove ExclusiveTermAppender and fold functionality into Exclus…
Browse files Browse the repository at this point in the history
…ivePublication.
  • Loading branch information
mjpt777 committed Oct 20, 2022
1 parent 543bb1a commit c5fe3c6
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 377 deletions.
1 change: 0 additions & 1 deletion aeron-client/src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ SET(HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/logbuffer/TermBlockScanner.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/logbuffer/TermReader.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/logbuffer/TermScanner.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/logbuffer/ExclusiveTermAppender.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/ringbuffer/ManyToOneRingBuffer.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/ringbuffer/RecordDescriptor.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/ringbuffer/RingBufferDescriptor.h
Expand Down
16 changes: 3 additions & 13 deletions aeron-client/src/main/cpp/ExclusivePublication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,9 @@ ExclusivePublication::ExclusivePublication(
m_logBuffers(std::move(logBuffers)),
m_headerWriter(LogBufferDescriptor::defaultFrameHeader(m_logMetaDataBuffer))
{
for (int i = 0; i < LogBufferDescriptor::PARTITION_COUNT; i++)
{
/*
* Perhaps allow copy-construction and be able to move appenders and AtomicBuffers directly into Publication
* for locality.
*/
m_appenders[i] = std::unique_ptr<ExclusiveTermAppender>(new ExclusiveTermAppender(
m_logBuffers->atomicBuffer(i),
m_logBuffers->atomicBuffer(LogBufferDescriptor::LOG_META_DATA_SECTION_INDEX),
i));
}
const util::index_t tailCounterOffset = LogBufferDescriptor::tailCounterOffset(m_activePartitionIndex);
const std::int64_t rawTail = m_logMetaDataBuffer.getInt64Volatile(tailCounterOffset);

const std::int64_t rawTail = m_appenders[m_activePartitionIndex]->rawTail();
m_termId = LogBufferDescriptor::termId(rawTail);
m_termOffset = LogBufferDescriptor::termOffset(rawTail, m_logBuffers->atomicBuffer(0).capacity());
m_termBeginPosition = LogBufferDescriptor::computeTermBeginPosition(
Expand Down Expand Up @@ -112,7 +102,7 @@ std::int64_t ExclusivePublication::channelStatus() const

std::vector<std::string> ExclusivePublication::localSocketAddresses() const
{
return LocalSocketAddressStatus::findAddresses(m_conductor.countersReader(), channelStatus(), channelStatusId());
return LocalSocketAddressStatus::findAddresses(m_conductor.countersReader(), channelStatus(), m_channelStatusId);
}

}
Loading

0 comments on commit c5fe3c6

Please sign in to comment.