Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 111 additions & 24 deletions hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -133,9 +129,7 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid star imports.

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
Expand All @@ -144,8 +138,10 @@
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
Expand All @@ -154,6 +150,8 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

import static org.junit.Assert.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. And static imports should be placed on top.


/**
* Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
* functionality. Create an instance and keep it around testing HBase.
Expand Down Expand Up @@ -220,6 +218,7 @@ public class HBaseTestingUtil extends HBaseZKTestingUtil {
/** This is for unit tests parameterized with a single boolean. */
public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();


/**
* Checks to see if a specific port is available.
* @param port the port number to check for availability
Expand Down Expand Up @@ -330,10 +329,13 @@ public HBaseTestingUtil(@Nullable Configuration conf) {
}
// Every cluster is a local cluster until we start DFS
// Note that conf could be null, but this.conf will not be
String dataTestDir = getDataTestDir().toString();
this.conf.set("fs.defaultFS", "file:///");
this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
if (this.conf.get("fs.defaultFS") != null && !this.conf.get("fs.defaultFS").startsWith("hdfs://")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess here you do not want to override the fs.defaultFS config if it is present? But the logic here will skip setting fs.defaultFS if it is not set? Is this correct?

String dataTestDir = getDataTestDir().toString();
this.conf.set("fs.defaultFS", "file:///");
this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
}

this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
// If the value for random ports isn't set set it to true, thus making
// tests opt-out for random port assignment
Expand Down Expand Up @@ -772,21 +774,24 @@ public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption opti
}
miniClusterRunning = true;

setupClusterTestDir();
System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
if (dfsCluster == null) {
LOG.info("STARTING DFS");
dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
} else {
LOG.info("NOT STARTING DFS");
if (!option.isExternalDFS()) {
setupClusterTestDir();
System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
if (dfsCluster == null) {
LOG.info("STARTING DFS");
dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
} else {
LOG.info("NOT STARTING DFS");
}
}

// Start up a zk cluster.
if (getZkCluster() == null) {
startMiniZKCluster(option.getNumZkServers());
if (!option.isExternalZK()) {
// Start up a zk cluster.
if (getZkCluster() == null) {
startMiniZKCluster(option.getNumZkServers());
}
}

// Start the MiniHBaseCluster
Expand All @@ -802,6 +807,24 @@ public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption opti
*/
public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
throws IOException, InterruptedException {

if (option.isExternalDFS()) {
assertNotNull("fs.defaultFS can not be null, if use external DFS", conf.get("fs.defaultFS"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I prefer we do not depend on junit methods in HBTU, as it will force our downstream users also use the same version of junit... But anyway, not your fault, I think we have already used it in this class...

if (System.getProperty("HADOOP_USER_NAME") == null) {
System.setProperty("HADOOP_USER_NAME", "hdfs");
}

// RS is started with a different user, @see #HBaseTestingUtil.getDifferentUser
// this is to ensure the user has permissions to read and write HDFS.
conf.set("fs.permissions.umask-mode", "000");
LOG.info("USING EXTERNAL DFS: {}, user: {}.",
conf.get("fs.defaultFS"), UserGroupInformation.getCurrentUser().getUserName());
}

if (option.isExternalZK()) {
assertNotNull("ZOOKEEPER_QUORUM can not be null, if use external ZK", conf.get(HConstants.ZOOKEEPER_QUORUM));
}

// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(option.isCreateRootDir());
if (option.isCreateWALDir()) {
Expand Down Expand Up @@ -977,6 +1000,28 @@ public SingleProcessHBaseCluster getMiniHBaseCluster() {
hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
}

/**
* Stops mini hbase, zk, and hdfs clusters.
* If zk or hdfs is external, clean the znode or dfs path.
* @see #startMiniCluster(int)
*/
public void shutdownMiniCluster(StartTestingClusterOption option) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should know whether we have started DFS cluster and zk by our own? Or at least we can store the StartTestingClusterOption so users do not need to pass it again?

LOG.info("Shutting down minicluster");
shutdownMiniHBaseCluster(option);

if (!option.isExternalDFS()) {
shutdownMiniDFSCluster();
}
if (!option.isExternalZK()) {
shutdownMiniZKCluster();
}

cleanupTestDir();
resetUserGroupInformation();
miniClusterRunning = false;
LOG.info("Minicluster is down");
}

/**
* Stops mini hbase, zk, and hdfs clusters.
* @see #startMiniCluster(int)
Expand All @@ -992,6 +1037,42 @@ public void shutdownMiniCluster() throws IOException {
LOG.info("Minicluster is down");
}

/**
* Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
* If use external dfs or zk, clean the directory.
* @throws java.io.IOException in case command is unsuccessful
*/
public void shutdownMiniHBaseCluster(StartTestingClusterOption option) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

cleanup();
if (this.hbaseCluster != null) {
this.hbaseCluster.shutdown();
// Wait till hbase is down before going on to shutdown zk.
this.hbaseCluster.waitUntilShutDown();
this.hbaseCluster = null;
}
if (zooKeeperWatcher != null) {
zooKeeperWatcher.close();
zooKeeperWatcher = null;
}

// clean external dfs dir and znode
if (option.isExternalDFS()) {
FileSystem fs = FileSystem.get(this.conf);
fs.delete(new Path(this.conf.get(HConstants.HBASE_DIR)).getParent(), true);
fs.close();
}
if (option.isExternalZK()) {
try (ZKWatcher watcher = new ZKWatcher(this.conf, "", null)) {
String znode = this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
if (ZKUtil.checkExists(watcher, znode) != -1) {
ZKUtil.deleteNodeRecursively(watcher, znode);
}
} catch(KeeperException e) {
throw new IOException(e.getMessage(), e);
}
}
}

/**
* Shutdown HBase mini cluster.Does not shutdown zk or dfs if running.
* @throws java.io.IOException in case command is unsuccessful
Expand Down Expand Up @@ -1034,6 +1115,12 @@ private void cleanup() throws IOException {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
}

/**
 * Resets the cached Hadoop {@code UserGroupInformation} login state.
 * When we use an external HDFS we should run as an authorised user; if UGI is not
 * reset, setting the hadoop user via the HADOOP_USER_NAME property has no effect.
 */
public void resetUserGroupInformation() {
UserGroupInformation.reset();
}

/**
* Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
* a new root directory path is fetched irrespective of whether it has been fetched before or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,23 @@ public final class StartTestingClusterOption {
*/
private final boolean createWALDir;

/**
* Whether to use external DFS.
*/
private boolean externalDFS = false;
/**
* Whether to use external ZK.
*/
private boolean externalZK = false;

/**
* Private constructor. Use {@link Builder#build()}.
*/
private StartTestingClusterOption(int numMasters, int numAlwaysStandByMasters,
Class<? extends HMaster> masterClass, int numRegionServers, List<Integer> rsPorts,
Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
int numDataNodes, String[] dataNodeHosts, int numZkServers, boolean createRootDir,
boolean createWALDir) {
boolean createWALDir, boolean externalDFS, boolean externalZK) {
this.numMasters = numMasters;
this.numAlwaysStandByMasters = numAlwaysStandByMasters;
this.masterClass = masterClass;
Expand All @@ -125,6 +134,8 @@ private StartTestingClusterOption(int numMasters, int numAlwaysStandByMasters,
this.numZkServers = numZkServers;
this.createRootDir = createRootDir;
this.createWALDir = createWALDir;
this.externalDFS = externalDFS;
this.externalZK = externalZK;
}

public int getNumMasters() {
Expand Down Expand Up @@ -171,13 +182,22 @@ public boolean isCreateWALDir() {
return createWALDir;
}

public boolean isExternalDFS() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer here we pass in the necessary configs to connecting an external HDFS, and set them into the Configuration object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be complicated to pass all the required configuration in kv mode, especially HA or Kerberos, I want to pass an HDFS conf and add it to this.conf, is that ok?

return externalDFS;
}

public boolean isExternalZK() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto for zookeeper.

return externalZK;
}

@Override
public String toString() {
return "StartMiniClusterOption{" + "numMasters=" + numMasters + ", masterClass=" + masterClass +
", numRegionServers=" + numRegionServers + ", rsPorts=" + StringUtils.join(rsPorts) +
", rsClass=" + rsClass + ", numDataNodes=" + numDataNodes + ", dataNodeHosts=" +
Arrays.toString(dataNodeHosts) + ", numZkServers=" + numZkServers + ", createRootDir=" +
createRootDir + ", createWALDir=" + createWALDir + '}';
createRootDir + ", createWALDir=" + createWALDir + ", externalDFS=" + externalDFS +
", externalZK=" + externalZK +'}';
}

/**
Expand Down Expand Up @@ -205,6 +225,8 @@ public static final class Builder {
private int numZkServers = 1;
private boolean createRootDir = false;
private boolean createWALDir = false;
private boolean externalDFS = false;
private boolean externalZK = false;

private Builder() {
}
Expand All @@ -215,7 +237,7 @@ public StartTestingClusterOption build() {
}
return new StartTestingClusterOption(numMasters, numAlwaysStandByMasters, masterClass,
numRegionServers, rsPorts, rsClass, numDataNodes, dataNodeHosts, numZkServers,
createRootDir, createWALDir);
createRootDir, createWALDir, externalDFS, externalZK);
}

public Builder numMasters(int numMasters) {
Expand Down Expand Up @@ -273,6 +295,16 @@ public Builder createWALDir(boolean createWALDir) {
this.createWALDir = createWALDir;
return this;
}

/** Sets whether to use an already-running external DFS instead of starting a mini DFS cluster. */
public Builder externalDFS(boolean externalDFS) {
this.externalDFS = externalDFS;
return this;
}

/** Sets whether to use an already-running external ZooKeeper instead of starting a mini ZK cluster. */
public Builder externalZK(boolean externalZK) {
this.externalZK = externalZK;
return this;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.File;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
Expand All @@ -41,6 +42,8 @@
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.junit.ClassRule;
Expand All @@ -54,6 +57,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Test our testing utility class
*/
Expand Down Expand Up @@ -140,6 +144,44 @@ public void testMultiClusters() throws Exception {
}
}

@Test
public void testMiniClusterWithExternalDFSAndZK() throws Exception {
  HBaseTestingUtil hbt1 = new HBaseTestingUtil();

  // Let's say this is the external ZK and DFS.
  MiniDFSCluster dfsCluster = hbt1.startMiniDFSCluster(3);
  MiniZooKeeperCluster zkCluster = hbt1.startMiniZKCluster();

  try {
    Configuration conf = HBaseConfiguration.create();
    conf.set("fs.defaultFS", new Path(dfsCluster.getFileSystem().getUri()).toString());

    conf.set(HConstants.ZOOKEEPER_QUORUM, zkCluster.getAddress().getHostName());
    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getAddress().getPort());
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase_test");

    HBaseTestingUtil hbt2 = new HBaseTestingUtil(conf);
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().externalZK(true).externalDFS(true).build();
    SingleProcessHBaseCluster cluster = hbt2.startMiniCluster(option);
    String hbaseRootDir = hbt2.conf.get(HConstants.HBASE_DIR);
    String znode = hbt2.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
    try {
      assertEquals(1, cluster.getLiveRegionServerThreads().size());
    } finally {
      hbt2.shutdownMiniCluster(option);
    }

    // Check that shutdown cleaned the dir and znode created by the mini HBase cluster.
    FileSystem fs = FileSystem.get(conf);
    assertFalse(fs.exists(new Path(hbaseRootDir).getParent()));

    // try-with-resources: the watcher was previously left open, leaking a ZK connection.
    try (ZKWatcher watcher = new ZKWatcher(hbt1.conf, "TestHbase", null)) {
      // JUnit convention: expected value comes first.
      assertEquals(-1, ZKUtil.checkExists(watcher, znode));
    }
  } finally {
    // Always tear down the "external" DFS/ZK, even if an assertion above fails.
    hbt1.shutdownMiniDFSCluster();
    hbt1.shutdownMiniZKCluster();
  }
}

@Test public void testMiniCluster() throws Exception {
HBaseTestingUtil hbt = new HBaseTestingUtil();

Expand Down Expand Up @@ -406,6 +448,7 @@ public void testMiniZooKeeperWithMultipleClientPorts() throws Exception {
assertTrue(!fs.exists(testdir));
assertTrue(fs.mkdirs(testdir));
assertTrue(hbt.cleanupTestDir());
hbt.resetUserGroupInformation();
}

@Test public void testResolvePortConflict() throws Exception {
Expand Down
Loading