Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.visitor.INodeCountVisitor;
import org.apache.hadoop.hdfs.server.namenode.visitor.INodeCountVisitor.Counts;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.StringUtils;
Expand All @@ -40,15 +44,21 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FsImageValidation.Cli.println;
import static org.apache.hadoop.util.Time.now;

/**
Expand Down Expand Up @@ -134,6 +144,25 @@ static String toCommaSeparatedNumber(long n) {
}
return b.insert(0, n).toString();
}

/** @return a filter for the given type. */
static FilenameFilter newFilenameFilter(NameNodeFile type) {
final String prefix = type.getName() + "_";
return new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
if (!name.startsWith(prefix)) {
return false;
}
for (int i = prefix.length(); i < name.length(); i++) {
if (!Character.isDigit(name.charAt(i))) {
return false;
}
}
return true;
}
};
}
}

private final File fsImageFile;
Expand All @@ -142,21 +171,44 @@ static String toCommaSeparatedNumber(long n) {
this.fsImageFile = fsImageFile;
}

int checkINodeReference(Configuration conf) throws Exception {
int run() throws Exception {
return run(new Configuration(), new AtomicInteger());
}

int run(AtomicInteger errorCount) throws Exception {
return run(new Configuration(), errorCount);
}

int run(Configuration conf, AtomicInteger errorCount) throws Exception {
final int initCount = errorCount.get();
LOG.info(Util.memoryInfo());
initConf(conf);

// check INodeReference
final FSNamesystem namesystem = checkINodeReference(conf, errorCount);

// check INodeMap
INodeMapValidation.run(namesystem.getFSDirectory(), errorCount);
LOG.info(Util.memoryInfo());

final int d = errorCount.get() - initCount;
if (d > 0) {
Cli.println("Found %d error(s) in %s", d, fsImageFile.getAbsolutePath());
}
return d;
}

private FSNamesystem loadImage(Configuration conf) throws IOException {
final TimerTask checkProgress = new TimerTask() {
@Override
public void run() {
final double percent = NameNode.getStartupProgress().createView()
.getPercentComplete(Phase.LOADING_FSIMAGE);
LOG.info(String.format("%s Progress: %.1f%%",
Phase.LOADING_FSIMAGE, 100*percent));
LOG.info(String.format("%s Progress: %.1f%% (%s)",
Phase.LOADING_FSIMAGE, 100*percent, Util.memoryInfo()));
}
};

INodeReferenceValidation.start();
final Timer t = new Timer();
t.scheduleAtFixedRate(checkProgress, 0, 60_000);
final long loadStart = now();
Expand Down Expand Up @@ -197,10 +249,42 @@ public void run() {
t.cancel();
Cli.println("Loaded %s %s successfully in %s",
FS_IMAGE, fsImageFile, StringUtils.formatTime(now() - loadStart));
return namesystem;
}

FSNamesystem checkINodeReference(Configuration conf,
AtomicInteger errorCount) throws Exception {
INodeReferenceValidation.start();
final FSNamesystem namesystem = loadImage(conf);
LOG.info(Util.memoryInfo());
final int errorCount = INodeReferenceValidation.end();
INodeReferenceValidation.end(errorCount);
LOG.info(Util.memoryInfo());
return errorCount;
return namesystem;
}

static class INodeMapValidation {
static Iterable<INodeWithAdditionalFields> iterate(INodeMap map) {
return new Iterable<INodeWithAdditionalFields>() {
@Override
public Iterator<INodeWithAdditionalFields> iterator() {
return map.getMapIterator();
}
};
}

static void run(FSDirectory fsdir, AtomicInteger errorCount) {
final int initErrorCount = errorCount.get();
final Counts counts = INodeCountVisitor.countTree(fsdir.getRoot());
for (INodeWithAdditionalFields i : iterate(fsdir.getINodeMap())) {
if (counts.getCount(i) == 0) {
Cli.printError(errorCount, "%s (%d) is inaccessible (%s)",
i, i.getId(), i.getFullPathName());
}
}
println("%s ended successfully: %d error(s) found.",
INodeMapValidation.class.getSimpleName(),
errorCount.get() - initErrorCount);
}
}

static class Cli extends Configured implements Tool {
Expand All @@ -217,9 +301,10 @@ public int run(String[] args) throws Exception {
initLogLevels();

final FsImageValidation validation = FsImageValidation.newInstance(args);
final int errorCount = validation.checkINodeReference(getConf());
final AtomicInteger errorCount = new AtomicInteger();
validation.run(getConf(), errorCount);
println("Error Count: %s", errorCount);
return errorCount == 0? 0: 1;
return errorCount.get() == 0? 0: 1;
}

static String parse(String... args) {
Expand All @@ -240,19 +325,68 @@ static String parse(String... args) {
return f;
}

static void println(String format, Object... args) {
static synchronized void println(String format, Object... args) {
final String s = String.format(format, args);
System.out.println(s);
LOG.info(s);
}

static void printError(String message, Throwable t) {
static synchronized void warn(String format, Object... args) {
final String s = "WARN: " + String.format(format, args);
System.out.println(s);
LOG.warn(s);
}

static synchronized void printError(String message, Throwable t) {
System.out.println(message);
if (t != null) {
t.printStackTrace(System.out);
}
LOG.error(message, t);
}

static synchronized void printError(AtomicInteger errorCount,
String format, Object... args) {
final int count = errorCount.incrementAndGet();
final String s = "FSIMAGE_ERROR " + count + ": "
+ String.format(format, args);
System.out.println(s);
LOG.info(s);
}
}

public static int validate(FSNamesystem namesystem) throws Exception {
final AtomicInteger errorCount = new AtomicInteger();
final NNStorage nnStorage = namesystem.getFSImage().getStorage();
for(Storage.StorageDirectory sd : nnStorage.getStorageDirs()) {
validate(sd.getCurrentDir(), errorCount);
}
return errorCount.get();
}

public static void validate(File path, AtomicInteger errorCount)
throws Exception {
if (path.isFile()) {
new FsImageValidation(path).run(errorCount);
} else if (path.isDirectory()) {
final File[] images = path.listFiles(
Util.newFilenameFilter(NameNodeFile.IMAGE));
if (images == null || images.length == 0) {
Cli.warn("%s not found in %s", FSImage.class.getSimpleName(),
path.getAbsolutePath());
return;
}

Arrays.sort(images, Collections.reverseOrder());
for (int i = 0; i < images.length; i++) {
final File image = images[i];
Cli.println("%s %d) %s", FSImage.class.getSimpleName(),
i, image.getAbsolutePath());
FsImageValidation.validate(image, errorCount);
}
}

Cli.warn("%s is neither a file nor a directory", path.getAbsolutePath());
}

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,20 @@ public class INodeReferenceValidation {

public static void start() {
INSTANCE.compareAndSet(null, new INodeReferenceValidation());
println("Validation started");
println("%s started", INodeReferenceValidation.class.getSimpleName());
}

public static int end() {
public static void end(AtomicInteger errorCount) {
final INodeReferenceValidation instance = INSTANCE.getAndSet(null);
if (instance == null) {
return 0;
return;
}

final int errorCount = instance.assertReferences();
println("Validation ended successfully: %d error(s) found.", errorCount);
return errorCount;
final int initCount = errorCount.get();
instance.assertReferences(errorCount);
println("%s ended successfully: %d error(s) found.",
INodeReferenceValidation.class.getSimpleName(),
errorCount.get() - initCount);
}

static <REF extends INodeReference> void add(REF ref, Class<REF> clazz) {
Expand Down Expand Up @@ -153,7 +155,7 @@ <REF extends INodeReference> ReferenceSet<REF> getReferences(
throw new IllegalArgumentException("References not found for " + clazz);
}

private int assertReferences() {
private void assertReferences(AtomicInteger errorCount) {
final int p = Runtime.getRuntime().availableProcessors();
LOG.info("Available Processors: {}", p);
final ExecutorService service = Executors.newFixedThreadPool(p);
Expand All @@ -168,7 +170,6 @@ public void run() {
final Timer t = new Timer();
t.scheduleAtFixedRate(checkProgress, 0, 1_000);

final AtomicInteger errorCount = new AtomicInteger();
try {
dstReferences.submit(errorCount, service);
withCounts.submit(errorCount, service);
Expand All @@ -183,7 +184,6 @@ public void run() {
service.shutdown();
t.cancel();
}
return errorCount.get();
}

static <REF extends INodeReference> List<Task<REF>> createTasks(
Expand Down Expand Up @@ -215,7 +215,7 @@ public Integer call() throws Exception {
try {
ref.assertReferences();
} catch (Throwable t) {
println("%d: %s", errorCount.incrementAndGet(), t);
printError(errorCount, "%s", t);
}
}
return references.size();
Expand Down
Loading