From 2d6b5634819422adbbca9d4b6323f290bf82acb7 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 28 Oct 2019 23:26:13 -0700
Subject: [PATCH] HBASE-18095: Zookeeper-less client connection implementation

Generally, when an HBase client tries to create a cluster Connection, it fetches a bunch of
metadata from Zookeeper (like the active master address, clusterID, meta locations etc.) before it
creates the underlying transport. However, exposing ZK to all the clients is a DDoS risk, and ZK
connections in the past have caused issues by not timing out on blocking RPCs (more context in the
JIRA).

This patch attempts to remove this ZK dependency by making the client fetch all the meta
information directly from a list of client-configured HMaster endpoints. The patch adds a new
AsyncRegistry implementation that encapsulates the logic of fetching this meta information from
the provided master endpoints. New RPCs are added to the HMasters to help fetch this information.

Meta HRegionLocation caching:
----------------
One critical piece of metadata needed by clients to query tables is the set of meta
HRegionLocations. These are fetched from ZK by default. Since this patch moves away from ZK, it
adds an in-memory cache of these locations on both active and standby HMasters. ZK listeners are
registered to keep the cache up-to-date.

New client configs:
------------------
- 'hbase.client.asyncregistry.masteraddrs': should be set to a comma separated list of HMaster
  host:port addresses.
- Should be used in conjunction with 'hbase.client.registry.impl' set to the HMasterAsyncRegistry
  class.

(A sample client-side configuration snippet is appended after the patch.)
---
 .../MetaRegionsNotAvailableException.java | 35 +++
 .../hbase/client/AsyncConnectionImpl.java | 3 +
 .../hadoop/hbase/client/AsyncRegistry.java | 11 -
 .../hbase/client/AsyncRegistryFactory.java | 2 +-
 .../hbase/client/HMasterAsyncRegistry.java | 236 ++++++++++++++++++
 .../hadoop/hbase/client/ZKAsyncRegistry.java | 11 -
 .../hbase/shaded/protobuf/ProtobufUtil.java | 1 +
 .../hadoop/hbase/zookeeper/ZNodePaths.java | 12 +
 .../hbase/client/DoNothingAsyncRegistry.java | 10 -
 .../src/main/protobuf/Master.proto | 43 ++++
 .../apache/hadoop/hbase/master/HMaster.java | 7 +
 .../hbase/master/MasterRpcServices.java | 56 +++++
 .../hbase/master/MetaRegionLocationCache.java | 185 ++++++++++++++
 .../hadoop/hbase/HBaseTestingUtility.java | 24 ++
 .../hbase/client/DummyAsyncRegistry.java | 10 -
 .../hbase/client/RegionReplicaTestHelper.java | 8 +
 .../client/TestAsyncMetaRegionLocator.java | 40 ++-
 .../client/TestHMasterAsyncRegistry.java | 140 +++++++++++
 .../hbase/client/TestZKAsyncRegistry.java | 3 -
 .../master/TestHMasterAsyncRegistryRPCs.java | 214 ++++++++++++++++
 20 files changed, 992 insertions(+), 59 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java
new file mode 100644
index 000000000000..add2edb7e8a4
--- /dev/null
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * Thrown by master when meta region locations are not cached whatever reason. + * Client is expected to retry when running into this. + */ +@InterfaceAudience.Private +public class MetaRegionsNotAvailableException extends IOException { + private static final long serialVersionUID = (1L << 14) - 1L; + + public MetaRegionsNotAvailableException(String msg) { + super(msg); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 78fad9ea2354..7df16fddb0f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -394,4 +394,7 @@ public Hbck getHbck(ServerName masterServer) throws IOException { Optional getConnectionMetrics() { return metrics; } + + @VisibleForTesting + AsyncRegistry getRegistry() { return registry; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java index 96329dcfb878..e50a9f5b4c0b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java @@ -26,7 +26,6 @@ /** * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.. - * All stuffs that may be related to zookeeper at client side are placed here. *

* Internal use only. */ @@ -45,21 +44,11 @@ interface AsyncRegistry extends Closeable { */ CompletableFuture getClusterId(); - /** - * Get the number of 'running' regionservers. - */ - CompletableFuture getCurrentNrHRS(); - /** * Get the address of HMaster. */ CompletableFuture getMasterAddress(); - /** - * Get the info port of HMaster. - */ - CompletableFuture getMasterInfoPort(); - /** * Closes this instance and releases any system resources associated with it */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java index 28726ae5dd20..aebb1de5a4b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java @@ -27,7 +27,7 @@ @InterfaceAudience.Private final class AsyncRegistryFactory { - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + public static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; private AsyncRegistryFactory() { } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java new file mode 100644 index 000000000000..05dc4e807ddd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaRegionsNotAvailableException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fetches the meta information directly from HMaster by making the relevant RPCs. HMaster RPC end
+ * points are looked up via the client configuration 'hbase.client.asyncregistry.masteraddrs' as a
+ * comma separated list of host:port values. Should be used in conjunction with
+ * 'hbase.client.registry.impl' set to this class.
+ *
+ * The class does not cache anything. It is the responsibility of the callers to cache and
+ * avoid repeated requests.
+ */
+@InterfaceAudience.Private
+public class HMasterAsyncRegistry implements AsyncRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(HMasterAsyncRegistry.class);
+  public static final String CONF_KEY = "hbase.client.asyncregistry.masteraddrs";
+  private static final String DEFAULT_HOST_PORT = "localhost:" + HConstants.DEFAULT_MASTER_PORT;
+
+  // Parsed list of host and ports for masters from hbase-site.xml
+  private final List<ServerName> masterServers;
+  final Configuration conf;
+  // RPC client used to talk to the master servers. This uses a stand-alone RpcClient instance
+  // because AsyncRegistry is created prior to creating a cluster Connection. The client is torn
+  // down in close().
+  final RpcClient rpcClient;
+  final int rpcTimeout;
+
+  public HMasterAsyncRegistry(Configuration config) {
+    masterServers = new ArrayList<>();
+    conf = config;
+    parseHostPorts();
+    // Passing the default cluster ID means that token-based authentication does not work for
+    // certain client implementations.
+    // TODO(bharathv): Figure out a way to fetch the cluster ID in a non-authenticated way.
+    rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    rpcTimeout = (int) Math.min(Integer.MAX_VALUE,
+        conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+  }
+
+  private void parseHostPorts() {
+    String hostPorts = conf.get(CONF_KEY, DEFAULT_HOST_PORT);
+    for (String hostPort: hostPorts.split(",")) {
+      masterServers.add(ServerName.valueOf(hostPort, ServerName.NON_STARTCODE));
+    }
+    Preconditions.checkArgument(!masterServers.isEmpty(), String.format("%s is empty", CONF_KEY));
+    // Randomize so that not every client sends requests in the same order.
+    Collections.shuffle(masterServers);
+  }
+
+  /**
+   * Util that generates a master stub for a given ServerName.
+   */
+  private MasterService.BlockingInterface getMasterStub(ServerName server) throws IOException {
+    return MasterService.newBlockingStub(
+        rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout));
+  }
+
+  /**
+   * Blocking RPC to fetch the meta region locations using one of the masters from the parsed list.
+   */
+  private RegionLocations getMetaRegionLocationsHelper() throws MetaRegionsNotAvailableException {
+    List<HBaseProtos.RegionLocation> result = null;
+    for (ServerName sname: masterServers) {
+      try {
+        MasterService.BlockingInterface stub = getMasterStub(sname);
+        HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController();
+        GetMetaLocationsResponse resp = stub.getMetaLocations(rpcController,
+            GetMetaLocationsRequest.getDefaultInstance());
+        result = resp.getLocationsList();
+        if (!result.isEmpty()) {
+          break;
+        }
+      } catch (Exception e) {
+        LOG.warn("Error fetching meta locations from master {}. Trying others.", sname, e);
+      }
+    }
+    if (result == null || result.isEmpty()) {
+      throw new MetaRegionsNotAvailableException(String.format(
+          "Meta locations not found. Probed masters: %s", conf.get(CONF_KEY, DEFAULT_HOST_PORT)));
+    }
+    List<HRegionLocation> deserializedResult = new ArrayList<>();
+    result.stream().forEach(
+        location -> deserializedResult.add(ProtobufUtil.toRegionLocation(location)));
+    return new RegionLocations(deserializedResult);
+  }
+
+  /**
+   * Fetches the meta region locations from the first master in 'masterServers' that has them
+   * cached.
+   */
+  @Override
+  public CompletableFuture<RegionLocations> getMetaRegionLocation() {
+    CompletableFuture<RegionLocations> result = new CompletableFuture<>();
+    CompletableFuture.runAsync(() -> {
+      try {
+        result.complete(this.getMetaRegionLocationsHelper());
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+    });
+    return result;
+  }
+
+  /**
+   * Blocking RPC to get the cluster ID from the parsed master list. Throws
+   * MasterNotRunningException if no active master is found.
+   */
+  private String getClusterIdHelper() throws MasterNotRunningException {
+    // Loop through all the masters serially. We could be hitting some standby masters which cannot
+    // process this RPC, so we just skip them.
+    for (ServerName sname: masterServers) {
+      try {
+        MasterService.BlockingInterface stub = getMasterStub(sname);
+        HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController();
+        GetClusterIdResponse resp =
+            stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance());
+        return resp.getClusterId();
+      } catch (IOException e) {
+        LOG.warn("Error fetching cluster ID from master: {}", sname, e);
+      } catch (ServiceException e) {
+        // This is probably a standby master, can be ignored.
+        LOG.debug("Error fetching cluster ID from server: {}", sname, e);
+      }
+    }
+    // If it comes to this point, no active master could be found.
+    throw new MasterNotRunningException(String.format(
+        "No active master found. Probed masters: %s", conf.get(CONF_KEY, DEFAULT_HOST_PORT)));
+  }
+
+  @Override
+  public CompletableFuture<String> getClusterId() {
+    CompletableFuture<String> result = new CompletableFuture<>();
+    CompletableFuture.runAsync(() -> {
+      try {
+        result.complete(this.getClusterIdHelper());
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+    });
+    return result;
+  }
+
+  /**
+   * Blocking RPC to get the active master address from the parsed list of master servers.
+   */
+  private ServerName getMasterAddressHelper() throws MasterNotRunningException {
+    for (ServerName sname: masterServers) {
+      try {
+        MasterService.BlockingInterface stub = getMasterStub(sname);
+        HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController();
+        IsActiveResponse resp = stub.isActive(rpcController, IsActiveRequest.getDefaultInstance());
+        if (resp.getIsMasterActive()) {
+          return ServerName.valueOf(sname.getHostname(), sname.getPort(), resp.getStartCode());
+        }
+      } catch (Exception e) {
+        LOG.warn("Error fetching active master state from {}. Trying others.", sname, e);
+      }
+    }
+    throw new MasterNotRunningException(String.format("No active master found. Probed masters: %s",
+        conf.get(CONF_KEY, DEFAULT_HOST_PORT)));
+  }
+
+  /**
+   * @return the active master among the configured master addresses in 'masterServers'.
+   */
+  @Override
+  public CompletableFuture<ServerName> getMasterAddress() {
+    CompletableFuture<ServerName> result = new CompletableFuture<>();
+    CompletableFuture.runAsync(() -> {
+      try {
+        result.complete(this.getMasterAddressHelper());
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+    });
+    return result;
+  }
+
+  @Override
+  public void close() {
+    if (rpcClient != null) {
+      rpcClient.close();
+    }
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index 36fa6bba7544..08cff2157e3c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -210,11 +210,6 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
     return future;
   }
-  @Override
-  public CompletableFuture<Integer> getCurrentNrHRS() {
-    return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0);
-  }
-
   private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
     if (data == null || data.length == 0) {
       return null;
     }
@@ -237,12 +232,6 @@ public CompletableFuture<ServerName> getMasterAddress() {
     });
   }
-  @Override
-  public CompletableFuture<Integer> getMasterInfoPort() {
-    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
-      .thenApply(proto -> proto != null ?
proto.getInfoPort() : 0); - } - @Override public void close() { zk.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 81082174bc0a..6fe749bd08d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -3188,6 +3188,7 @@ public static org.apache.hadoop.hbase.client.RegionInfo toRegionInfo(final HBase } public static HBaseProtos.RegionLocation toRegionLocation(HRegionLocation loc) { + if (loc == null) return null; HBaseProtos.RegionLocation.Builder builder = HBaseProtos.RegionLocation.newBuilder(); builder.setRegionInfo(toRegionInfo(loc.getRegion())); if (loc.getServerName() != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index c5e510fe4b9c..5b3144dcc7d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -182,6 +182,18 @@ public String getZNodeForReplica(int replicaId) { .orElseGet(() -> metaReplicaZNodes.get(DEFAULT_REPLICA_ID) + "-" + replicaId); } + /** + * Parses the meta replicaId from the passed path. + * @param path the name of the full path which includes baseZNode. + * @return replicaId + */ + public int getMetaReplicaIdFromPath(String path) { + // Extract the znode from path. The prefix is of the following format. + // baseZNode + PATH_SEPARATOR. + int prefixLen = baseZNode.length() + 1; + return getMetaReplicaIdFromZnode(path.substring(prefixLen)); + } + /** * Parse the meta replicaId from the passed znode * @param znode the name of the znode, does not include baseZNode diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java index 66330687a26a..f3234b4c8072 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java @@ -43,21 +43,11 @@ public CompletableFuture getClusterId() { return CompletableFuture.completedFuture(null); } - @Override - public CompletableFuture getCurrentNrHRS() { - return CompletableFuture.completedFuture(0); - } - @Override public CompletableFuture getMasterAddress() { return CompletableFuture.completedFuture(null); } - @Override - public CompletableFuture getMasterInfoPort() { - return CompletableFuture.completedFuture(0); - } - @Override public void close() { } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index fee9ab8419d1..b8187f6c9487 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -585,6 +585,36 @@ message GetProceduresResponse { repeated Procedure procedure = 1; } +message IsActiveRequest { +} + +message IsActiveResponse { + required bool is_master_active = 1; + required uint64 start_code = 2; +} + +message GetMetaLocationsRequest { +} + +message GetMetaLocationsResponse { + repeated RegionLocation locations = 1; +} + +message GetPortInfoRequest { +} + +message GetPortInfoResponse { + required uint32 info_port 
= 1; + required uint32 master_port = 2; +} + +message GetClusterIdRequest { +} + +message GetClusterIdResponse { + required string cluster_id = 1; +} + message GetLocksRequest { } @@ -707,6 +737,19 @@ service MasterService { rpc GetClusterStatus(GetClusterStatusRequest) returns(GetClusterStatusResponse); + /** Returns whether this master is active or not. Served on both active/standby masters.*/ + rpc IsActive(IsActiveRequest) returns(IsActiveResponse); + + /** Used by client to get the location of meta replica(s). Served on both active/standby masters.*/ + rpc GetMetaLocations(GetMetaLocationsRequest) + returns(GetMetaLocationsResponse); + + /** Returns the ClusterId UUID for this cluster */ + rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); + + /** Returns the port numbers used by HMaster instances. Served on both active/standby masters.*/ + rpc GetPortInfo(GetPortInfoRequest) returns(GetPortInfoResponse); + /** return true if master is available */ rpc IsMasterRunning(IsMasterRunningRequest) returns(IsMasterRunningResponse); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bb3c12c0b88c..11fe3030d4b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -355,6 +355,9 @@ public void run() { // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; + // Cache of meta locations indexed by replicas + private MetaRegionLocationCache metaRegionLocationCache; + // manager of replication private ReplicationPeerManager replicationPeerManager; @@ -518,6 +521,8 @@ public HMaster(final Configuration conf) maintenanceMode = false; } + metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper); + this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); LOG.info("hbase.rootdir=" + getRootDir() + @@ -3851,4 +3856,6 @@ public Map getWalGroupsReplicationStatus() { public HbckChore getHbckChore() { return this.hbckChore; } + + public MetaRegionLocationCache getMetaRegionLocationCache() { return this.metaRegionLocationCache; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 06a99fa5432d..3c284fa546ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.Server; @@ -184,14 +185,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; @@ -205,6 +212,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.HbckService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; @@ -977,6 +986,53 @@ public GetClusterStatusResponse getClusterStatus(RpcController controller, return response.build(); } + @Override + public GetMetaLocationsResponse getMetaLocations( + RpcController controller, GetMetaLocationsRequest req) throws ServiceException { + GetMetaLocationsResponse.Builder response = GetMetaLocationsResponse.newBuilder(); + try { + // We skip the master init check since this RPC is served on both the active and standby + // masters as long as their cache is populated. 
+ response.addAllLocations(master.getMetaRegionLocationCache().getCachedMetaRegionLocations()); + } catch (MetaRegionsNotAvailableException e) { + throw new ServiceException(e); + } + return response.build(); + } + + @Override + public IsActiveResponse isActive(RpcController controller, IsActiveRequest req) + throws ServiceException { + IsActiveResponse.Builder response = IsActiveResponse.newBuilder(); + response.setIsMasterActive(master.isActiveMaster()); + response.setStartCode(master.getStartcode()); + return response.build(); + } + + @Override + public GetPortInfoResponse getPortInfo(RpcController controller, GetPortInfoRequest req) + throws ServiceException { + GetPortInfoResponse.Builder response = GetPortInfoResponse.newBuilder(); + response.setMasterPort( + master.getConfiguration().getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)); + response.setInfoPort(master.getConfiguration().getInt(HConstants.MASTER_INFO_PORT, + HConstants.DEFAULT_MASTER_INFOPORT)); + return response.build(); + } + + @Override + public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest req) + throws ServiceException { + GetClusterIdResponse.Builder response = GetClusterIdResponse.newBuilder(); + try { + master.checkInitialized(); + response.setClusterId(master.getClusterId()); + } catch (IOException e) { + throw new ServiceException(e); + } + return response.build(); + } + /** * List the currently available/stored snapshots. Any in-progress snapshots are ignored */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java new file mode 100644 index 000000000000..db55f66acb4f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentNavigableMap; + +/** + * A cache of meta region location metadata. This cache is used to serve 'GetMetaLocations' RPCs + * from clients. Registers a listener on ZK to track changes to the meta table znodes. Clients + * are expected to retry if the meta information is stale. This class is thread-safe. + */ +@InterfaceAudience.Private +public class MetaRegionLocationCache extends ZKListener { + + private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class); + + // Maximum number of times we retry when ZK operation times out. Should this be configurable? + private static final int MAX_ZK_META_FETCH_RETRIES = 10; + + private ZKWatcher watcher; + // Cached meta region locations indexed by replica ID. + // CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during + // client requests. Even though CopyOnWriteArrayMap copies the data structure for every write, + // that should be OK since the size of the list is often small and mutations are not too often + // and we do not need to block client requests while mutations are in progress. + CopyOnWriteArrayMap cachedMetaLocations; + + private enum ZNodeOpType { + INIT, + CREATED, + CHANGED, + DELETED + }; + + public MetaRegionLocationCache(ZKWatcher zkWatcher) { + super(zkWatcher); + watcher = zkWatcher; + cachedMetaLocations = new CopyOnWriteArrayMap<>(); + watcher.registerListener(this); + // Populate the initial snapshot of data from meta znodes. + // This is needed because stand-by masters can potentially start after the initial znode + // creation. 
+    populateInitialMetaLocations();
+  }
+
+  private void populateInitialMetaLocations() {
+    int retries = 0;
+    while (retries++ < MAX_ZK_META_FETCH_RETRIES) {
+      try {
+        List<String> znodes = watcher.getMetaReplicaNodes();
+        for (String znode: znodes) {
+          String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
+          updateMetaLocation(path, ZNodeOpType.INIT);
+        }
+        break;
+      } catch (KeeperException.OperationTimeoutException e) {
+        LOG.warn("Timed out connecting to ZK cluster", e);
+      } catch (KeeperException e) {
+        LOG.warn("Error populating initial meta locations", e);
+        break;
+      }
+    }
+  }
+
+  private void updateMetaLocation(String path, ZNodeOpType opType) {
+    if (!isValidMetaZNode(path)) {
+      return;
+    }
+    LOG.info("Meta znode for path {}: {}", path, opType.name());
+    int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
+    if (opType == ZNodeOpType.DELETED) {
+      cachedMetaLocations.remove(replicaId);
+      return;
+    }
+    RegionState state = null;
+    int retries = 0;
+    while (retries++ < MAX_ZK_META_FETCH_RETRIES) {
+      try {
+        state = MetaTableLocator.getMetaRegionState(watcher, replicaId);
+        break;
+      } catch (KeeperException.OperationTimeoutException oe) {
+        // LOG and retry.
+        LOG.warn("Timed out fetching meta location information for path {}", path, oe);
+      } catch (KeeperException e) {
+        LOG.error("Error getting meta location for path {}", path, e);
+        break;
+      }
+    }
+    if (state == null) {
+      cachedMetaLocations.put(replicaId, null);
+      return;
+    }
+    cachedMetaLocations.put(
+        replicaId, new HRegionLocation(state.getRegion(), state.getServerName()));
+  }
+
+  /**
+   * Converts the current cache snapshot into a GetMetaLocations() RPC return payload.
+   * @return Protobuf serialized list of cached meta HRegionLocations
+   * @throws MetaRegionsNotAvailableException if the cache is not populated.
+   */
+  public List<HBaseProtos.RegionLocation> getCachedMetaRegionLocations()
+      throws MetaRegionsNotAvailableException {
+    if (cachedMetaLocations.isEmpty()) {
+      // This could be possible if the master has not successfully initialized yet or meta region
+      // is stuck in some weird state.
+      throw new MetaRegionsNotAvailableException("Meta cache is empty");
+    }
+    ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
+        cachedMetaLocations.tailMap(cachedMetaLocations.firstKey(), true);
+    List<HBaseProtos.RegionLocation> result = new ArrayList<>();
+    // Handle missing replicas, if any?
+    snapshot.values().stream().forEach(
+        location -> result.add(ProtobufUtil.toRegionLocation(location)));
+    return result;
+  }
+
+  /**
+   * Helper to check if the given 'path' corresponds to a meta znode. This listener is only
+   * interested in changes to meta znodes.
+   */
+  private boolean isValidMetaZNode(String path) {
+    return watcher.getZNodePaths().isAnyMetaReplicaZNode(path);
+  }
+
+  /**
+   * Test helper to invalidate cached metadata for a given meta replica ID. This is done
+   * synchronously with the meta region moves in tests to avoid any flaky tests.
+ */ + @VisibleForTesting + public void invalidateMetaReplica(int replicaId) { + String path = watcher.getZNodePaths().getZNodeForReplica(replicaId); + nodeDataChanged(path); + } + + @Override + public void nodeCreated(String path) { + updateMetaLocation(path, ZNodeOpType.CREATED); + } + + @Override + public void nodeDeleted(String path) { + updateMetaLocation(path, ZNodeOpType.DELETED); + } + + @Override + public void nodeDataChanged(String path) { + updateMetaLocation(path, ZNodeOpType.CHANGED); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index a50ac11db6eb..034c615bbf22 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -213,6 +213,10 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { private volatile AsyncClusterConnection asyncConnection; + // Tracks any other connections created with custom client config. Used for testing clients with custom + // configurations. Tracked here so that they can be cleaned up on close() / restart. + private List customConnections = Collections.synchronizedList(new ArrayList<>()); + /** Filesystem URI used for map-reduce mini-cluster setup */ private static String FS_URI; @@ -1252,6 +1256,10 @@ public void restartHBaseCluster(StartMiniClusterOption option) this.asyncConnection.close(); this.asyncConnection = null; } + for (AsyncClusterConnection connection: customConnections) { + Closeables.close(connection, true); + } + customConnections.clear(); this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), option.getRsClass()); @@ -3067,6 +3075,19 @@ public Connection getConnection() throws IOException { return this.asyncConnection.toConnection(); } + /** + * Creates a new Connection instance for this mini cluster with a given input config. Use it wisely since + * connection creation is expensive. For all practical purposes @link(getConnection()) should be good enough. This + * helper should only used if one wants to test a custom client side configuration that differs from the conf used to + * spawn the mini-cluster. 
+ */ + public AsyncClusterConnection getCustomConnection(Configuration conf) throws IOException { + User user = UserProvider.instantiate(conf).getCurrent(); + AsyncClusterConnection connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); + customConnections.add(connection); + return connection; + } + public AsyncClusterConnection getAsyncConnection() throws IOException { if (this.asyncConnection == null) { initConnection(); @@ -3077,6 +3098,9 @@ public AsyncClusterConnection getAsyncConnection() throws IOException { public void closeConnection() throws IOException { Closeables.close(hbaseAdmin, true); Closeables.close(asyncConnection, true); + for (AsyncClusterConnection connection: customConnections) { + Closeables.close(connection, true); + } this.hbaseAdmin = null; this.asyncConnection = null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java index e9ae25d2eaf0..eccb8ef42360 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java @@ -39,21 +39,11 @@ public CompletableFuture getClusterId() { return null; } - @Override - public CompletableFuture getCurrentNrHRS() { - return null; - } - @Override public CompletableFuture getMasterAddress() { return null; } - @Override - public CompletableFuture getMasterInfoPort() { - return null; - } - @Override public void close() { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java index 57ff7b7e9c56..877f167d98d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.master.MetaRegionLocationCache; final class RegionReplicaTestHelper { @@ -89,6 +90,13 @@ static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLo .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() .get(); util.getAdmin().move(regionInfo.getEncodedNameAsBytes(), newServerName); + if (regionInfo.isMetaRegion()) { + // Invalidate the meta cache forcefully to avoid test races. Otherwise there might be a + // delay in master receiving the change event and the cache could be stale in that window. 
+ MetaRegionLocationCache metaCache = + util.getMiniHBaseCluster().getMaster().getMetaRegionLocationCache(); + metaCache.invalidateMetaReplica(replicaId); + } util.waitFor(30000, new ExplainingPredicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 1f0d40b5f4f7..8a4fbca68da8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -45,43 +46,56 @@ public class TestAsyncMetaRegionLocator { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static AsyncRegistry REGISTRY; - - private static AsyncMetaRegionLocator LOCATOR; - @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.startMiniCluster(3); - REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); - RegionReplicaTestHelper - .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); + try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { + RegionReplicaTestHelper + .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3); + } TEST_UTIL.getAdmin().balancerSwitch(false, true); - LOCATOR = new AsyncMetaRegionLocator(REGISTRY); } @AfterClass public static void tearDown() throws Exception { - IOUtils.closeQuietly(REGISTRY); TEST_UTIL.shutdownMiniCluster(); } - @Test - public void test() throws Exception { + private void verifyLocator(AsyncMetaRegionLocator locator) throws Exception { testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() { @Override public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception { - LOCATOR.updateCachedLocationOnError(loc, error); + locator.updateCachedLocationOnError(loc, error); } @Override public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) throws Exception { - return LOCATOR.getRegionLocations(replicaId, reload).get(); + return locator.getRegionLocations(replicaId, reload).get(); } }); } + + @Test + public void testZkAsyncRegistry() throws Exception { + try (ZKAsyncRegistry registry = new ZKAsyncRegistry(TEST_UTIL.getConfiguration())) { + verifyLocator(new AsyncMetaRegionLocator(registry)); + } + } + + @Test + public void testHMasterAsyncRegistry() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + String masterHostName = + TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getHostname(); + int masterPort = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getPort(); + conf.set(HMasterAsyncRegistry.CONF_KEY, masterHostName + ":" + masterPort); + try (HMasterAsyncRegistry registry = new HMasterAsyncRegistry((conf))) { + verifyLocator(new AsyncMetaRegionLocator(registry)); + } + } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java new file mode 100644 index 000000000000..78b1ca16196c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Joiner; +import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM; +import static org.junit.Assert.assertEquals; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestHMasterAsyncRegistry { + private static final Logger LOG = LoggerFactory.getLogger(TestHMasterAsyncRegistry.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final TableName TEST_TABLE = TableName.valueOf("foo"); + private static final byte[] COL_FAM = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("col"); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHMasterAsyncRegistry.class); + + // Automatically cleaned up on cluster shutdown. + private static AsyncClusterConnection customConnection; + private static HMasterAsyncRegistry REGISTRY; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); + TEST_UTIL.startMiniCluster(3); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + List hostAndPorts = new ArrayList<>(); + String masterHostName = + TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getHostname(); + int masterPort = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getPort(); + // Add some invalid hostAndPort configs. 
The implementation should be resilient enough to + // skip those and probe for the only working master. + // 1. Valid hostname but invalid port + hostAndPorts.add(HostAndPort.fromParts(masterHostName, 10001)); + // 2. Invalid hostname but valid port + hostAndPorts.add(HostAndPort.fromParts("foo.bar", masterPort)); + // 3. Invalid hostname and port. + hostAndPorts.add(HostAndPort.fromParts("foo.bar", 10003)); + // 4. Finally valid host:port + hostAndPorts.add(HostAndPort.fromParts(masterHostName, masterPort)); + final String config = Joiner.on(",").join(hostAndPorts); + conf.set(HMasterAsyncRegistry.CONF_KEY, config); + conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, HMasterAsyncRegistry.class.getName()); + // make sure that we do not depend on this config when getting locations for meta replicas, see + // HBASE-21658. + conf.setInt(META_REPLICAS_NUM, 1); + REGISTRY = new HMasterAsyncRegistry(conf); + customConnection = TEST_UTIL.getCustomConnection(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + IOUtils.closeQuietly(REGISTRY); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRegistryImpl() throws Exception { + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + assertEquals(REGISTRY.getClusterId().get(), master.getClusterId()); + assertEquals(REGISTRY.getMasterAddress().get(), master.getServerName()); + } + + /** + * Tests basic create, put, scan operations using the connection. + */ + @Test + public void testCustomConnectionBasicOps() throws Exception { + // Verify that the right registry is in use. + assertTrue(customConnection instanceof AsyncClusterConnectionImpl); + assertTrue(((AsyncClusterConnectionImpl) customConnection).getRegistry() + instanceof HMasterAsyncRegistry); + Connection connection = customConnection.toConnection(); + // Create a test table. + Admin admin = connection.getAdmin(); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TEST_TABLE). + setColumnFamily(ColumnFamilyDescriptorBuilder.of(COL_FAM)); + admin.createTable(builder.build()); + try (Table table = connection.getTable(TEST_TABLE)){ + // Insert one row each region + int insertNum = 10; + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + String.format("%03d", i))); + put.addColumn(COL_FAM, QUALIFIER, QUALIFIER); + table.put(put); + } + // Verify the row count. 
+ try (ResultScanner scanner = table.getScanner(new Scan())) { + int count = 0; + for (Result r : scanner) { + Assert.assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java index 5a72daea2030..16f43bd9e00b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java @@ -84,11 +84,8 @@ public void test() throws InterruptedException, ExecutionException, IOException String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId(); assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, clusterId); - assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(), - REGISTRY.getCurrentNrHRS().get().intValue()); assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), REGISTRY.getMasterAddress().get()); - assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue()); RegionReplicaTestHelper .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java new file mode 100644 index 000000000000..549d370597bd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({LargeTests.class, MasterTests.class}) +public class TestHMasterAsyncRegistryRPCs { + private static final Logger LOG = LoggerFactory.getLogger(TestHMasterAsyncRegistryRPCs.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static RpcClient rpcClient; + private static HMaster activeMaster; + private static List standByMasters; + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHMasterAsyncRegistryRPCs.class); + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.MASTER_PORT, "0"); + conf.setStrings(HConstants.ZOOKEEPER_ZNODE_PARENT, "/metacachetest"); + TEST_UTIL.startMiniCluster(2); + activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + standByMasters = new ArrayList<>(); + // Create a few standby masters. 
+ for (int i = 0; i < 2; i++) { + standByMasters.add(TEST_UTIL.getHBaseCluster().startMaster().getMaster()); + } + rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); + } + + @AfterClass + public static void tearDown() throws Exception { + if (rpcClient != null) { + rpcClient.close(); + } + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Verifies that getMetaReplicaIdFromPath() parses the full meta znode paths correctly. + * @throws IOException + */ + @Test + public void TestGetMetaReplicaIdFromPath() throws IOException { + ZKWatcher zk = TEST_UTIL.getZooKeeperWatcher(); + String metaZnode= + ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, zk.getZNodePaths().metaZNodePrefix); + assertEquals(0, zk.getZNodePaths().getMetaReplicaIdFromPath(metaZnode)); + for (int i = 1; i < 10; i++) { + assertEquals(i, zk.getZNodePaths().getMetaReplicaIdFromPath(metaZnode + "-" +i)); + } + for (String suffix : new String[]{"foo", "1234", "foo-123", "123-foo-234"}) { + try { + final String input = metaZnode + suffix; + zk.getZNodePaths().getMetaReplicaIdFromZnode(input); + fail("Exception not hit getMetaReplicaIdFromZnode(): " + input); + } catch (NumberFormatException e) { + // Expected + } + } + } + + private void verifyGetCachedMetadataLocations(HMaster master) throws IOException { + try { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + GetMetaLocationsResponse response = stub.getMetaLocations(null, + GetMetaLocationsRequest.getDefaultInstance()); + assertEquals(1, response.getLocationsCount()); + HBaseProtos.RegionLocation location = response.getLocations(0); + assertEquals(sm.getHostname(), location.getServerName().getHostName()); + } catch (ServiceException e) { + LOG.error( + "Error in GetCachedMetadataLocations. Active master: {}", master.isActiveMaster(), e); + fail("Error calling GetCachedMetadataLocations()"); + } + } + + /** + * Verifies that both active and standby masters + * @throws IOException + */ + @Test + public void TestGetCachedMetadataLocations() throws IOException { + // Verify that the active and standby HMasters start correctly. 
+ assertTrue(activeMaster.serviceStarted); + assertTrue(activeMaster.isActiveMaster()); + verifyGetCachedMetadataLocations(activeMaster); + for (HMaster standByMaster: standByMasters) { + assertTrue(!standByMaster.serviceStarted); + assertTrue(!standByMaster.isActiveMaster()); + verifyGetCachedMetadataLocations(standByMaster); + } + } + + private void verifyIsMasterActive(HMaster master, boolean expectedResult) throws Exception { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + IsActiveResponse response = stub.isActive(null, IsActiveRequest.getDefaultInstance()); + assertEquals(expectedResult, response.getIsMasterActive()); + } + + @Test + public void TestIsActiveRPC() throws Exception { + verifyIsMasterActive(activeMaster, true); + for (HMaster master: standByMasters) verifyIsMasterActive(master, false); + } + + private void verifyMasterPorts(HMaster master) throws Exception { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + GetPortInfoResponse response = stub.getPortInfo(null, GetPortInfoRequest.getDefaultInstance()); + Configuration conf = master.getConfiguration(); + assertEquals(response.getMasterPort(), + conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)); + assertEquals(response.getInfoPort(), + conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)); + } + + @Test + public void TestGetPortInfoRPC() throws Exception { + verifyMasterPorts(activeMaster); + for (HMaster master: standByMasters) verifyMasterPorts(master); + } + + public String getClusterId(HMaster master) throws IOException, ServiceException { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + GetClusterIdResponse response = + stub.getClusterId(null, GetClusterIdRequest.getDefaultInstance()); + return response.getClusterId(); + } + + @Test + public void TestClusterIdRPC() throws Exception { + assertEquals(activeMaster.getClusterId(), getClusterId(activeMaster)); + try { + assertTrue(standByMasters.size() > 0); + getClusterId(standByMasters.get(0)); + fail("No exception thrown while fetching ClusterId for standby master: " + + standByMasters.get(0).getServerName().toString()); + } catch (ServiceException e) { + // Expected. + assertTrue(e.getMessage().contains("Server is not running yet")); + } + } +}
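
Appendix (not part of the patch): a minimal client-side usage sketch for the configs introduced
above, assuming the standard ConnectionFactory/Table API on a branch with this patch applied. The
master host names, table name, and row key below are illustrative placeholders, not values taken
from the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HMasterRegistryClientSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Point the client at the HMaster endpoints instead of ZooKeeper. The host names and ports
    // here are illustrative.
    conf.set("hbase.client.asyncregistry.masteraddrs",
        "master1.example.com:16000,master2.example.com:16000");
    // Ask the connection to use the master-backed registry added by this patch.
    conf.set("hbase.client.registry.impl",
        "org.apache.hadoop.hbase.client.HMasterAsyncRegistry");
    // Cluster ID, active master address and meta locations are now fetched from the masters.
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("my_table"))) {
      table.get(new Get(Bytes.toBytes("row1")));
    }
  }
}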