Skip to content

Commit

Permalink
Add replay statistics for journal startup
Browse files Browse the repository at this point in the history
This change adds some new logging messages to the master while
replaying the journal. Specifically, it will now log the rate of
journal entries applied to the state machine during the startup process.
The number of entries in a particular interval are logged along with the
current commit index and the expected entries remaining and estimated
time left.

In the embedded journal Instead, we use the Ratis "Commit Index"
which represents batches of journal entries applied to the raft log. We
are
able to get the last commit index as well as track the most recently
applied index which gives us the ability to provide the estimated time
remaining. The last SN is more readily available in the UFSJournal
since we control that implementation.

A new function for the UfsJournalReader needed to be added in order to
find the highest sequence number is within a particular journal. This
allows us to get the estimated entries remaining and time left during
the replay. The method should take O(N) with the number of journal
log files. It does not read data from the log files. In the case where
there is an error determining the highest SN, an empty optional is
returned in which case the information about expected finish time and
number of entries remaining is omitted.

There is a case where an error occurs when retrieving the final commit
index. In this case, the optional variable with the index will not be
populated and the estimated time remaining statistics will not be
calculated, only the measured rate of entries being applied.

pr-link: Alluxio#13107
change-id: cid-1c072dd3380c5316fbe63a3fcd98c2a5c11c8416
  • Loading branch information
ZacBlanco authored Apr 6, 2021
1 parent 6231603 commit a067b94
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.OptionalLong;
import java.util.StringJoiner;

/**
* Class to abstract out progress logging for journal replay.
*/
public abstract class AbstractJournalProgressLogger {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJournalProgressLogger.class);

// could have been configurable. decided it is not really necessary.
/** Max time to wait before actually logging. */
public static final long MAX_LOG_INTERVAL_MS = 30_000;

private long mLastMeasuredTime;
private long mLastCommitIdx;
private long mLogCount;

private final OptionalLong mEndCommitIdx;

/**
* Creates a new instance of {@link AbstractJournalProgressLogger}.
*
* @param endComitIdx the final commit index in the journal. Used to estimate completion time
*/
public AbstractJournalProgressLogger(OptionalLong endComitIdx) {
mEndCommitIdx = endComitIdx;
mLastMeasuredTime = System.currentTimeMillis();
mLastCommitIdx = 0L;
mLogCount = 0;
}

/**
* @return the last applied commit index to a journal
*/
public abstract long getLastAppliedIndex();

/**
* @return the name of the journal
*/
public abstract String getJournalName();

