Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.HReplicationServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand All @@ -60,7 +62,10 @@
public class LocalHBaseCluster {
private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
private final List<JVMClusterUtil.RegionServerThread> regionThreads = new CopyOnWriteArrayList<>();
private final List<JVMClusterUtil.RegionServerThread> regionThreads =
new CopyOnWriteArrayList<>();
private final List<JVMClusterUtil.ReplicationServerThread> replicationThreads =
new CopyOnWriteArrayList<>();
private final static int DEFAULT_NO = 1;
/** local mode */
public static final String LOCAL = "local";
Expand Down Expand Up @@ -259,6 +264,26 @@ public JVMClusterUtil.MasterThread run() throws Exception {
});
}

@SuppressWarnings("unchecked")
public JVMClusterUtil.ReplicationServerThread addReplicationServer(
Configuration config, final int index) throws IOException {
// Create each replication server with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager).
JVMClusterUtil.ReplicationServerThread rst =
JVMClusterUtil.createReplicationServerThread(config, index);
this.replicationThreads.add(rst);
return rst;
}

public JVMClusterUtil.ReplicationServerThread addReplicationServer(
final Configuration config, final int index, User user)
throws IOException, InterruptedException {
return user.runAs(
(PrivilegedExceptionAction<ReplicationServerThread>) () -> addReplicationServer(config,
index));
}

/**
* @param serverNumber
* @return region server
Expand Down Expand Up @@ -289,6 +314,40 @@ public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
return liveServers;
}

/**
* @param serverNumber replication server number
* @return replication server
*/
public HReplicationServer getReplicationServer(int serverNumber) {
return replicationThreads.get(serverNumber).getReplicationServer();
}

/**
* @return Read-only list of replication server threads.
*/
public List<JVMClusterUtil.ReplicationServerThread> getReplicationServers() {
return Collections.unmodifiableList(this.replicationThreads);
}

/**
* @return List of running servers (Some servers may have been killed or
* aborted during lifetime of cluster; these servers are not included in this
* list).
*/
public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServers() {
List<JVMClusterUtil.ReplicationServerThread> liveServers = new ArrayList<>();
List<ReplicationServerThread> list = getReplicationServers();
for (JVMClusterUtil.ReplicationServerThread rst: list) {
if (rst.isAlive()) {
liveServers.add(rst);
}
else {
LOG.info("Not alive {}", rst.getName());
}
}
return liveServers;
}

/**
* @return the Configuration used by this LocalHBaseCluster
*/
Expand Down Expand Up @@ -430,7 +489,7 @@ public void join() {
* Start the cluster.
*/
public void startup() throws IOException {
JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
JVMClusterUtil.startup(this.masterThreads, this.regionThreads, this.replicationThreads);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,19 @@ public boolean isStopped() {
return this.stopped;
}

public void waitForServerOnline(){
while (!isStopped() && !isOnline()) {
synchronized (online) {
try {
online.wait(msgInterval);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}

/**
* Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
* be hooked up to WAL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Supplier;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.HReplicationServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,6 +72,33 @@ public void waitForServerOnline() {
}
}

/**
* Datastructure to hold ReplicationServer Thread and ReplicationServer instance
*/
public static class ReplicationServerThread extends Thread {
private final HReplicationServer replicationServer;

public ReplicationServerThread(final HReplicationServer r, final int index) {
super(r, "ReplicationServer:" + index + ";" + r.getServerName().toShortString());
this.replicationServer = r;
}

/**
* @return the replication server
*/
public HReplicationServer getReplicationServer() {
return this.replicationServer;
}

/**
* Block until the replication server has come online, indicating it is ready
* to be used.
*/
public void waitForServerOnline() {
replicationServer.waitForServerOnline();
}
}

/**
* Creates a {@link RegionServerThread}.
* Call 'start' on the returned thread to make it run.
Expand Down Expand Up @@ -98,6 +126,24 @@ public static JVMClusterUtil.RegionServerThread createRegionServerThread(final C
return new JVMClusterUtil.RegionServerThread(server, index);
}

/**
* Creates a {@link ReplicationServerThread}.
* Call 'start' on the returned thread to make it run.
* @param c Configuration to use.
* @param index Used distinguishing the object returned.
* @throws IOException
* @return Replication server added.
*/
public static JVMClusterUtil.ReplicationServerThread createReplicationServerThread(
final Configuration c, final int index) throws IOException {
HReplicationServer server;
try {
server = new HReplicationServer(c);
} catch (Exception e) {
throw new IOException(e);
}
return new JVMClusterUtil.ReplicationServerThread(server, index);
}

/**
* Datastructure to hold Master Thread and Master instance
Expand All @@ -122,7 +168,7 @@ public HMaster getMaster() {
* @param c Configuration to use.
* @param hmc Class to create.
* @param index Used distinguishing the object returned.
* @throws IOException
* @throws IOException exception
* @return Master added.
*/
public static JVMClusterUtil.MasterThread createMasterThread(final Configuration c,
Expand Down Expand Up @@ -165,7 +211,8 @@ private static JVMClusterUtil.MasterThread findActiveMaster(
* @return Address to use contacting primary master.
*/
public static String startup(final List<JVMClusterUtil.MasterThread> masters,
final List<JVMClusterUtil.RegionServerThread> regionservers) throws IOException {
final List<JVMClusterUtil.RegionServerThread> regionservers,
final List<JVMClusterUtil.ReplicationServerThread> replicationServers) throws IOException {
// Implementation note: This method relies on timed sleeps in a loop. It's not great, and
// should probably be re-written to use actual synchronization objects, but it's ok for now

Expand Down Expand Up @@ -193,6 +240,12 @@ public static String startup(final List<JVMClusterUtil.MasterThread> masters,
}
}

if (replicationServers != null) {
for (JVMClusterUtil.ReplicationServerThread t: replicationServers) {
t.start();
}
}

// Wait for an active master to be initialized (implies being master)
// with this, when we return the cluster is complete
final int initTimeout = configuration != null ? Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,8 +1115,8 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c);
this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(),
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
option.getMasterClass(), option.getRsClass());
option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
option.getNumReplicationServers(), option.getMasterClass(), option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
// Don't leave here till we've done a successful scan of the hbase:meta
Expand Down Expand Up @@ -1241,8 +1241,8 @@ public void restartHBaseCluster(StartMiniClusterOption option)
closeConnection();
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
option.getNumRegionServers(), option.getRsPorts(), option.getNumReplicationServers(),
option.getMasterClass(), option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.HReplicationServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.ReplicationServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,31 +88,33 @@ public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers
* @param numRegionServers initial number of region servers to start.
*/
public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass);
this(conf, numMasters, 0, numRegionServers, null, 0, masterClass, regionserverClass);
}

/**
* @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster
* restart where for sure the regionservers come up on same address+port (but
* just with different startcode); by default mini hbase clusters choose new
* arbitrary ports on each cluster start.
* @param numReplicationServers initial number of replication servers to start.
* @throws IOException
* @throws InterruptedException
*/
public MiniHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters,
int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
int numRegionServers, List<Integer> rsPorts, int numReplicationServers,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
super(conf);

// Hadoop 2
CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();

init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass,
regionserverClass);
init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, numReplicationServers,
masterClass, regionserverClass);
this.initialClusterStatus = getClusterMetrics();
}

