Skip to content

Commit

Permalink
HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to t…
Browse files Browse the repository at this point in the history
…he block scanner (cmccabe)
  • Loading branch information
Colin Patrick Mccabe authored and zhe-thoughts committed Feb 16, 2015
1 parent 7ae0d42 commit 904786f
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 34 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7776. Adding additional unit tests for Quota By Storage Type.
(Xiaoyu Yao via Arpit Agarwal)

HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the
block scanner (cmccabe)

Release 2.6.1 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -278,6 +279,37 @@ synchronized void printStats(StringBuilder p) {
}
}

/**
* Mark a block as "suspect."
*
* This means that we should try to rescan it soon. Note that the
* VolumeScanner keeps a list of recently suspicious blocks, which
* it uses to avoid rescanning the same block over and over in a short
* time frame.
*
* @param storageId The ID of the storage where the block replica
* is being stored.
* @param block The block's ID and block pool id.
*/
synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
if (!isEnabled()) {
LOG.info("Not scanning suspicious block {} on {}, because the block " +
"scanner is disabled.", block, storageId);
return;
}
VolumeScanner scanner = scanners.get(storageId);
if (scanner == null) {
// This could happen if the volume is in the process of being removed.
// The removal process shuts down the VolumeScanner, but the volume
// object stays around as long as there are references to it (which
// should not be that long.)
LOG.info("Not scanning suspicious block {} on {}, because there is no " +
"volume scanner for that storageId.", block, storageId);
return;
}
scanner.markSuspectBlock(block);
}

@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(),
block);
}
throw ioeToSocketException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
Expand Down Expand Up @@ -116,6 +119,21 @@ public class VolumeScanner extends Thread {
private final List<BlockIterator> blockIters =
new LinkedList<BlockIterator>();

/**
* Blocks which are suspect.
* The scanner prioritizes scanning these blocks.
*/
private final LinkedHashSet<ExtendedBlock> suspectBlocks =
new LinkedHashSet<ExtendedBlock>();

/**
* Blocks which were suspect which we have scanned.
* This is used to avoid scanning the same suspect block over and over.
*/
private final Cache<ExtendedBlock, Boolean> recentSuspectBlocks =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(10, TimeUnit.MINUTES).build();

/**
* The current block iterator, or null if there is none.
*/
Expand Down Expand Up @@ -458,10 +476,13 @@ static boolean calculateShouldScan(String storageId, long targetBytesPerSec,
/**
* Run an iteration of the VolumeScanner loop.
*
* @param suspectBlock A suspect block which we should scan, or null to
* scan the next regularly scheduled block.
*
* @return The number of milliseconds to delay before running the loop
* again, or 0 to re-run the loop immediately.
*/
private long runLoop() {
private long runLoop(ExtendedBlock suspectBlock) {
long bytesScanned = -1;
boolean scanError = false;
ExtendedBlock block = null;
Expand All @@ -477,40 +498,43 @@ private long runLoop() {
}

// Find a usable block pool to scan.
if ((curBlockIter == null) || curBlockIter.atEnd()) {
long timeout = findNextUsableBlockIter();
if (timeout > 0) {
LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
"{} ms.", this, timeout);
if (suspectBlock != null) {
block = suspectBlock;
} else {
if ((curBlockIter == null) || curBlockIter.atEnd()) {
long timeout = findNextUsableBlockIter();
if (timeout > 0) {
LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
"{} ms.", this, timeout);
synchronized (stats) {
stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
}
return timeout;
}
synchronized (stats) {
stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
stats.scansSinceRestart++;
stats.blocksScannedInCurrentPeriod = 0;
stats.nextBlockPoolScanStartMs = -1;
}
return timeout;
return 0L;
}
synchronized (stats) {
stats.scansSinceRestart++;
stats.blocksScannedInCurrentPeriod = 0;
stats.nextBlockPoolScanStartMs = -1;
try {
block = curBlockIter.nextBlock();
} catch (IOException e) {
// There was an error listing the next block in the volume. This is a
// serious issue.
LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
// On the next loop iteration, curBlockIter#eof will be set to true, and
// we will pick a different block iterator.
return 0L;
}
if (block == null) {
// The BlockIterator is at EOF.
LOG.info("{}: finished scanning block pool {}",
this, curBlockIter.getBlockPoolId());
saveBlockIterator(curBlockIter);
return 0;
}
return 0L;
}

try {
block = curBlockIter.nextBlock();
} catch (IOException e) {
// There was an error listing the next block in the volume. This is a
// serious issue.
LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
// On the next loop iteration, curBlockIter#eof will be set to true, and
// we will pick a different block iterator.
return 0L;
}
if (block == null) {
// The BlockIterator is at EOF.
LOG.info("{}: finished scanning block pool {}",
this, curBlockIter.getBlockPoolId());
saveBlockIterator(curBlockIter);
return 0;
}
long saveDelta = monotonicMs - curBlockIter.getLastSavedMs();
if (saveDelta >= conf.cursorSaveMs) {
Expand All @@ -529,7 +553,7 @@ private long runLoop() {
} finally {
synchronized (stats) {
stats.bytesScannedInPastHour = scannedBytesSum;
if (bytesScanned >= 0) {
if (bytesScanned > 0) {
stats.blocksScannedInCurrentPeriod++;
stats.blocksScannedSinceRestart++;
}
Expand All @@ -551,6 +575,20 @@ private long runLoop() {
}
}

/**
* If there are elements in the suspectBlocks list, removes
* and returns the first one. Otherwise, returns null.
*/
private synchronized ExtendedBlock popNextSuspectBlock() {
Iterator<ExtendedBlock> iter = suspectBlocks.iterator();
if (!iter.hasNext()) {
return null;
}
ExtendedBlock block = iter.next();
iter.remove();
return block;
}

@Override
public void run() {
// Record the minute on which the scanner started.
Expand All @@ -563,7 +601,9 @@ public void run() {
try {
long timeout = 0;
while (true) {
// Take the lock to check if we should stop.
ExtendedBlock suspectBlock = null;
// Take the lock to check if we should stop, and access the
// suspect block list.
synchronized (this) {
if (stopping) {
break;
Expand All @@ -574,8 +614,9 @@ public void run() {
break;
}
}
suspectBlock = popNextSuspectBlock();
}
timeout = runLoop();
timeout = runLoop(suspectBlock);
}
} catch (InterruptedException e) {
// We are exiting because of an InterruptedException,
Expand Down Expand Up @@ -612,6 +653,30 @@ public synchronized void shutdown() {
this.interrupt();
}


public synchronized void markSuspectBlock(ExtendedBlock block) {
if (stopping) {
LOG.info("{}: Not scheduling suspect block {} for " +
"rescanning, because this volume scanner is stopping.", this, block);
return;
}
Boolean recent = recentSuspectBlocks.getIfPresent(block);
if (recent != null) {
LOG.info("{}: Not scheduling suspect block {} for " +
"rescanning, because we rescanned it recently.", this, block);
return;
}
if (suspectBlocks.contains(block)) {
LOG.info("{}: suspect block {} is already queued for " +
"rescanning.", this, block);
return;
}
suspectBlocks.add(block);
recentSuspectBlocks.put(block, true);
LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block);
notify(); // wake scanner thread.
}

/**
* Allow the scanner to scan the given block pool.
*
Expand Down
Loading

0 comments on commit 904786f

Please sign in to comment.