Skip to content

Commit

Permalink
[Java] Tidy up after aeron-io#1661.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Sep 14, 2024
1 parent e98c69a commit 252ba1b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 17 deletions.
46 changes: 30 additions & 16 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.CncFileDescriptor;
import io.aeron.CommonContext;
import io.aeron.archive.checksum.Checksum;
Expand Down Expand Up @@ -306,7 +307,7 @@ else if (args.length >= 2 && "delete-orphaned-segments".equals(args[1]))
out.print("WARNING: All orphaned segment files will be deleted.");
if (readContinueAnswer("Continue? (y/n)"))
{
deleteOrphanedSegments(out, archiveDir, null);
deleteOrphanedSegments(out, archiveDir);
}
}
else
Expand Down Expand Up @@ -797,18 +798,30 @@ public static void compact(final PrintStream out, final File archiveDir)
}

/**
* Delete orphaned recording segments that have been detached, i.e. outside the start and stop recording range,
* but are not deleted.
* Delete orphaned recording segments that have been detached for all recordings, i.e. outside the start and stop
* recording range, but are not deleted.
*
* @param out stream to print results and errors to.
* @param archiveDir that contains {@link MarkFile}, {@link Catalog}, and recordings.
*/
public static void deleteOrphanedSegments(final PrintStream out, final File archiveDir)
{
deleteOrphanedSegments(out, archiveDir, INSTANCE, NULL_RECORD_ID);
}

/**
* Delete orphaned recording segments that have been detached, i.e. outside the start and stop recording range,
* but are not deleted.
*
* @param out stream to print results and errors to.
* @param archiveDir that contains {@link MarkFile}, {@link Catalog}, and recordings.
* @param targetRecordingId optional recordingId to delete orphaned segments for a specific recording.
* If null, delete orphaned segments for all recordings.
* If {@link Aeron#NULL_VALUE}, delete orphaned segments for all recordings.
*/
public static void deleteOrphanedSegments(
final PrintStream out,
final File archiveDir,
final Long targetRecordingId)
final long targetRecordingId)
{
deleteOrphanedSegments(out, archiveDir, INSTANCE, targetRecordingId);
}
Expand All @@ -817,30 +830,31 @@ static void deleteOrphanedSegments(
final PrintStream out,
final File archiveDir,
final EpochClock epochClock,
final Long targetRecordingId)
final long targetRecordingId)
{
try (Catalog catalog = openCatalogReadOnly(archiveDir, epochClock))
{
final Long2ObjectHashMap<List<String>> segmentFilesByRecordingId = indexSegmentFiles(archiveDir);

final CatalogEntryProcessor processor = (recordingDescriptorOffset,
final MutableBoolean found = new MutableBoolean(false);
catalog.forEach((recordingDescriptorOffset,
headerEncoder,
headerDecoder,
descriptorEncoder,
descriptorDecoder) ->
{
final long recordingId = descriptorDecoder.recordingId();
final List<String> files = segmentFilesByRecordingId.getOrDefault(recordingId, emptyList());
deleteOrphanedSegmentFiles(out, archiveDir, descriptorDecoder, files);
};
if (NULL_RECORD_ID == targetRecordingId || targetRecordingId == recordingId)
{
found.set(true);
final List<String> files = segmentFilesByRecordingId.getOrDefault(recordingId, emptyList());
deleteOrphanedSegmentFiles(out, archiveDir, descriptorDecoder, files);
}
});

if (targetRecordingId != null)
{
catalog.forEntry(targetRecordingId, processor);
}
else
if (NULL_RECORD_ID != targetRecordingId && !found.get())
{
catalog.forEach(processor);
throw new AeronException("no recording found with recordingId: " + targetRecordingId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ void deleteOrphanedSegmentsDeletesSegmentFilesForAllRecordings() throws IOExcept
final File file25 = createFile(segmentFileName(
rec2, segmentFileBasePosition(1_000_000, Long.MAX_VALUE, TERM_LENGTH, SEGMENT_LENGTH)));

deleteOrphanedSegments(out, archiveDir, epochClock, null);
deleteOrphanedSegments(out, archiveDir);

assertFileExists(file12, file13, file15, file17);
assertFileDoesNotExist(file11, file14, file16);
Expand All @@ -1418,6 +1418,57 @@ void deleteOrphanedSegmentsDeletesSegmentFilesForAllRecordings() throws IOExcept
assertFileDoesNotExist(file21, file25);
}

@ParameterizedTest
@EnumSource(RecordingState.class)
void deleteOrphanedSegmentsDeletesSegmentFilesOfTargetRecording(final RecordingState state) throws IOException
{
final long rec1;
final long rec2;
try (Catalog catalog = new Catalog(archiveDir, epochClock, 1024, true, null, null))
{
rec1 = catalog.addNewRecording(0, NULL_POSITION, NULL_TIMESTAMP, NULL_TIMESTAMP, 0,
SEGMENT_LENGTH, TERM_LENGTH, MTU_LENGTH, 42, 5, "some ch", "some ch", "rec1");

rec2 = catalog.addNewRecording(1_000_000, 1024 * 1024 * 1024, NULL_TIMESTAMP, NULL_TIMESTAMP, 0,
SEGMENT_LENGTH, TERM_LENGTH, MTU_LENGTH, 1, 1, "ch2", "ch2", "rec2");
assertTrue(catalog.changeState(rec2, state));
}

final File file11 = createFile(segmentFileName(rec1, -1));
final File file12 = createFile(segmentFileName(rec1, 0));
final File file13 = createFile(segmentFileName(rec1, Long.MAX_VALUE));
final File file14 = createFile(rec1 + "-will-be-deleted.rec");
final File file15 = createFile(rec1 + "-will-be-skipped.txt");
final File file16 = createFile(rec1 + "-.rec");
final File file17 = createFile(rec1 + "invalid_file_name.rec");

final File file21 = createFile(segmentFileName(rec2, 0));
final File file22 = createFile(segmentFileName(
rec2, segmentFileBasePosition(1_000_000, 1_000_000, TERM_LENGTH, SEGMENT_LENGTH)));
final File file23 = createFile(segmentFileName(
rec2, segmentFileBasePosition(1_000_000, 5_000_000, TERM_LENGTH, SEGMENT_LENGTH)));
final File file24 = createFile(segmentFileName(
rec2, segmentFileBasePosition(1_000_000, 1024 * 1024 * 1024, TERM_LENGTH, SEGMENT_LENGTH)));
final File file25 = createFile(segmentFileName(
rec2, segmentFileBasePosition(1_000_000, Long.MAX_VALUE, TERM_LENGTH, SEGMENT_LENGTH)));

deleteOrphanedSegments(out, archiveDir, epochClock, rec2);

assertFileExists(file12, file13, file15, file17, file11, file14, file16);

assertFileExists(file22, file23, file24);
assertFileDoesNotExist(file21, file25);
}

@Test
void deleteOrphanedSegmentsOfAnUnknownRecording()
{
final int recordingId = 55555555;
final AeronException exception =
assertThrowsExactly(AeronException.class, () -> deleteOrphanedSegments(out, archiveDir, recordingId));
assertEquals("ERROR - no recording found with recordingId: " + recordingId, exception.getMessage());
}

@Test
void markInvalidInvalidatesAnExistingRecording()
{
Expand Down

0 comments on commit 252ba1b

Please sign in to comment.