/**
* Logs the progress of journal replay.
*
* This method rate limits itself on when it actually calculates and logs the message. If it is
* called too frequently, then it will essentially be a no-op. The return value indicates whether
* a message was logged or not as a result of calling the method.
*
* @return true is a message is logged, false otherwise
*/
public boolean logProgress() {
long now = System.currentTimeMillis();
long nextLogTime =
1000L * Math.min(1L << (mLogCount > 30 ? 30 : mLogCount), MAX_LOG_INTERVAL_MS);
// Exit early if log is called too fast.
if ((now - mLastMeasuredTime) < nextLogTime) {
return false;
}
long currCommitIdx = getLastAppliedIndex();
long timeSinceLastMeasure = (now - mLastMeasuredTime);
long commitIdxRead = currCommitIdx - mLastCommitIdx;

double commitIdxRateMs = ((double) commitIdxRead) / timeSinceLastMeasure;
StringJoiner logMsg = new StringJoiner("|");
logMsg.add(getJournalName());
logMsg.add(String.format("current SN: %d", currCommitIdx));
logMsg.add(String.format("entries in last %dms=%d", timeSinceLastMeasure, commitIdxRead));
if (mEndCommitIdx.isPresent()) {
long commitsRemaining = mEndCommitIdx.getAsLong() - currCommitIdx;
double expectedTimeRemaining = ((double) commitsRemaining) / commitIdxRateMs;
if (commitsRemaining > 0) {
logMsg.add(String.format("est. commits left: %d", commitsRemaining));
}
if (!Double.isNaN(expectedTimeRemaining) && !Double.isInfinite(expectedTimeRemaining)
&& expectedTimeRemaining > 0) {
logMsg.add(String.format("est. time remaining: %.2fms", expectedTimeRemaining));
}
}
mLogCount++;
LOG.info(logMsg.toString());
mLastMeasuredTime = now;
mLastCommitIdx = currCommitIdx;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,13 @@ public long getLastPrimaryStartSequenceNumber() {
return mLastPrimaryStartSequenceNumber;
}

/**
* @return the last raft log index which was applied to the state machine
*/
public long getLastAppliedCommitIndex() {
return mLastAppliedCommitIndex;
}

/**
* @return whether the state machine is in the process of taking a snapshot
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal.raft;

import alluxio.master.journal.AbstractJournalProgressLogger;

import java.util.OptionalLong;

/**
* Logs journal replay progress for the {@link RaftJournalSystem}.
*/
public class RaftJournalProgressLogger extends AbstractJournalProgressLogger {

private final JournalStateMachine mJournal;

/**
* Creates a new instance of the logger.
*
* @param journal the journal being replayed
* @param endCommitIdx the final commit index in the log to estimate completion times
*/
public RaftJournalProgressLogger(JournalStateMachine journal, OptionalLong endCommitIdx) {
super(endCommitIdx);
mJournal = journal;
}

@Override
public long getLastAppliedIndex() {
return mJournal.getLastAppliedCommitIndex();
}

@Override
public String getJournalName() {
return "embedded journal";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.LeaderNotReadyException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
Expand Down Expand Up @@ -80,6 +81,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -559,6 +562,8 @@ public synchronized Map<ServiceType, GrpcService> getJournalServices() {
return services;
}

private static final long JOURNAL_STAT_LOG_MAX_INTERVAL_MS = 30000L;

/**
* Attempts to catch up. If the master loses leadership during this method, it will return early.
*
Expand All @@ -572,6 +577,30 @@ private void catchUp(JournalStateMachine stateMachine, LocalFirstRaftClient clie
// Wait for any outstanding snapshot to complete.
CommonUtils.waitFor("snapshotting to finish", () -> !stateMachine.isSnapshotting(),
WaitForOptions.defaults().setTimeoutMs(10 * Constants.MINUTE_MS));
OptionalLong endCommitIndex = OptionalLong.empty();
try {
// raft peer IDs are unique, so there should really only ever be one result.
// If for some reason there is more than one..it should be fine as it only
// affects the completion time estimate in the logs.
synchronized (this) { // synchronized to appease findbugs; shouldn't make any difference
RaftPeerId serverId = mServer.getId();
Optional<RaftProtos.CommitInfoProto> commitInfo = getGroupInfo().getCommitInfos().stream()
.filter(commit -> serverId.equals(RaftPeerId.valueOf(commit.getServer().getId())))
.findFirst();
if (commitInfo.isPresent()) {
endCommitIndex = OptionalLong.of(commitInfo.get().getCommitIndex());
} else {
throw new IOException("Commit info was not present. Couldn't find the current server's "
+ "latest commit");
}
}
} catch (IOException e) {
LogUtils.warnWithException(LOG, "Failed to get raft log information before replay."
+ " Replay statistics will not be available", e);
}

RaftJournalProgressLogger progressLogger =
new RaftJournalProgressLogger(mStateMachine, endCommitIndex);

// Loop until we lose leadership or convince ourselves that we are caught up and we are the only
// master serving. To convince ourselves of this, we need to accomplish three steps:
Expand Down Expand Up @@ -604,8 +633,14 @@ private void catchUp(JournalStateMachine stateMachine, LocalFirstRaftClient clie
} catch (TimeoutException | ExecutionException | IOException e) {
ex = e;
}

if (ex != null) {
LOG.info("Exception submitting term start entry: {}", ex.toString());
// LeaderNotReadyException typically indicates Ratis is still replaying the journal.
if (ex instanceof LeaderNotReadyException) {
progressLogger.logProgress();
} else {
LOG.info("Exception submitting term start entry: {}", ex.toString());
}
// avoid excessive retries when server is not ready
Thread.sleep(waitBeforeRetry);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import alluxio.master.journal.JournalUtils;
import alluxio.master.journal.sink.JournalSink;
import alluxio.proto.journal.Journal.JournalEntry;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.util.CommonUtils;
import alluxio.util.ExceptionUtils;

Expand All @@ -27,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Supplier;

Expand Down Expand Up @@ -82,6 +84,9 @@ public final class UfsJournalCheckpointThread extends Thread {
/** A supplier of journal sinks for this journal. */
private final Supplier<Set<JournalSink>> mJournalSinks;

/** The last sequence number applied to the journal. */
private volatile long mLastAppliedSN;

/**
* The state of the journal catchup.
*/
Expand Down Expand Up @@ -168,12 +173,34 @@ public long getNextSequenceNumber() {

@Override
public void run() {
// Start a new thread which tracks the journal replay statistics
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 30000, Integer.MAX_VALUE);
long start = System.currentTimeMillis();
final OptionalLong finalSN = UfsJournalReader.getLastSN(mJournal);
Thread t = new Thread(() -> {
UfsJournalProgressLogger progressLogger =
new UfsJournalProgressLogger(mJournal, finalSN, () -> mLastAppliedSN);
while (!Thread.currentThread().isInterrupted() && retry.attempt()) {
// log current stats
progressLogger.logProgress();
}
});
try {
t.start();
runInternal();
} catch (Throwable e) {
t.interrupt();
ProcessUtils.fatalError(LOG, e, "%s: Failed to run journal checkpoint thread, crashing.",
mMaster.getName());
System.exit(-1);
} finally {
t.interrupt();
try {
t.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted while waiting for journal stats thread to shut down.");
}
}
}

Expand Down Expand Up @@ -213,6 +240,7 @@ private void runInternal() {
LOG.info("Quiet period interrupted by new journal entry");
quietPeriodWaited = false;
}
mLastAppliedSN = entry.getSequenceNumber();
break;
default:
mCatchupState = CatchupState.DONE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.journal.ufs;

import alluxio.master.journal.AbstractJournalProgressLogger;

import java.util.OptionalLong;
import java.util.function.Supplier;

/**
* Journal replay logger for the {@link UfsJournal}.
*/
public class UfsJournalProgressLogger extends AbstractJournalProgressLogger {

private final UfsJournal mJournal;
private final Supplier<Long> mCommitSupplier;

/**
* Creates a new instance of the journal replay progress logger.
*
* @param journal the journal being replayed
* @param endCommitIdx the final commit index (sequence number) in the journal
* @param lastIndexSupplier supplier that gives the last applied commit (sequence number)
*/
public UfsJournalProgressLogger(UfsJournal journal, OptionalLong endCommitIdx,
Supplier<Long> lastIndexSupplier) {
super(endCommitIdx);
mJournal = journal;
mCommitSupplier = lastIndexSupplier;
}

@Override
public long getLastAppliedIndex() {
return mCommitSupplier.get();
}

@Override
public String getJournalName() {
return mJournal.toString();
}
}
Loading

0 comments on commit a067b94

Please sign in to comment.