Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ public static InetSocketAddress createSocketAddr(String target) {
private final StorageLocationChecker storageLocationChecker;

private final DatasetVolumeChecker volumeChecker;
volatile FsDatasetSpi<? extends FsVolumeSpi> allData = null;
private Thread checkDiskThread = null;
private final int checkDiskInterval = 5*1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to make this interval a configuration parameter rather than a hard-coded constant.

private Object checkDiskMutex = new Object();
private List<StorageLocation> errorDisk = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allData = data + errorDisk, right? It seems that allData is a redundant variable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, allData = data + errorDisk is right. allData is used in volumeChecker.checkAllVolumes(allData).
Without allData, we would have to build a variable equal to data + errorDisk before calling volumeChecker.checkAllVolumes, so I think keeping allData is OK.


private final SocketFactory socketFactory;

Expand Down Expand Up @@ -798,6 +803,7 @@ private synchronized void refreshVolumes(String newVolumes) throws IOException {
public IOException call() {
try {
data.addVolume(location, nsInfos);
allData.addVolume(location, nsInfos);
} catch (IOException e) {
return e;
}
Expand All @@ -819,6 +825,14 @@ public IOException call() {
} else {
effectiveVolumes.add(volume.toString());
LOG.info("Successfully added volume: {}", volume);
if (errorDisk != null && !errorDisk.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be necessary to check whether errorDisk is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. If refreshVolumes is invoked by reconfigurePropertyImpl, errorDisk may be null.

  public String reconfigurePropertyImpl(String property, String newVal)
      throws ReconfigurationException {
    switch (property) {
      case DFS_DATANODE_DATA_DIR_KEY: {
        IOException rootException = null;
        try {
          LOG.info("Reconfiguring {} to {}", property, newVal);
          this.refreshVolumes(newVal);
...
  }

LOG.debug("check errorDisk for {} disk ", volume);
if (errorDisk.contains(volume)) {
errorDisk.remove(volume);
LOG.info("Remove {} from errorDisk, " +
"because of the repaired disk ", volume);
}
}
}
} catch (Exception e) {
errorMessageBuilder.append(
Expand Down Expand Up @@ -1694,6 +1708,8 @@ void initBlockPool(BPOfferService bpos) throws IOException {
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
initStorage(nsInfo);
// start check disk thread.
startCheckDiskThread();

// Exclude failed disks before initializing the block pools to avoid startup
// failures.
Expand Down Expand Up @@ -1772,6 +1788,7 @@ private void initStorage(final NamespaceInfo nsInfo) throws IOException {
synchronized(this) {
if (data == null) {
data = factory.newInstance(this, storage, getConf());
allData = factory.newInstance(this, storage, getConf());
}
}
}
Expand Down Expand Up @@ -2191,6 +2208,44 @@ public void shutdown() {
tracer.close();
}

public void startCheckDiskThread() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using a ScheduledExecutorService to schedule this task at a fixed rate, since it:

  1. achieves the same goal, and
  2. is simpler.

Also, checkDiskMutex does not seem necessary to me; there is no need for concurrency control here.

if (checkDiskThread == null) {
synchronized (checkDiskMutex) {
if (checkDiskThread == null) {
checkDiskThread = new Thread(new Runnable() {
@Override
public void run() {
while (shouldRun) {
LOG.info("CheckDiskThread running ");
if (errorDisk != null && !errorDisk.isEmpty()) {
try {
checkDiskError();
} catch (Exception e) {
LOG.warn("Unexpected exception occurred while" +
" checking disk error "+ e);
checkDiskThread = null;
return;
}
lastDiskErrorCheck = Time.monotonicNow();
}
try {
Thread.sleep(checkDiskInterval);
} catch (InterruptedException e) {
LOG.debug("InterruptedException in check disk error thread",
e);
checkDiskThread = null;
return;
}
}
}
});
checkDiskThread.start();
LOG.info("Starting CheckDiskError Thread");
}
}
}
}

/**
* Check if there is a disk failure asynchronously
* and if so, handle the error.
Expand Down Expand Up @@ -3409,19 +3464,55 @@ public ShortCircuitRegistry getShortCircuitRegistry() {
public void checkDiskError() throws IOException {
Set<FsVolumeSpi> unhealthyVolumes;
try {
unhealthyVolumes = volumeChecker.checkAllVolumes(data);
// check all volume
unhealthyVolumes = volumeChecker.checkAllVolumes(allData);
lastDiskErrorCheck = Time.monotonicNow();
} catch (InterruptedException e) {
LOG.error("Interruped while running disk check", e);
LOG.error("Interrupted while running disk check", e);
throw new IOException("Interrupted while running disk check", e);
}

if (unhealthyVolumes.size() > 0) {
if (errorDisk == null) {
errorDisk = new ArrayList<>();
}
List<StorageLocation> tmpDisk = Lists.newArrayList(errorDisk);
errorDisk = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

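Maybe this statement is unintended? After errorDisk is set to null here, the errorDisk.add(...) call in the loop below would throw a NullPointerException.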

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errorDisk=null should be errorDisk.clear()

for (FsVolumeSpi vol : unhealthyVolumes) {
LOG.info("Add error disk " + vol.getStorageLocation()
+ " to errorDisk - " + vol.getStorageLocation());
errorDisk.add(vol.getStorageLocation());
if (tmpDisk.contains(vol.getStorageLocation())) {
tmpDisk.remove(vol.getStorageLocation());
}
}
LOG.warn("checkDiskError got {} failed volumes - {}",
unhealthyVolumes.size(), unhealthyVolumes);
handleVolumeFailures(unhealthyVolumes);
if (!tmpDisk.isEmpty()) {
try {
Configuration conf = getConf();
String newDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY)
+ "," + Joiner.on(",").join(tmpDisk);
refreshVolumes(newDataDirs);
} catch (IOException e) {
LOG.error("Auto refreshVolumes error : ", e);
}
}
} else {
LOG.debug("checkDiskError encountered no failures");
LOG.debug("checkDiskError encountered no failures," +
"then check errorDisk");
if (errorDisk != null && !errorDisk.isEmpty()) {
try {
Configuration conf = getConf();
String newDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY)
+ "," + Joiner.on(",").join(errorDisk);
refreshVolumes(newDataDirs);
errorDisk = null;
} catch (IOException e) {
LOG.error("Auto refreshVolumes error : ", e);
}
}
}
}

Expand Down