Skip to content

Commit

Permalink
[Java] Tidy up after merge of PR aeron-io#1444.
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpt777 committed Apr 11, 2023
1 parent 63028d0 commit d437a5b
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 87 deletions.
69 changes: 36 additions & 33 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1340,44 +1340,38 @@ void migrateSegments(
if (hasRecording(srcRecordingId, correlationId, controlSession) &&
hasRecording(dstRecordingId, correlationId, controlSession))
{
final RecordingSummary srcRecordingSummary =
catalog.recordingSummary(srcRecordingId, recordingSummary);
final RecordingSummary srcSummary = catalog.recordingSummary(srcRecordingId, recordingSummary);
final RecordingSummary dstSummary = catalog.recordingSummary(dstRecordingId, new RecordingSummary());

final RecordingSummary dstRecordingSummary =
catalog.recordingSummary(dstRecordingId, new RecordingSummary());

if (isActiveRecording(controlSession, correlationId, recordingSummary) ||
!hasMatchingStreamParameters(controlSession, correlationId, srcRecordingSummary, dstRecordingSummary))
if (isActiveRecording(controlSession, correlationId, srcSummary) ||
!hasMatchingStreamParameters(controlSession, correlationId, srcSummary, dstSummary))
{
return;
}

final boolean canPrepend = srcRecordingSummary.stopPosition == dstRecordingSummary.startPosition;
final boolean canAppend = srcRecordingSummary.startPosition == dstRecordingSummary.stopPosition;

final long seamPosition;
final long joinPosition;

if (canPrepend)
if (srcSummary.stopPosition == dstSummary.startPosition)
{
seamPosition = srcRecordingSummary.stopPosition;
joinPosition = srcSummary.stopPosition;
}
else if (canAppend)
else if (srcSummary.startPosition == dstSummary.stopPosition)
{
seamPosition = srcRecordingSummary.startPosition;
joinPosition = srcSummary.startPosition;
}
else
{
final String msg = "invalid migrate: src and dst are not contiguous" +
" srcStartPosition=" + srcRecordingSummary.startPosition +
" srcStopPosition=" + srcRecordingSummary.stopPosition +
" dstStartPosition=" + dstRecordingSummary.startPosition +
" dstStopPosition=" + dstRecordingSummary.stopPosition;
" srcStartPosition=" + srcSummary.startPosition +
" srcStopPosition=" + srcSummary.stopPosition +
" dstStartPosition=" + dstSummary.startPosition +
" dstStopPosition=" + dstSummary.stopPosition;
controlSession.sendErrorResponse(correlationId, msg, controlResponseProxy);
return;
}

if (isSeamPosSegmentUnaligned(controlSession, correlationId, "src", srcRecordingSummary, seamPosition) ||
isSeamPosSegmentUnaligned(controlSession, correlationId, "dst", dstRecordingSummary, seamPosition))
if (isJoinPositionSegmentUnaligned(controlSession, correlationId, "src", srcSummary, joinPosition) ||
isJoinPositionSegmentUnaligned(controlSession, correlationId, "dst", dstSummary, joinPosition))
{
return;
}
Expand All @@ -1389,31 +1383,31 @@ else if (canAppend)
correlationId,
srcRecordingId,
dstRecordingId,
srcRecordingSummary,
srcSummary,
emptyFollowingSrcSegment);

if (movedSegmentCount >= 0)
{
final int toBeDeletedSegmentCount =
addDeleteSegmentsSession(correlationId, srcRecordingId, controlSession, emptyFollowingSrcSegment);
final int toBeDeletedSegmentCount = addDeleteSegmentsSession(
correlationId, srcRecordingId, controlSession, emptyFollowingSrcSegment);

if (toBeDeletedSegmentCount >= 0)
{
if (canPrepend)
if (srcSummary.stopPosition == dstSummary.startPosition)
{
catalog.startPosition(dstRecordingId, srcRecordingSummary.startPosition);
catalog.startPosition(dstRecordingId, srcSummary.startPosition);
}
else
{
catalog.stopPosition(dstRecordingId, srcRecordingSummary.stopPosition);
catalog.stopPosition(dstRecordingId, srcSummary.stopPosition);
}

catalog.stopPosition(srcRecordingId, srcRecordingSummary.startPosition);
catalog.stopPosition(srcRecordingId, srcSummary.startPosition);

controlSession.sendOkResponse(correlationId, movedSegmentCount, controlResponseProxy);

final boolean willDeleteSegmentLater = toBeDeletedSegmentCount > 0;
if (movedSegmentCount > 0 && !willDeleteSegmentLater)
final boolean hasSegmentsToDelete = toBeDeletedSegmentCount > 0;
if (movedSegmentCount > 0 && !hasSegmentsToDelete)
{
controlSession.sendSignal(
correlationId, srcRecordingId, Aeron.NULL_VALUE, Aeron.NULL_VALUE, RecordingSignal.DELETE);
Expand Down Expand Up @@ -2035,7 +2029,7 @@ private boolean isValidDetach(
return true;
}

private boolean isSeamPosSegmentUnaligned(
private boolean isJoinPositionSegmentUnaligned(
final ControlSession controlSession,
final long correlationId,
final String label,
Expand All @@ -2051,7 +2045,7 @@ private boolean isSeamPosSegmentUnaligned(

if (segmentBasePosition != seamPosition)
{
final String error = "invalid migrate: seam position is not on segment boundary of " +
final String error = "invalid migrate: join position is not on segment boundary of " +
label + " recording" +
" seamPosition=" + seamPosition +
" startPosition=" + recordingSummary.startPosition +
Expand Down Expand Up @@ -2212,6 +2206,7 @@ private long moveAllSegments(
attachedSegmentCount++;
}
}

return attachedSegmentCount;
}

Expand Down Expand Up @@ -2241,7 +2236,15 @@ private boolean eraseRemainingSegment(

channel.truncate(segmentOffset);
dataBuffer.byteBuffer().put(0, (byte)0).limit(1).position(0);
channel.write(dataBuffer.byteBuffer(), segmentLength - 1);

while (true)
{
final int written = channel.write(dataBuffer.byteBuffer(), segmentLength - 1);
if (1 == written)
{
break;
}
}
}
catch (final IOException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public enum VerifyOption
VERIFY_ALL_SEGMENT_FILES("-a"),

/**
* Perform checksum for each data frame within a segment file being verify.
* Perform checksum for each data frame within a segment file being verified.
*/
APPLY_CHECKSUM("-checksum");

Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/java/io/aeron/Aeron.java
Original file line number Diff line number Diff line change
Expand Up @@ -1112,8 +1112,8 @@ public Context awaitingIdleStrategy(final IdleStrategy idleStrategy)
/**
* The {@link IdleStrategy} to be used when awaiting a response from the Media Driver.
* <p>
* This can be change to a {@link BusySpinIdleStrategy} or {@link YieldingIdleStrategy} for lower response time,
* especially for adding counters or releasing resources, at the expense of CPU usage.
* This can be changed to a {@link BusySpinIdleStrategy} or {@link YieldingIdleStrategy} for lower response
* time, especially for adding counters or releasing resources, at the expense of CPU usage.
*
* @return the {@link IdleStrategy} to be used when awaiting a response from the Media Driver.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final class ReceiveChannelEndpointThreadLocals
private long nextReceiverId;

/**
* Construct a set of local state to be use by the receiver thread.
* Construct a set of local state to be used by the receiver thread.
*/
public ReceiveChannelEndpointThreadLocals()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import static java.util.Arrays.asList;
import static java.util.Arrays.copyOfRange;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -175,8 +175,7 @@ void truncateRecordingShouldDeleteAllFilesWhenTruncatingToTheStartOfTheRecording
assertEquals(stopPosition, aeronArchive.getStopPosition(recordingId));

final Path archiveDir = archive.context().archiveDir().toPath();
final ArrayList<String> segmentFiles =
Catalog.listSegmentFiles(archiveDir.toFile(), recordingId);
final ArrayList<String> segmentFiles = Catalog.listSegmentFiles(archiveDir.toFile(), recordingId);
segmentFiles.sort(Comparator.naturalOrder());
assertEquals(asList(
recordingId + "-1048576.rec",
Expand All @@ -199,8 +198,8 @@ void truncateRecordingShouldDeleteAllFilesWhenTruncatingToTheStartOfTheRecording
deleteList.add(Files.createFile(archiveDir.resolve(
recordingId + "-" + archive.context().segmentFileLength() + RECORDING_SEGMENT_SUFFIX)));

final Path otherFile =
Files.createFile(archiveDir.resolve((recordingId + 1) + "-1179648" + RECORDING_SEGMENT_SUFFIX));
final Path otherFile = Files.createFile(
archiveDir.resolve((recordingId + 1) + "-1179648" + RECORDING_SEGMENT_SUFFIX));
final Path otherFile2 = Files.createFile(archiveDir.resolve("something-else.txt"));

recordingSignalConsumer.reset();
Expand Down Expand Up @@ -255,8 +254,7 @@ void truncateRecordingShouldDeleteAllFilesWhenTruncatingToZero() throws IOExcept
assertEquals(stopPosition, aeronArchive.getStopPosition(recordingId));

final Path archiveDir = archive.context().archiveDir().toPath();
final ArrayList<String> segmentFiles =
Catalog.listSegmentFiles(archiveDir.toFile(), recordingId);
final ArrayList<String> segmentFiles = Catalog.listSegmentFiles(archiveDir.toFile(), recordingId);
segmentFiles.sort(Comparator.naturalOrder());
assertEquals(
asList(recordingId + "-0.rec", recordingId + "-262144.rec", recordingId + "-524288.rec"),
Expand All @@ -266,8 +264,7 @@ void truncateRecordingShouldDeleteAllFilesWhenTruncatingToZero() throws IOExcept
for (final String segmentFileName : segmentFiles)
{
final Path file = archiveDir.resolve(segmentFileName);
final Path renamed = Files.move(
file, archiveDir.resolve(segmentFileName + ".del"));
final Path renamed = Files.move(file, archiveDir.resolve(segmentFileName + ".del"));
assertTrue(Files.exists(renamed));
deleteList.add(renamed);
}
Expand Down Expand Up @@ -373,8 +370,7 @@ void truncateRecordingShouldDeleteSegmentFilesPastPositionAndEraseAlreadyWritten
final byte[] startFileData = Files.readAllBytes(startFile);
final byte[] truncatedFileData = Files.readAllBytes(truncatedFile);

final ArrayList<String> segmentFiles =
Catalog.listSegmentFiles(archiveDir.toFile(), recordingId);
final ArrayList<String> segmentFiles = Catalog.listSegmentFiles(archiveDir.toFile(), recordingId);
segmentFiles.sort(Comparator.naturalOrder());
assertEquals(Arrays.asList(
truncatedFile.getFileName().toString(),
Expand Down
Loading

0 comments on commit d437a5b

Please sign in to comment.