Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,28 @@ public static void assertExceptionContains(String expectedText,
public static void waitFor(final Supplier<Boolean> check,
final long checkEveryMillis, final long waitForMillis)
throws TimeoutException, InterruptedException {
waitFor(check, checkEveryMillis, waitForMillis, null);
}

/**
* Wait for the specified test to return true. The test will be performed
* initially and then every {@code checkEveryMillis} until at least
* {@code waitForMillis} time has expired. If {@code check} is null or
* {@code waitForMillis} is less than {@code checkEveryMillis} this method
* will throw an {@link IllegalArgumentException}.
*
* @param check the test to perform.
* @param checkEveryMillis how often to perform the test.
* @param waitForMillis the amount of time after which no more tests will be
* performed.
* @param errorMsg error message to provide in TimeoutException.
* @throws TimeoutException if the test does not return true in the allotted
* time.
* @throws InterruptedException if the method is interrupted while waiting.
*/
public static void waitFor(final Supplier<Boolean> check,
final long checkEveryMillis, final long waitForMillis,
final String errorMsg) throws TimeoutException, InterruptedException {
Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
if (waitForMillis < checkEveryMillis) {
throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
Expand All @@ -432,9 +454,12 @@ public static void waitFor(final Supplier<Boolean> check,
}

if (!result) {
throw new TimeoutException("Timed out waiting for condition. " +
"Thread diagnostics:\n" +
TimedOutTestsListener.buildThreadDiagnosticString());
final String exceptionErrorMsg = "Timed out waiting for condition. "
+ (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)
? "Error Message: " + errorMsg : "")
+ "\nThread diagnostics:\n" +
TimedOutTestsListener.buildThreadDiagnosticString();
throw new TimeoutException(exceptionErrorMsg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import java.util.Arrays;
import java.util.List;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
Expand Down Expand Up @@ -76,7 +79,8 @@ public class TestDecommissioningStatus {
private FileSystem fileSys;
private HostsFileWriter hostsFileWriter;
private Configuration conf;
private Logger LOG;
private static final Logger LOG =
LoggerFactory.getLogger(TestDecommissioningStatus.class);

final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);

Expand Down Expand Up @@ -110,7 +114,6 @@ protected Configuration setupConfig() throws Exception {
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
GenericTestUtils.setLogLevel(
LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG);
LOG = LoggerFactory.getLogger(TestDecommissioningStatus.class);
return conf;
}

Expand Down Expand Up @@ -165,17 +168,30 @@ protected void decommissionNode(String dnName)

protected void checkDecommissionStatus(DatanodeDescriptor decommNode,
int expectedUnderRep, int expectedDecommissionOnly,
int expectedUnderRepInOpenFiles) {
assertEquals("Unexpected num under-replicated blocks",
expectedUnderRep,
decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks());
assertEquals("Unexpected number of decom-only replicas",
expectedDecommissionOnly,
decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
assertEquals(
"Unexpected number of replicas in under-replicated open files",
expectedUnderRepInOpenFiles,
decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());
int expectedUnderRepInOpenFiles) throws TimeoutException,
InterruptedException {
String errorMsg;
errorMsg = "Under replicated blocks. Expected: "
+ expectedUnderRep + " , Actual: "
+ decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks();
GenericTestUtils.waitFor(
() -> expectedUnderRep == decommNode.getLeavingServiceStatus()
.getUnderReplicatedBlocks(),
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
errorMsg = "OutOfService only replicas. Expected: "
+ expectedDecommissionOnly + " , Actual: "
+ decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas();
GenericTestUtils.waitFor(
() -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus()
.getOutOfServiceOnlyReplicas(),
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
errorMsg = "UnderReplicated in open files. Expected: "
+ expectedUnderRepInOpenFiles + " , Actual: "
+ decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles();
GenericTestUtils.waitFor(
() -> expectedUnderRepInOpenFiles == decommNode
.getLeavingServiceStatus().getUnderReplicatedInOpenFiles(),
1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
}

protected void checkDFSAdminDecommissionStatus(
Expand Down Expand Up @@ -270,6 +286,7 @@ public void testDecommissionStatus() throws Exception {

FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
verifyInitialState(fsn, dm);
for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(client, iteration);
dm.refreshNodes(conf);
Expand All @@ -278,14 +295,13 @@ public void testDecommissionStatus() throws Exception {
// Block until the admin's monitor updates the number of tracked nodes.
waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1);
final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
assertEquals(decommissioningNodes.size(), iteration + 1);
if (iteration == 0) {
assertEquals(decommissioningNodes.size(), 1);
DatanodeDescriptor decommNode = decommissioningNodes.get(0);
checkDecommissionStatus(decommNode, 3, 0, 1);
checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
fileSys, admin);
} else {
assertEquals(decommissioningNodes.size(), 2);
DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
// This one is still 3,3,1 since it passed over the UC block
Expand All @@ -307,6 +323,69 @@ public void testDecommissionStatus() throws Exception {
AdminStatesBaseTest.cleanupFile(fileSys, file2);
}

// Why do we verify initial state of DataNodes here?
// Before we start actual decommission testing, we should ensure that
// total 8 blocks (original 4 blocks of 2 files and 4 replicas) are
// present over two Datanodes available. If we don't wait until all 8 blocks
// are reported live by BlockManager, we might get to a situation
// where one of the replicas might not yet been present on any of Datanodes
// and we start decommissioning process, and then it would result in
// flaky test because total (no of under replicated blocks, no of outOfService
// only replicas, no of under replicated in open files) counts would be
// incorrect.
protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm)
throws InterruptedException {
dm.getDatanodes().forEach(datanodeDescriptor -> {
try {
checkDecommissionStatus(datanodeDescriptor, 0, 0, 0);
} catch (TimeoutException | InterruptedException e) {
throw new AssertionError("Datanode not in good state.", e);
}
});
int c = 0;
int totalBlocks;
long totalReplicatedBlocks;
while (true) {
totalBlocks = fsn.getBlockManager().getTotalBlocks();
totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks();
if (totalBlocks == 4 && totalReplicatedBlocks == 4) {
break;
} else {
if (c == 4) {
throw new AssertionError("Unexpected Total blocks " + totalBlocks
+ " and replicated blocks " + totalReplicatedBlocks);
}
Thread.sleep(3000);
}
c++;
}
c = 0;
AtomicInteger total = new AtomicInteger(0);
AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0);
while (true) {
total.set(0);
sufficientBlocksSuccess.set(0);
dm.getDatanodes().forEach(
datanodeDescriptor -> {
total.addAndGet(datanodeDescriptor.numBlocks());
if (datanodeDescriptor.numBlocks() == 4) {
sufficientBlocksSuccess.incrementAndGet();
}
});
if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) {
break;
} else {
if (c == 4) {
throw new AssertionError("Unexpected Total blocks " + total.get()
+ " from Datanode Storage. 4 blocks per Datanode Storage"
+ " expected from each DataNode");
}
Thread.sleep(3000);
}
c++;
}
}

/**
* Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
* as dead before decommission has completed. That will allow DN to resume
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void testDecommissionStatus() throws Exception {

FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
verifyInitialState(fsn, dm);
for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(client, iteration);
dm.refreshNodes(conf);
Expand Down