Skip to content

Commit

Permalink
Cluster relocatable mark files (aeron-io#1485)
Browse files Browse the repository at this point in the history
* [Java] Start adding relocation of mark files for ConsensusModule and ClusteredServiceContainer.

* [Java] Add mark file relocation to cluster backup.

* [Java] Close contexts within test to prevent windows test failures.

* [Java] Update ClusterTool to support relocated cluster mark files.
  • Loading branch information
mikeb01 authored Jul 25, 2023
1 parent 884f1cd commit 905ef4f
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 79 deletions.
59 changes: 4 additions & 55 deletions aeron-archive/src/main/java/io/aeron/archive/Archive.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import java.nio.channels.FileChannel;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -1138,7 +1136,10 @@ public void conclude()
markFile = new ArchiveMarkFile(this);
}

ensureMarkFileLink(archiveDir, markFile);
MarkFile.ensureMarkFileLink(
archiveDir,
new File(markFile.parentDirectory(), ArchiveMarkFile.FILENAME),
ArchiveMarkFile.LINK_FILENAME);

errorHandler = CommonContext.setupErrorHandler(
errorHandler, new DistinctErrorLog(markFile.errorBuffer(), epochClock, US_ASCII));
Expand Down Expand Up @@ -3479,58 +3480,6 @@ private void concludeArchiveId()
}
}

private static void ensureMarkFileLink(final File archiveDir, final ArchiveMarkFile markFile)
{
final String archiveDirPath;
final String markFileParentPath;
try
{
archiveDirPath = archiveDir.getCanonicalPath();
}
catch (final IOException ex)
{
throw new ConfigurationException("failed to resolve canonical path for archiveDir=" + archiveDir);
}
try
{
markFileParentPath = markFile.parentDirectory().getCanonicalPath();
}
catch (final IOException ex)
{
throw new ConfigurationException(
"failed to resolve canonical path for markFile parent dir=" + archiveDir);
}

final Path linkFile = new File(archiveDirPath, ArchiveMarkFile.LINK_FILENAME).toPath();
if (archiveDirPath.equals(markFileParentPath))
{
try
{
Files.deleteIfExists(linkFile);
}
catch (final IOException ex)
{
throw new RuntimeException("failed to remove old link file", ex);
}
}
else
{
try
{
Files.write(
linkFile,
markFileParentPath.getBytes(US_ASCII),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING);
}
catch (final IOException ex)
{
throw new RuntimeException("failed to create link for mark file directory", ex);
}
}
}

/**
* {@inheritDoc}
*/
Expand Down
49 changes: 48 additions & 1 deletion aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.*;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.cluster.client.AeronCluster;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.mark.ClusterComponentType;
Expand All @@ -31,6 +32,8 @@
import org.agrona.ErrorHandler;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.IoUtil;
import org.agrona.MarkFile;
import org.agrona.Strings;
import org.agrona.concurrent.*;
import org.agrona.concurrent.errors.DistinctErrorLog;
import org.agrona.concurrent.status.AtomicCounter;
Expand Down Expand Up @@ -523,6 +526,7 @@ public static class Context implements Cloneable
private boolean useAgentInvoker = false;
private String clusterDirectoryName = ClusteredServiceContainer.Configuration.clusterDirName();
private File clusterDir;
private File markFileDir;
private ClusterMarkFile markFile;
private String clusterConsensusEndpoints = ConsensusModule.Configuration.clusterConsensusEndpoints();
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -601,6 +605,17 @@ public void conclude()
throw new ClusterException("failed to create cluster dir: " + clusterDir.getAbsolutePath());
}

if (null == markFileDir)
{
final String dir = ConsensusModule.Configuration.markFileDir();
markFileDir = Strings.isEmpty(dir) ? clusterDir : new File(dir);
}

if (!markFileDir.exists() && !markFileDir.mkdirs())
{
throw new ArchiveException("failed to create mark file dir: " + markFileDir.getAbsolutePath());
}