Expand Down Expand Up @@ -227,7 +231,8 @@ public void run() {
}

private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass,
final int nRegionNodes, List<Integer> rsPorts, int numReplicationServers,
Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
throws IOException, InterruptedException {
try {
Expand All @@ -248,11 +253,17 @@ private void init(final int nMasterNodes, final int numAlwaysStandByMasters,
if (rsPorts != null) {
rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i));
}
User user = HBaseTestingUtility.getDifferentUser(rsConf,
".hfs."+index++);
User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
hbaseCluster.addRegionServer(rsConf, i, user);
}

// manually add the replication servers as other users
for (int i = 0; i < numReplicationServers; i++) {
Configuration rsConf = HBaseConfiguration.create(conf);
User user = HBaseTestingUtility.getDifferentUser(rsConf, ".hfs." + index++);
hbaseCluster.addReplicationServer(rsConf, i, user);
}

hbaseCluster.startup();
} catch (IOException e) {
shutdown();
Expand Down Expand Up @@ -791,7 +802,7 @@ public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {

/**
* Grab a numbered region server of your choice.
* @param serverNumber
* @param serverNumber region server number
* @return region server
*/
public HRegionServer getRegionServer(int serverNumber) {
Expand All @@ -805,6 +816,43 @@ public HRegionServer getRegionServer(ServerName serverName) {
.findFirst().orElse(null);
}

/**
* @return Number of live replication servers in the cluster currently.
*/
public int getNumLiveReplicationServers() {
return this.hbaseCluster.getLiveReplicationServers().size();
}

/**
* @return List of replication server threads.
*/
public List<JVMClusterUtil.ReplicationServerThread> getReplicationServerThreads() {
return this.hbaseCluster.getReplicationServers();
}

/**
* @return List of live replication server threads (skips the aborted and the killed)
*/
public List<JVMClusterUtil.ReplicationServerThread> getLiveReplicationServerThreads() {
return this.hbaseCluster.getLiveReplicationServers();
}

/**
* Grab a numbered replication server of your choice.
* @param serverNumber
* @return replication server
*/
public HReplicationServer getReplicationServer(int serverNumber) {
return hbaseCluster.getReplicationServer(serverNumber);
}

public HReplicationServer getReplicationServer(ServerName serverName) {
return hbaseCluster.getReplicationServers().stream()
.map(ReplicationServerThread::getReplicationServer)
.filter(r -> r.getServerName().equals(serverName))
.findFirst().orElse(null);
}

public List<HRegion> getRegions(byte[] tableName) {
return getRegions(TableName.valueOf(tableName));
}
Expand Down
Loading