diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java new file mode 100644 index 000000000000..add2edb7e8a4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaRegionsNotAvailableException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +import java.io.IOException; + +/** + * Thrown by master when meta region locations are not cached for whatever reason. + * Client is expected to retry when running into this. + */ +@InterfaceAudience.Private +public class MetaRegionsNotAvailableException extends IOException { + private static final long serialVersionUID = (1L << 14) - 1L; + + public MetaRegionsNotAvailableException(String msg) { + super(msg); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 78fad9ea2354..7df16fddb0f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -394,4 +394,7 @@ public Hbck getHbck(ServerName masterServer) throws IOException { Optional<MetricsConnection> getConnectionMetrics() { return metrics; } + + @VisibleForTesting + AsyncRegistry getRegistry() { return registry; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java index 96329dcfb878..e50a9f5b4c0b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java @@ -26,7 +26,6 @@ /** * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.. - * All stuffs that may be related to zookeeper at client side are placed here. *

* Internal use only. */ @@ -45,21 +44,11 @@ interface AsyncRegistry extends Closeable { */ CompletableFuture getClusterId(); - /** - * Get the number of 'running' regionservers. - */ - CompletableFuture getCurrentNrHRS(); - /** * Get the address of HMaster. */ CompletableFuture getMasterAddress(); - /** - * Get the info port of HMaster. - */ - CompletableFuture getMasterInfoPort(); - /** * Closes this instance and releases any system resources associated with it */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java index 28726ae5dd20..aebb1de5a4b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistryFactory.java @@ -27,7 +27,7 @@ @InterfaceAudience.Private final class AsyncRegistryFactory { - static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; + public static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl"; private AsyncRegistryFactory() { } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java new file mode 100644 index 000000000000..05dc4e807ddd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HMasterAsyncRegistry.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Fetches the meta information directly from HMaster by making the relevant RPCs. HMaster RPC end + * points are looked up via the client configuration 'hbase.client.asyncregistry.masteraddrs', a + * comma-separated list of host:port values. Meant to be used in conjunction with + * 'hbase.client.registry.impl' set to this class. + * + * The class does not cache anything. It is the responsibility of the callers to cache and + * avoid repeated requests. + */ +@InterfaceAudience.Private +public class HMasterAsyncRegistry implements AsyncRegistry { + private static final Logger LOG = LoggerFactory.getLogger(HMasterAsyncRegistry.class); + public static final String CONF_KEY = "hbase.client.asyncregistry.masteraddrs"; + private static final String DEFAULT_HOST_PORT = "localhost:" + HConstants.DEFAULT_MASTER_PORT; + + // Parsed list of master host:port pairs from hbase-site.xml + private final List<ServerName> masterServers; + final Configuration conf; + // RPC client used to talk to the master servers. This uses a standalone RpcClient instance + // because AsyncRegistry is created prior to creating a cluster Connection. The client is torn + // down in close(). + final RpcClient rpcClient; + final int rpcTimeout; + + public HMasterAsyncRegistry(Configuration config) { + masterServers = new ArrayList<>(); + conf = config; + parseHostPorts(); + // Passing the default cluster ID means that token-based authentication does not work for + // certain client implementations.
+ // TODO(bharathv): Figure out a way to fetch the cluster ID in a non-authenticated way. + rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); + rpcTimeout = (int) Math.min(Integer.MAX_VALUE, + conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)); + } + + private void parseHostPorts() { + String hostPorts = conf.get(CONF_KEY, DEFAULT_HOST_PORT); + for (String hostPort: hostPorts.split(",")) { + masterServers.add(ServerName.valueOf(hostPort, ServerName.NON_STARTCODE)); + } + Preconditions.checkArgument(!masterServers.isEmpty(), String.format("%s is empty", CONF_KEY)); + // Randomize so that not every client sends requests in the same order. + Collections.shuffle(masterServers); + } + + /** + * Util that generates a master stub for a given ServerName. + */ + private MasterService.BlockingInterface getMasterStub(ServerName server) throws IOException { + return MasterService.newBlockingStub( + rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeout)); + } + + /** + * Blocking RPC to fetch the meta region locations using one of the masters from the parsed list. + */ + private RegionLocations getMetaRegionLocationsHelper() throws MetaRegionsNotAvailableException { + List<HBaseProtos.RegionLocation> result = null; + for (ServerName sname: masterServers) { + try { + MasterService.BlockingInterface stub = getMasterStub(sname); + HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController(); + GetMetaLocationsResponse resp = stub.getMetaLocations(rpcController, + GetMetaLocationsRequest.getDefaultInstance()); + result = resp.getLocationsList(); + // Stop probing once a master has answered. + break; + } catch (Exception e) { + LOG.warn("Error fetching meta locations from master {}. Trying others.", sname, e); + } + } + if (result == null || result.isEmpty()) { + throw new MetaRegionsNotAvailableException(String.format( + "Meta locations not found. Probed masters: %s", conf.get(CONF_KEY, DEFAULT_HOST_PORT))); + } + List<HRegionLocation> deserializedResult = new ArrayList<>(); + result.stream().forEach( + location -> deserializedResult.add(ProtobufUtil.toRegionLocation(location))); + return new RegionLocations(deserializedResult); + } + + /** + * Fetches the meta region locations by probing the masters configured in + * 'hbase.client.asyncregistry.masteraddrs'. + */ + @Override + public CompletableFuture<RegionLocations> getMetaRegionLocation() { + CompletableFuture<RegionLocations> result = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + result.complete(this.getMetaRegionLocationsHelper()); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + return result; + } + + /** + * Blocking RPC to get the cluster ID from the parsed master list. Throws if no active + * master could be reached. + */ + private String getClusterIdHelper() throws MasterNotRunningException { + // Loop through all the masters serially. We could be hitting some standby masters which cannot + // process this RPC, so we just skip them. + for (ServerName sname: masterServers) { + try { + MasterService.BlockingInterface stub = getMasterStub(sname); + HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController(); + GetClusterIdResponse resp = + stub.getClusterId(rpcController, GetClusterIdRequest.getDefaultInstance()); + return resp.getClusterId(); + } catch (IOException e) { + LOG.warn("Error fetching cluster ID from master: {}", sname, e); + } catch (ServiceException e) { + // This is probably a standby master, can be ignored.
+ LOG.debug("Error fetching cluster ID from server: {}", sname, e); + } + } + // If it comes to this point, no active master could be found. + throw new MasterNotRunningException(String.format( + "No active master found. Probed masters: %s", conf.get(CONF_KEY, DEFAULT_HOST_PORT))); + } + + @Override + public CompletableFuture<String> getClusterId() { + CompletableFuture<String> result = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + result.complete(this.getClusterIdHelper()); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + return result; + } + + /** + * Blocking RPC to get the active master address from the parsed list of master servers. + */ + private ServerName getMasterAddressHelper() throws MasterNotRunningException { + for (ServerName sname: masterServers) { + try { + MasterService.BlockingInterface stub = getMasterStub(sname); + HBaseRpcController rpcController = RpcControllerFactory.instantiate(conf).newController(); + IsActiveResponse resp = stub.isActive(rpcController, IsActiveRequest.getDefaultInstance()); + if (resp.getIsMasterActive()) { + return ServerName.valueOf(sname.getHostname(), sname.getPort(), resp.getStartCode()); + } + } catch (Exception e) { + // Unreachable or not fully started master, try the next one. + LOG.debug("Error probing master {} for active status. Trying others.", sname, e); + } + } + throw new MasterNotRunningException(String.format("No active master found. Probed masters: %s", + conf.get(CONF_KEY, DEFAULT_HOST_PORT))); + } + + /** + * @return the active master among the master addresses configured in + * 'hbase.client.asyncregistry.masteraddrs'. + */ + @Override + public CompletableFuture<ServerName> getMasterAddress() { + CompletableFuture<ServerName> result = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + result.complete(this.getMasterAddressHelper()); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + return result; + } + + @Override + public void close() { + if (rpcClient != null) { + rpcClient.close(); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java index 36fa6bba7544..08cff2157e3c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java @@ -210,11 +210,6 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() { return future; } - @Override - public CompletableFuture<Integer> getCurrentNrHRS() { - return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0); - } - private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException { if (data == null || data.length == 0) { return null; } @@ -237,12 +232,6 @@ public CompletableFuture<ServerName> getMasterAddress() { }); } - @Override - public CompletableFuture<Integer> getMasterInfoPort() { - return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto) - .thenApply(proto -> proto != null ?
proto.getInfoPort() : 0); - } - @Override public void close() { zk.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 81082174bc0a..6fe749bd08d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -3188,6 +3188,7 @@ public static org.apache.hadoop.hbase.client.RegionInfo toRegionInfo(final HBase } public static HBaseProtos.RegionLocation toRegionLocation(HRegionLocation loc) { + if (loc == null) return null; HBaseProtos.RegionLocation.Builder builder = HBaseProtos.RegionLocation.newBuilder(); builder.setRegionInfo(toRegionInfo(loc.getRegion())); if (loc.getServerName() != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index c5e510fe4b9c..5b3144dcc7d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -182,6 +182,18 @@ public String getZNodeForReplica(int replicaId) { .orElseGet(() -> metaReplicaZNodes.get(DEFAULT_REPLICA_ID) + "-" + replicaId); } + /** + * Parses the meta replicaId from the passed path. + * @param path the name of the full path which includes baseZNode. + * @return replicaId + */ + public int getMetaReplicaIdFromPath(String path) { + // Extract the znode from path. The prefix is of the following format. + // baseZNode + PATH_SEPARATOR. + int prefixLen = baseZNode.length() + 1; + return getMetaReplicaIdFromZnode(path.substring(prefixLen)); + } + /** * Parse the meta replicaId from the passed znode * @param znode the name of the znode, does not include baseZNode diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java index 66330687a26a..f3234b4c8072 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingAsyncRegistry.java @@ -43,21 +43,11 @@ public CompletableFuture getClusterId() { return CompletableFuture.completedFuture(null); } - @Override - public CompletableFuture getCurrentNrHRS() { - return CompletableFuture.completedFuture(0); - } - @Override public CompletableFuture getMasterAddress() { return CompletableFuture.completedFuture(null); } - @Override - public CompletableFuture getMasterInfoPort() { - return CompletableFuture.completedFuture(0); - } - @Override public void close() { } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index fee9ab8419d1..b8187f6c9487 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -585,6 +585,36 @@ message GetProceduresResponse { repeated Procedure procedure = 1; } +message IsActiveRequest { +} + +message IsActiveResponse { + required bool is_master_active = 1; + required uint64 start_code = 2; +} + +message GetMetaLocationsRequest { +} + +message GetMetaLocationsResponse { + repeated RegionLocation locations = 1; +} + +message GetPortInfoRequest { +} + +message GetPortInfoResponse { + required uint32 info_port 
= 1; + required uint32 master_port = 2; +} + +message GetClusterIdRequest { +} + +message GetClusterIdResponse { + required string cluster_id = 1; +} + message GetLocksRequest { } @@ -707,6 +737,19 @@ service MasterService { rpc GetClusterStatus(GetClusterStatusRequest) returns(GetClusterStatusResponse); + /** Returns whether this master is active or not. Served on both active/standby masters.*/ + rpc IsActive(IsActiveRequest) returns(IsActiveResponse); + + /** Used by client to get the location of meta replica(s). Served on both active/standby masters.*/ + rpc GetMetaLocations(GetMetaLocationsRequest) + returns(GetMetaLocationsResponse); + + /** Returns the ClusterId UUID for this cluster */ + rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); + + /** Returns the port numbers used by HMaster instances. Served on both active/standby masters.*/ + rpc GetPortInfo(GetPortInfoRequest) returns(GetPortInfoResponse); + /** return true if master is available */ rpc IsMasterRunning(IsMasterRunningRequest) returns(IsMasterRunningResponse); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bb3c12c0b88c..11fe3030d4b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -355,6 +355,9 @@ public void run() { // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; + // Cache of meta locations indexed by replicas + private MetaRegionLocationCache metaRegionLocationCache; + // manager of replication private ReplicationPeerManager replicationPeerManager; @@ -518,6 +521,8 @@ public HMaster(final Configuration conf) maintenanceMode = false; } + metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper); + this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); LOG.info("hbase.rootdir=" + getRootDir() + @@ -3851,4 +3856,6 @@ public Map getWalGroupsReplicationStatus() { public HbckChore getHbckChore() { return this.hbckChore; } + + public MetaRegionLocationCache getMetaRegionLocationCache() { return this.metaRegionLocationCache; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 06a99fa5432d..3c284fa546ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.Server; @@ -184,14 +185,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProceduresRequest; @@ -205,6 +212,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.HbckService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; @@ -977,6 +986,53 @@ public GetClusterStatusResponse getClusterStatus(RpcController controller, return response.build(); } + @Override + public GetMetaLocationsResponse getMetaLocations( + RpcController controller, GetMetaLocationsRequest req) throws ServiceException { + GetMetaLocationsResponse.Builder response = GetMetaLocationsResponse.newBuilder(); + try { + // We skip the master init check since this RPC is served on both the active and standby + // masters as long as their cache is populated. 
+ response.addAllLocations(master.getMetaRegionLocationCache().getCachedMetaRegionLocations()); + } catch (MetaRegionsNotAvailableException e) { + throw new ServiceException(e); + } + return response.build(); + } + + @Override + public IsActiveResponse isActive(RpcController controller, IsActiveRequest req) + throws ServiceException { + IsActiveResponse.Builder response = IsActiveResponse.newBuilder(); + response.setIsMasterActive(master.isActiveMaster()); + response.setStartCode(master.getStartcode()); + return response.build(); + } + + @Override + public GetPortInfoResponse getPortInfo(RpcController controller, GetPortInfoRequest req) + throws ServiceException { + GetPortInfoResponse.Builder response = GetPortInfoResponse.newBuilder(); + response.setMasterPort( + master.getConfiguration().getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)); + response.setInfoPort(master.getConfiguration().getInt(HConstants.MASTER_INFO_PORT, + HConstants.DEFAULT_MASTER_INFOPORT)); + return response.build(); + } + + @Override + public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest req) + throws ServiceException { + GetClusterIdResponse.Builder response = GetClusterIdResponse.newBuilder(); + try { + master.checkInitialized(); + response.setClusterId(master.getClusterId()); + } catch (IOException e) { + throw new ServiceException(e); + } + return response.build(); + } + /** * List the currently available/stored snapshots. Any in-progress snapshots are ignored */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java new file mode 100644 index 000000000000..db55f66acb4f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.hbase.MetaRegionsNotAvailableException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentNavigableMap; + +/** + * A cache of meta region location metadata. This cache is used to serve 'GetMetaLocations' RPCs + * from clients. Registers a listener on ZK to track changes to the meta table znodes. Clients + * are expected to retry if the meta information is stale. This class is thread-safe. + */ +@InterfaceAudience.Private +public class MetaRegionLocationCache extends ZKListener { + + private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class); + + // Maximum number of times we retry when ZK operation times out. Should this be configurable? + private static final int MAX_ZK_META_FETCH_RETRIES = 10; + + private ZKWatcher watcher; + // Cached meta region locations indexed by replica ID. + // CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during + // client requests. Even though CopyOnWriteArrayMap copies the data structure for every write, + // that should be OK since the size of the list is often small and mutations are not too often + // and we do not need to block client requests while mutations are in progress. + CopyOnWriteArrayMap cachedMetaLocations; + + private enum ZNodeOpType { + INIT, + CREATED, + CHANGED, + DELETED + }; + + public MetaRegionLocationCache(ZKWatcher zkWatcher) { + super(zkWatcher); + watcher = zkWatcher; + cachedMetaLocations = new CopyOnWriteArrayMap<>(); + watcher.registerListener(this); + // Populate the initial snapshot of data from meta znodes. + // This is needed because stand-by masters can potentially start after the initial znode + // creation. 
+ populateInitialMetaLocations(); + } + + private void populateInitialMetaLocations() { + int retries = 0; + while (retries++ < MAX_ZK_META_FETCH_RETRIES) { + try { + List znodes = watcher.getMetaReplicaNodes(); + for (String znode: znodes) { + String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode); + updateMetaLocation(path, ZNodeOpType.INIT); + } + break; + } catch (KeeperException.OperationTimeoutException e) { + LOG.warn("Timed out connecting to ZK cluster", e); + + } catch (KeeperException e) { + LOG.warn("Error populating initial meta locations", e); + break; + } + } + } + + private void updateMetaLocation(String path, ZNodeOpType opType) { + if (!isValidMetaZNode(path)) { + return; + } + LOG.info("Meta znode for path {}: {}", path, opType.name()); + int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path); + if (opType == ZNodeOpType.DELETED) { + cachedMetaLocations.remove(replicaId); + return; + } + RegionState state = null; + int retries = 0; + while (retries++ < MAX_ZK_META_FETCH_RETRIES) { + try { + state = MetaTableLocator.getMetaRegionState(watcher, replicaId); + break; + } catch (KeeperException.OperationTimeoutException oe) { + // LOG and retry. + LOG.warn("Timed out fetching meta location information for path {}", path, oe); + } catch (KeeperException e) { + LOG.error("Error getting meta location for path {}", path, e); + break; + } + } + if (state == null) { + cachedMetaLocations.put(replicaId, null); + return; + } + cachedMetaLocations.put( + replicaId, new HRegionLocation(state.getRegion(), state.getServerName())); + } + + /** + * Converts the current cache snapshot into a GetMetaLocations() RPC return payload. + * @return Protobuf serialized list of cached meta HRegionLocations + * @throws MetaRegionsNotAvailableException if the cache is not populated. + */ + public List getCachedMetaRegionLocations() + throws MetaRegionsNotAvailableException { + ConcurrentNavigableMap snapshot = + cachedMetaLocations.tailMap(cachedMetaLocations.firstKey(), true); + if (snapshot == null || snapshot.isEmpty()) { + // This could be possible if the master has not successfully initialized yet or meta region + // is stuck in some weird state. + throw new MetaRegionsNotAvailableException("Meta cache is empty"); + } + List result = new ArrayList<>(); + // Handle missing replicas, if any? + snapshot.values().stream().forEach( + location -> result.add(ProtobufUtil.toRegionLocation(location))); + return result; + } + + /** + * Helper to check if the given 'path' corresponds to a meta znode. This listener is only + * interested in changes to meta znodes. + */ + private boolean isValidMetaZNode(String path) { + return watcher.getZNodePaths().isAnyMetaReplicaZNode(path); + } + + /** + * Test helper to invalidate cached metadata for a given meta replica ID. This is done + * synchronously with the meta region moves in tests to avoid any flaky tests. 
+ */ + @VisibleForTesting + public void invalidateMetaReplica(int replicaId) { + String path = watcher.getZNodePaths().getZNodeForReplica(replicaId); + nodeDataChanged(path); + } + + @Override + public void nodeCreated(String path) { + updateMetaLocation(path, ZNodeOpType.CREATED); + } + + @Override + public void nodeDeleted(String path) { + updateMetaLocation(path, ZNodeOpType.DELETED); + } + + @Override + public void nodeDataChanged(String path) { + updateMetaLocation(path, ZNodeOpType.CHANGED); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index a50ac11db6eb..034c615bbf22 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -213,6 +213,10 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { private volatile AsyncClusterConnection asyncConnection; + // Tracks any other connections created with a custom client config. Used for testing clients with + // custom configurations. Tracked here so that they can be cleaned up on close() / restart. + private List<AsyncClusterConnection> customConnections = Collections.synchronizedList(new ArrayList<>()); + /** Filesystem URI used for map-reduce mini-cluster setup */ private static String FS_URI; @@ -1252,6 +1256,10 @@ public void restartHBaseCluster(StartMiniClusterOption option) this.asyncConnection.close(); this.asyncConnection = null; } + for (AsyncClusterConnection connection: customConnections) { + Closeables.close(connection, true); + } + customConnections.clear(); this.hbaseCluster = new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), option.getRsClass()); @@ -3067,6 +3075,19 @@ public Connection getConnection() throws IOException { return this.asyncConnection.toConnection(); } + /** + * Creates a new Connection instance for this mini cluster with the given input config. Use it wisely since + * connection creation is expensive. For all practical purposes {@link #getConnection()} should be good enough. This + * helper should only be used if one wants to test a custom client-side configuration that differs from the conf used to + * spawn the mini-cluster.
+ */ + public AsyncClusterConnection getCustomConnection(Configuration conf) throws IOException { + User user = UserProvider.instantiate(conf).getCurrent(); + AsyncClusterConnection connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); + customConnections.add(connection); + return connection; + } + public AsyncClusterConnection getAsyncConnection() throws IOException { if (this.asyncConnection == null) { initConnection(); @@ -3077,6 +3098,9 @@ public AsyncClusterConnection getAsyncConnection() throws IOException { public void closeConnection() throws IOException { Closeables.close(hbaseAdmin, true); Closeables.close(asyncConnection, true); + for (AsyncClusterConnection connection: customConnections) { + Closeables.close(connection, true); + } this.hbaseAdmin = null; this.asyncConnection = null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java index e9ae25d2eaf0..eccb8ef42360 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java @@ -39,21 +39,11 @@ public CompletableFuture getClusterId() { return null; } - @Override - public CompletableFuture getCurrentNrHRS() { - return null; - } - @Override public CompletableFuture getMasterAddress() { return null; } - @Override - public CompletableFuture getMasterInfoPort() { - return null; - } - @Override public void close() { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java index 57ff7b7e9c56..877f167d98d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.master.MetaRegionLocationCache; final class RegionReplicaTestHelper { @@ -89,6 +90,13 @@ static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLo .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() .get(); util.getAdmin().move(regionInfo.getEncodedNameAsBytes(), newServerName); + if (regionInfo.isMetaRegion()) { + // Invalidate the meta cache forcefully to avoid test races. Otherwise there might be a + // delay in master receiving the change event and the cache could be stale in that window. 
+ MetaRegionLocationCache metaCache = + util.getMiniHBaseCluster().getMaster().getMetaRegionLocationCache(); + metaCache.invalidateMetaReplica(replicaId); + } util.waitFor(30000, new ExplainingPredicate() { @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 1f0d40b5f4f7..8a4fbca68da8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -45,43 +46,56 @@ public class TestAsyncMetaRegionLocator { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static AsyncRegistry REGISTRY; - - private static AsyncMetaRegionLocator LOCATOR; - @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.startMiniCluster(3); - REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); - RegionReplicaTestHelper - .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); + try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { + RegionReplicaTestHelper + .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3); + } TEST_UTIL.getAdmin().balancerSwitch(false, true); - LOCATOR = new AsyncMetaRegionLocator(REGISTRY); } @AfterClass public static void tearDown() throws Exception { - IOUtils.closeQuietly(REGISTRY); TEST_UTIL.shutdownMiniCluster(); } - @Test - public void test() throws Exception { + private void verifyLocator(AsyncMetaRegionLocator locator) throws Exception { testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() { @Override public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception { - LOCATOR.updateCachedLocationOnError(loc, error); + locator.updateCachedLocationOnError(loc, error); } @Override public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) throws Exception { - return LOCATOR.getRegionLocations(replicaId, reload).get(); + return locator.getRegionLocations(replicaId, reload).get(); } }); } + + @Test + public void testZkAsyncRegistry() throws Exception { + try (ZKAsyncRegistry registry = new ZKAsyncRegistry(TEST_UTIL.getConfiguration())) { + verifyLocator(new AsyncMetaRegionLocator(registry)); + } + } + + @Test + public void testHMasterAsyncRegistry() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + String masterHostName = + TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getHostname(); + int masterPort = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getPort(); + conf.set(HMasterAsyncRegistry.CONF_KEY, masterHostName + ":" + masterPort); + try (HMasterAsyncRegistry registry = new HMasterAsyncRegistry((conf))) { + verifyLocator(new AsyncMetaRegionLocator(registry)); + } + } } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java new file mode 100644 index 000000000000..78b1ca16196c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHMasterAsyncRegistry.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Joiner; +import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; + +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM; +import static org.junit.Assert.assertEquals; + +@Category({ LargeTests.class, ClientTests.class }) +public class TestHMasterAsyncRegistry { + private static final Logger LOG = LoggerFactory.getLogger(TestHMasterAsyncRegistry.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final TableName TEST_TABLE = TableName.valueOf("foo"); + private static final byte[] COL_FAM = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("col"); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHMasterAsyncRegistry.class); + + // Automatically cleaned up on cluster shutdown. + private static AsyncClusterConnection customConnection; + private static HMasterAsyncRegistry REGISTRY; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); + TEST_UTIL.startMiniCluster(3); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + List hostAndPorts = new ArrayList<>(); + String masterHostName = + TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getHostname(); + int masterPort = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName().getPort(); + // Add some invalid hostAndPort configs. 
The implementation should be resilient enough to + // skip those and probe for the only working master. + // 1. Valid hostname but invalid port + hostAndPorts.add(HostAndPort.fromParts(masterHostName, 10001)); + // 2. Invalid hostname but valid port + hostAndPorts.add(HostAndPort.fromParts("foo.bar", masterPort)); + // 3. Invalid hostname and port. + hostAndPorts.add(HostAndPort.fromParts("foo.bar", 10003)); + // 4. Finally valid host:port + hostAndPorts.add(HostAndPort.fromParts(masterHostName, masterPort)); + final String config = Joiner.on(",").join(hostAndPorts); + conf.set(HMasterAsyncRegistry.CONF_KEY, config); + conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, HMasterAsyncRegistry.class.getName()); + // make sure that we do not depend on this config when getting locations for meta replicas, see + // HBASE-21658. + conf.setInt(META_REPLICAS_NUM, 1); + REGISTRY = new HMasterAsyncRegistry(conf); + customConnection = TEST_UTIL.getCustomConnection(conf); + } + + @AfterClass + public static void tearDown() throws Exception { + IOUtils.closeQuietly(REGISTRY); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRegistryImpl() throws Exception { + HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + assertEquals(REGISTRY.getClusterId().get(), master.getClusterId()); + assertEquals(REGISTRY.getMasterAddress().get(), master.getServerName()); + } + + /** + * Tests basic create, put, scan operations using the connection. + */ + @Test + public void testCustomConnectionBasicOps() throws Exception { + // Verify that the right registry is in use. + assertTrue(customConnection instanceof AsyncClusterConnectionImpl); + assertTrue(((AsyncClusterConnectionImpl) customConnection).getRegistry() + instanceof HMasterAsyncRegistry); + Connection connection = customConnection.toConnection(); + // Create a test table. + Admin admin = connection.getAdmin(); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TEST_TABLE). + setColumnFamily(ColumnFamilyDescriptorBuilder.of(COL_FAM)); + admin.createTable(builder.build()); + try (Table table = connection.getTable(TEST_TABLE)){ + // Insert one row each region + int insertNum = 10; + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + String.format("%03d", i))); + put.addColumn(COL_FAM, QUALIFIER, QUALIFIER); + table.put(put); + } + // Verify the row count. 
+ try (ResultScanner scanner = table.getScanner(new Scan())) { + int count = 0; + for (Result r : scanner) { + Assert.assertTrue(!r.isEmpty()); + count++; + } + assertEquals(insertNum, count); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java index 5a72daea2030..16f43bd9e00b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java @@ -84,11 +84,8 @@ public void test() throws InterruptedException, ExecutionException, IOException String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId(); assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, clusterId); - assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(), - REGISTRY.getCurrentNrHRS().get().intValue()); assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), REGISTRY.getMasterAddress().get()); - assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue()); RegionReplicaTestHelper .waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3); RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java new file mode 100644 index 000000000000..549d370597bd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterAsyncRegistryRPCs.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetPortInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsActiveResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({LargeTests.class, MasterTests.class}) +public class TestHMasterAsyncRegistryRPCs { + private static final Logger LOG = LoggerFactory.getLogger(TestHMasterAsyncRegistryRPCs.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static RpcClient rpcClient; + private static HMaster activeMaster; + private static List standByMasters; + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHMasterAsyncRegistryRPCs.class); + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.MASTER_PORT, "0"); + conf.setStrings(HConstants.ZOOKEEPER_ZNODE_PARENT, "/metacachetest"); + TEST_UTIL.startMiniCluster(2); + activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + standByMasters = new ArrayList<>(); + // Create a few standby masters. 
+ for (int i = 0; i < 2; i++) { + standByMasters.add(TEST_UTIL.getHBaseCluster().startMaster().getMaster()); + } + rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); + } + + @AfterClass + public static void tearDown() throws Exception { + if (rpcClient != null) { + rpcClient.close(); + } + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Verifies that getMetaReplicaIdFromPath() parses the full meta znode paths correctly. + * @throws IOException + */ + @Test + public void TestGetMetaReplicaIdFromPath() throws IOException { + ZKWatcher zk = TEST_UTIL.getZooKeeperWatcher(); + String metaZnode = + ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, zk.getZNodePaths().metaZNodePrefix); + assertEquals(0, zk.getZNodePaths().getMetaReplicaIdFromPath(metaZnode)); + for (int i = 1; i < 10; i++) { + assertEquals(i, zk.getZNodePaths().getMetaReplicaIdFromPath(metaZnode + "-" + i)); + } + for (String suffix : new String[]{"foo", "1234", "foo-123", "123-foo-234"}) { + try { + final String input = metaZnode + suffix; + zk.getZNodePaths().getMetaReplicaIdFromZnode(input); + fail("Exception not hit for getMetaReplicaIdFromZnode(): " + input); + } catch (NumberFormatException e) { + // Expected + } + } + } + + private void verifyGetCachedMetadataLocations(HMaster master) throws IOException { + try { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + GetMetaLocationsResponse response = stub.getMetaLocations(null, + GetMetaLocationsRequest.getDefaultInstance()); + assertEquals(1, response.getLocationsCount()); + HBaseProtos.RegionLocation location = response.getLocations(0); + assertEquals(sm.getHostname(), location.getServerName().getHostName()); + } catch (ServiceException e) { + LOG.error( + "Error in GetCachedMetadataLocations. Active master: {}", master.isActiveMaster(), e); + fail("Error calling GetCachedMetadataLocations()"); + } + } + + /** + * Verifies that both active and standby masters serve the cached meta locations. + * @throws IOException + */ + @Test + public void TestGetCachedMetadataLocations() throws IOException { + // Verify that the active and standby HMasters start correctly.
+ assertTrue(activeMaster.serviceStarted); + assertTrue(activeMaster.isActiveMaster()); + verifyGetCachedMetadataLocations(activeMaster); + for (HMaster standByMaster: standByMasters) { + assertTrue(!standByMaster.serviceStarted); + assertTrue(!standByMaster.isActiveMaster()); + verifyGetCachedMetadataLocations(standByMaster); + } + } + + private void verifyIsMasterActive(HMaster master, boolean expectedResult) throws Exception { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + IsActiveResponse response = stub.isActive(null, IsActiveRequest.getDefaultInstance()); + assertEquals(expectedResult, response.getIsMasterActive()); + } + + @Test + public void TestIsActiveRPC() throws Exception { + verifyIsMasterActive(activeMaster, true); + for (HMaster master: standByMasters) verifyIsMasterActive(master, false); + } + + private void verifyMasterPorts(HMaster master) throws Exception { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + GetPortInfoResponse response = stub.getPortInfo(null, GetPortInfoRequest.getDefaultInstance()); + Configuration conf = master.getConfiguration(); + assertEquals(response.getMasterPort(), + conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)); + assertEquals(response.getInfoPort(), + conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)); + } + + @Test + public void TestGetPortInfoRPC() throws Exception { + verifyMasterPorts(activeMaster); + for (HMaster master: standByMasters) verifyMasterPorts(master); + } + + public String getClusterId(HMaster master) throws IOException, ServiceException { + ServerName sm = master.getServerName(); + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sm, User.getCurrent(), 0); + MasterProtos.MasterService.BlockingInterface stub = + MasterProtos.MasterService.newBlockingStub(channel); + GetClusterIdResponse response = + stub.getClusterId(null, GetClusterIdRequest.getDefaultInstance()); + return response.getClusterId(); + } + + @Test + public void TestClusterIdRPC() throws Exception { + assertEquals(activeMaster.getClusterId(), getClusterId(activeMaster)); + try { + assertTrue(standByMasters.size() > 0); + getClusterId(standByMasters.get(0)); + fail("No exception thrown while fetching ClusterId for standby master: " + + standByMasters.get(0).getServerName().toString()); + } catch (ServiceException e) { + // Expected. + assertTrue(e.getMessage().contains("Server is not running yet")); + } + } +}
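For reference, a minimal client-side sketch of how this registry is meant to be selected purely through configuration. The master addresses below are placeholders, and ConnectionFactory.createAsyncConnection() is assumed as the public entry point; the patch itself only wires the implementation through AsyncRegistryFactory and the 'hbase.client.registry.impl' key.

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MasterRegistryClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Bootstrap cluster metadata (cluster ID, meta locations, active master) from the
    // HMaster RPC endpoints instead of ZooKeeper.
    conf.set("hbase.client.registry.impl",
        "org.apache.hadoop.hbase.client.HMasterAsyncRegistry");
    // Comma-separated host:port list of masters to probe; the hostnames are placeholders.
    conf.set("hbase.client.asyncregistry.masteraddrs",
        "master1.example.com:16000,master2.example.com:16000");
    CompletableFuture<AsyncConnection> future = ConnectionFactory.createAsyncConnection(conf);
    try (AsyncConnection connection = future.get()) {
      // The registry RPCs (GetClusterId, GetMetaLocations, IsActive) are issued while the
      // connection bootstraps; regular table operations work as usual from here on.
      System.out.println("Connected using HMasterAsyncRegistry");
    }
  }
}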