if (null == epochClock)
{
epochClock = SystemEpochClock.INSTANCE;
Expand All @@ -614,13 +629,18 @@ public void conclude()
if (null == markFile)
{
markFile = new ClusterMarkFile(
new File(clusterDir, ClusterMarkFile.FILENAME),
new File(markFileDir, ClusterMarkFile.FILENAME),
ClusterComponentType.BACKUP,
errorBufferLength,
epochClock,
LIVENESS_TIMEOUT_MS);
}

MarkFile.ensureMarkFileLink(
clusterDir,
new File(markFileDir, ClusterMarkFile.FILENAME),
ClusterMarkFile.LINK_FILENAME);

if (null == errorLog)
{
errorLog = new DistinctErrorLog(markFile.errorBuffer(), epochClock, US_ASCII);
Expand Down Expand Up @@ -931,6 +951,33 @@ public File clusterDir()
return clusterDir;
}

/**
* Get the directory in which the ClusterBackup will store mark file (i.e. {@code cluster-mark.dat}). It
* defaults to {@link #clusterDir()} if it is not set explicitly via the
* {@link ClusteredServiceContainer.Configuration#MARK_FILE_DIR_PROP_NAME}.
*
* @return the directory in which the ClusterBackup will store mark file (i.e. {@code cluster-mark.dat}).
* @see ClusteredServiceContainer.Configuration#MARK_FILE_DIR_PROP_NAME
* @see #clusterDir()
*/
public File markFileDir()
{
return markFileDir;
}

/**
* Set the directory in which the ClusterBackup will store mark file (i.e. {@code cluster-mark.dat}).
*
* @param markFileDir the directory in which the ClusterBackup will store mark file (i.e. {@code
* cluster-mark.dat}).
* @return this for a fluent API.
*/
public ClusterBackup.Context markFileDir(final File markFileDir)
{
this.markFileDir = markFileDir;
return this;
}

/**
* Set the {@link io.aeron.archive.client.AeronArchive.Context} used for communicating with the local Archive.
*
Expand Down
92 changes: 82 additions & 10 deletions aeron-cluster/src/main/java/io/aeron/cluster/ClusterTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.agrona.DirectBuffer;
import org.agrona.IoUtil;
import org.agrona.SystemUtil;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.MutableBoolean;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.AtomicBuffer;
Expand All @@ -50,13 +49,15 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static io.aeron.Aeron.NULL_VALUE;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.file.StandardCopyOption.*;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
Expand Down Expand Up @@ -456,7 +457,7 @@ public static void errors(final PrintStream out, final File clusterDir)
{
if (markFileExists(clusterDir) || TIMEOUT_MS > 0)
{
try (ClusterMarkFile markFile = openMarkFile(clusterDir, System.out::println))
try (ClusterMarkFile markFile = openMarkFile(clusterDir, out::println))
{
printTypeAndActivityTimestamp(out, markFile);
printErrors(out, markFile);
Expand Down Expand Up @@ -690,7 +691,8 @@ else if (ConsensusModule.Configuration.CONSENSUS_MODULE_STATE_TYPE_ID == typeId)
*/
public static boolean markFileExists(final File clusterDir)
{
final File markFile = new File(clusterDir, ClusterMarkFile.FILENAME);
final File markFileDir = resolveConsensusModuleMarkFileDir(clusterDir);
final File markFile = new File(markFileDir, ClusterMarkFile.FILENAME);

return markFile.exists();
}
Expand Down Expand Up @@ -1305,32 +1307,78 @@ static <T extends Enum<T>> boolean toggleState(

static ClusterMarkFile openMarkFile(final File clusterDir, final Consumer<String> logger)
{
return new ClusterMarkFile(clusterDir, ClusterMarkFile.FILENAME, System::currentTimeMillis, TIMEOUT_MS, logger);
final File markFileDir = resolveConsensusModuleMarkFileDir(clusterDir);
return new ClusterMarkFile(
markFileDir, ClusterMarkFile.FILENAME, System::currentTimeMillis, TIMEOUT_MS, logger);
}

private static ClusterMarkFile[] openServiceMarkFiles(final File clusterDir, final Consumer<String> logger)
{
String[] clusterMarkFileNames =
clusterDir.list((dir, name) ->
File[] clusterMarkFileNames =
clusterDir.listFiles((dir, name) ->
name.startsWith(ClusterMarkFile.SERVICE_FILENAME_PREFIX) &&
name.endsWith(ClusterMarkFile.FILE_EXTENSION));
(name.endsWith(ClusterMarkFile.FILE_EXTENSION) ||
name.endsWith(ClusterMarkFile.LINK_FILE_EXTENSION)));

if (null == clusterMarkFileNames)
{
clusterMarkFileNames = ArrayUtil.EMPTY_STRING_ARRAY;
clusterMarkFileNames = new File[0];
}

final ArrayList<File> resolvedMarkFileNames = new ArrayList<>();
resolveServiceMarkFileNames(clusterMarkFileNames, resolvedMarkFileNames);

final ClusterMarkFile[] clusterMarkFiles = new ClusterMarkFile[clusterMarkFileNames.length];

for (int i = 0, length = clusterMarkFiles.length; i < length; i++)
for (int i = 0, n = resolvedMarkFileNames.size(); i < n; i++)
{
final File resolvedMarkFile = resolvedMarkFileNames.get(i);
clusterMarkFiles[i] = new ClusterMarkFile(
clusterDir, clusterMarkFileNames[i], System::currentTimeMillis, TIMEOUT_MS, logger);
resolvedMarkFile.getParentFile(),
resolvedMarkFile.getName(),
System::currentTimeMillis,
TIMEOUT_MS,
logger);
}

return clusterMarkFiles;
}

private static void resolveServiceMarkFileNames(final File[] clusterMarkFiles, final ArrayList<File> resolvedFiles)
{
final HashSet<String> resolvedServices = new HashSet<>();

for (final File clusterMarkFile : clusterMarkFiles)
{
final String filename = clusterMarkFile.getName();
if (filename.endsWith(ClusterMarkFile.LINK_FILE_EXTENSION))
{
final String name = filename.substring(
0, filename.length() - ClusterMarkFile.LINK_FILE_EXTENSION.length());

final File markFileDir = resolveDirectoryFromLinkFile(clusterMarkFile);
resolvedFiles.add(new File(markFileDir, name + ClusterMarkFile.FILE_EXTENSION));
resolvedServices.add(name);
}
}

for (final File clusterMarkFile : clusterMarkFiles)
{
final String filename = clusterMarkFile.getName();
if (filename.endsWith(ClusterMarkFile.FILE_EXTENSION))
{
final String name = filename.substring(
0, filename.length() - ClusterMarkFile.FILE_EXTENSION.length());

if (!resolvedServices.contains(name))
{
resolvedFiles.add(clusterMarkFile);
resolvedServices.add(name);
}
}
}
}

private static void printTypeAndActivityTimestamp(final PrintStream out, final ClusterMarkFile markFile)
{
printTypeAndActivityTimestamp(
Expand Down Expand Up @@ -1437,6 +1485,30 @@ private static void updateRecordingLog(final File clusterDir, final List<Recordi
}
}

private static File resolveConsensusModuleMarkFileDir(final File clusterDir)
{
final File linkFile = new File(clusterDir, ClusterMarkFile.LINK_FILENAME);
return linkFile.exists() ? resolveDirectoryFromLinkFile(linkFile) : clusterDir;
}

private static File resolveDirectoryFromLinkFile(final File linkFile)
{
final File markFileDir;

try
{
final byte[] bytes = Files.readAllBytes(linkFile.toPath());
final String markFileDirPath = new String(bytes, US_ASCII).trim();
markFileDir = new File(markFileDirPath);
}
catch (final IOException ex)
{
throw new RuntimeException("failed to read link file=" + linkFile, ex);
}

return markFileDir;
}

static void exitWithErrorOnFailure(final boolean success)
{
if (!success)
Expand Down
Loading

0 comments on commit 905ef4f

Please sign in to comment.