HBaseTestingUtility.java
@@ -1239,6 +1239,13 @@ public void restartHBaseCluster(int servers) throws IOException, InterruptedException {

public void restartHBaseCluster(int servers, List<Integer> ports)
throws IOException, InterruptedException {
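    // Delegates to the new option-based overload below, so the old (servers, ports)
    // signature keeps working for existing callers.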
StartMiniClusterOption option =
StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
restartHBaseCluster(option);
}

public void restartHBaseCluster(StartMiniClusterOption option)
throws IOException, InterruptedException {
if (hbaseAdmin != null) {
hbaseAdmin.close();
hbaseAdmin = null;
@@ -1247,7 +1254,9 @@ public void restartHBaseCluster(int servers, List<Integer> ports)
this.asyncConnection.close();
this.asyncConnection = null;
}
-    this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
+    this.hbaseCluster =
+      new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
+        option.getRsPorts(), option.getMasterClass(), option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
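
The option-based overload makes a restart configurable beyond the region server count. A minimal usage sketch (a hypothetical snippet, not part of this change: it assumes UTIL is an HBaseTestingUtility whose mini cluster has just been shut down, and the port numbers are illustrative):

    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numMasters(1)
        .numRegionServers(3)
        .rsPorts(java.util.Arrays.asList(16020, 16021, 16022)) // keep the same RS ports across the restart
        .build();
    UTIL.restartHBaseCluster(option);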

TestClusterRestartFailover.java
@@ -17,19 +17,32 @@
*/
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.ServerState;
import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.junit.Assert;
+import org.apache.zookeeper.KeeperException;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -45,6 +58,9 @@ public class TestClusterRestartFailover extends AbstractTestRestartCluster {

private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestartFailover.class);

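  // Shared with AssignmentManagerForTest below: the latch stalls the ServerCrashProcedure
  // for SERVER_FOR_TEST until the test is ready to let it finish.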
private static CountDownLatch SCP_LATCH;
private static ServerName SERVER_FOR_TEST;

@Override
protected boolean splitWALCoordinatedByZk() {
return true;
@@ -55,60 +71,111 @@ private ServerStateNode getServerStateNode(ServerName serverName) {
.getServerNode(serverName);
}

  /**
   * Test for HBASE-22964: a duplicate ServerCrashProcedure must not be scheduled for a server
   * that already has one, whether that SCP is still running or already finished.
   */
@Test
public void test() throws Exception {
-    UTIL.startMiniCluster(3);
+    setupCluster();
+    setupTable();

SERVER_FOR_TEST = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
ServerStateNode serverNode = getServerStateNode(SERVER_FOR_TEST);
assertNotNull(serverNode);
assertTrue("serverNode should be ONLINE when cluster runs normally",
serverNode.isInState(ServerState.ONLINE));

SCP_LATCH = new CountDownLatch(1);
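    // AssignmentManagerForTest (below) will block the ServerCrashProcedure for
    // SERVER_FOR_TEST on this latch once the cluster restarts.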

// Shutdown cluster and restart
List<Integer> ports =
UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
.map(serverName -> serverName.getPort()).collect(Collectors.toList());
LOG.info("Shutting down cluster");
UTIL.getHBaseCluster().killAll();
UTIL.getHBaseCluster().waitUntilShutDown();
LOG.info("Restarting cluster");
UTIL.restartHBaseCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
.numMasters(1).numRegionServers(3).rsPorts(ports).build());
UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());

UTIL.waitFor(60000, () -> getServerStateNode(SERVER_FOR_TEST) != null);
serverNode = getServerStateNode(SERVER_FOR_TEST);
assertFalse("serverNode should not be ONLINE during SCP processing",
serverNode.isInState(ServerState.ONLINE));
Optional<Procedure<?>> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> (p instanceof ServerCrashProcedure) &&
((ServerCrashProcedure) p).getServerName().equals(SERVER_FOR_TEST)).findAny();
assertTrue("Should have one SCP for " + SERVER_FOR_TEST, procedure.isPresent());
    assertFalse("Submitting an SCP for the same serverName " + SERVER_FOR_TEST + " should fail",
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));

    // Release the latch so the SCP can proceed, then wait for it to finish
SCP_LATCH.countDown();
UTIL.waitFor(60000, () -> procedure.get().isFinished());

    assertFalse("Even when the SCP is finished, a duplicate SCP should not be scheduled for " +
        SERVER_FOR_TEST,
UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(SERVER_FOR_TEST));
serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
.getServerNode(SERVER_FOR_TEST);
assertNull("serverNode should be deleted after SCP finished", serverNode);
}

private void setupCluster() throws Exception {
UTIL.startMiniCluster(
StartMiniClusterOption.builder().masterClass(HMasterForTest.class).numMasters(1)
.numRegionServers(3).build());
UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
    // wait until all SCPs scheduled during startup have finished
    UTIL.waitFor(60000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
-        .noneMatch(p -> p instanceof ServerCrashProcedure));
+      .noneMatch(p -> p instanceof ServerCrashProcedure));
}

private void setupTable() throws Exception {
TableName tableName = TABLES[0];
-    ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
-    UTIL.waitFor(30000, () -> getServerStateNode(testServer) != null);
-    ServerStateNode serverNode = getServerStateNode(testServer);
-    Assert.assertNotNull(serverNode);
-    Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
-      serverNode.isInState(ServerState.ONLINE));
    UTIL.createMultiRegionTable(tableName, FAMILY);
-    UTIL.waitTableEnabled(tableName);
+    UTIL.waitTableAvailable(tableName);
Table table = UTIL.getConnection().getTable(tableName);
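    // Populate the table so the region servers carry regions with real data to recover.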
for (int i = 0; i < 100; i++) {
UTIL.loadTable(table, FAMILY);
}
-    List<Integer> ports =
-      UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
-        .map(serverName -> serverName.getPort()).collect(Collectors.toList());
-    LOG.info("Shutting down cluster");
-    UTIL.getHBaseCluster().killAll();
-    UTIL.getHBaseCluster().waitUntilShutDown();
-    LOG.info("Starting cluster the second time");
-    UTIL.restartHBaseCluster(3, ports);
-    UTIL.waitFor(30000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
-    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
-      .getServerNode(testServer);
-    Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode);
-    Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
-    LOG.info("start to find the procedure of SCP for the severName we choose");
-    UTIL.waitFor(60000,
-      () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
-        .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure) &&
-          ((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
-    Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
-      serverNode.isInState(ServerState.ONLINE));
-    LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
-    Assert
-      .assertFalse(UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
-    Procedure<?> procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
-      .filter(p -> (p instanceof ServerCrashProcedure) &&
-        ((ServerCrashProcedure) p).getServerName().equals(testServer))
-      .findAny().get();
-    UTIL.waitFor(60000, () -> procedure.isFinished());
-    LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}",
-      testServer);
-    Assert
-      .assertFalse(UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
-    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
-      .getServerNode(testServer);
-    Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
}

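  // Test-only master whose single override installs the latch-aware AssignmentManager below.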
public static final class HMasterForTest extends HMaster {

public HMasterForTest(Configuration conf) throws IOException, KeeperException {
super(conf);
}

@Override
protected AssignmentManager createAssignmentManager(MasterServices master) {
return new AssignmentManagerForTest(master);
}
}

private static final class AssignmentManagerForTest extends AssignmentManager {

public AssignmentManagerForTest(MasterServices master) {
super(master);
}

@Override
public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
List<RegionInfo> regions = super.getRegionsOnServer(serverName);
      // ServerCrashProcedure calls this method, so block on the CountDownLatch here
if (SCP_LATCH != null && SERVER_FOR_TEST != null && serverName.equals(SERVER_FOR_TEST)) {
try {
          LOG.info("ServerCrashProcedure waiting on the CountDownLatch");
SCP_LATCH.await();
LOG.info("Continue the ServerCrashProcedure");
SCP_LATCH = null;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return regions;
}
}
}