Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.hadoop.hbase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
Expand Down Expand Up @@ -203,19 +203,19 @@ private void updateMetaLocation(String path, ZNodeOpType opType) {
* @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty.
*
*/
public Optional<List<HRegionLocation>> getMetaRegionLocations() {
public List<HRegionLocation> getMetaRegionLocations() {
ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
if (snapshot.isEmpty()) {
// This could be possible if the master has not successfully initialized yet or meta region
// is stuck in some weird state.
return Optional.empty();
return Collections.emptyList();
}
List<HRegionLocation> result = new ArrayList<>();
// Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
// ArrayValueCollection does not implement toArray().
snapshot.values().forEach(location -> result.add(location));
return Optional.of(result);
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand Down Expand Up @@ -71,15 +69,13 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
}

/**
* Create a new {@link AsyncClusterConnection} instance for a region server.
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
* {@link ConnectionRegistryEndpoint}.
*/
public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer)
public static AsyncClusterConnection createAsyncClusterConnection(
ConnectionRegistryEndpoint endpoint, Configuration conf, SocketAddress localAddress, User user)
throws IOException {
RegionServerRegistry registry = new RegionServerRegistry(regionServer);
Configuration conf = regionServer.getConfiguration();
InetSocketAddress localAddress =
new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0);
User user = regionServer.getUserProvider().getCurrent();
ShortCircuitConnectionRegistry registry = new ShortCircuitConnectionRegistry(endpoint);
return createAsyncClusterConnection(conf, registry, localAddress, user);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Define the necessary method for carrying {@code ClientMetaService}.
*/
@InterfaceAudience.Private
public interface ConnectionRegistryEndpoint {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just do, this doesn't work for any reason?

public interface ServerConnectionRegistry extends ConnectionRegistry {}

HRegionServer implements ServerConnectionRegistry {

  setupConnection(this);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having two interfaces that look very much alike is only adding to the confusion of the code readers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a prior work for undoing HMaster extends HRegionServer. If HMaster does not extends HRegionServer, then it can not use RegionServerRegistry any more. So I agree with you that in the current code it is not necessary to do this refactoring, but I think it is also no big harm? If we could land this first, the final patch for HBASE-25288 could be smaller. But anyway, I respect your decision, if you do not think this is good to land, I’m OK that we just close this one and include this patch in the final patch for HBASE-25288.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we should land this (thanks for your other patch btw). All I'm saying is perhaps we could do this refactor without creating another new interface ConnectionRegistryEndpoint that looks exactly like ConnectionRegistry? (like I mentioned in my first comment, pasting again below). This seems simpler than your patch, right, you can use 'this' instance as the registry object in both HMaster and HRegionServer, don't have to construct something explicit? or did I miss something?

public interface ServerConnectionRegistry extends ConnectionRegistry {}

HMaster implements ServerConnectionRegistry {

 <Implement master based registry server side>
}

HRegionServer implements ServerConnectionRegistry {

<Implement RegionServerRegistry server side here>
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it. Let me have a try. Was wondering that returning a CompletableFuture at server side while there is no actual asynchronous in code is a bit strange, but seems no big real harm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I know why I introduce a special interface then, we need to use these methods in RSRpcServices and MasterRpcServices too, to return these information to connection registry running at client side.

It will be a bit strange that, we get a CompletableFuture which is not async actually...

WDYT? You can take a look at the code in RSRpcService.

https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java#L4092


/**
* Get cluster id.
*/
String getClusterId();

/**
* Get active master address.
*/
Optional<ServerName> getActiveMaster();

/**
* Get backup masters address.
*/
List<ServerName> getBackupMasters();

/**
* Get all the region servers address.
*/
List<ServerName> getRegionServers();

/**
* Get the location of meta regions.
*/
List<HRegionLocation> getMetaLocations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,48 +24,42 @@
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Connection registry implementation for region server.
* A {@link ConnectionRegistry} implementation used at server side, where we could use the
* {@link ConnectionRegistryEndpoint} directly, without any rpcs.
*/
@InterfaceAudience.Private
public class RegionServerRegistry implements ConnectionRegistry {
class ShortCircuitConnectionRegistry implements ConnectionRegistry {

private final HRegionServer regionServer;
private final ConnectionRegistryEndpoint endpoint;

public RegionServerRegistry(HRegionServer regionServer) {
this.regionServer = regionServer;
public ShortCircuitConnectionRegistry(ConnectionRegistryEndpoint endpoint) {
this.endpoint = endpoint;
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
Optional<List<HRegionLocation>> locs =
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
if (locs.isPresent()) {
List<HRegionLocation> list = locs.get();
if (list.isEmpty()) {
future.completeExceptionally(new IOException("no meta location available"));
} else {
future.complete(new RegionLocations(list));
}
} else {
List<HRegionLocation> locs = endpoint.getMetaLocations();
if (locs.isEmpty()) {
future.completeExceptionally(new IOException("no meta location available"));
} else {
future.complete(new RegionLocations(locs));
}
return future;
}

@Override
public CompletableFuture<String> getClusterId() {
return CompletableFuture.completedFuture(regionServer.getClusterId());
return CompletableFuture.completedFuture(endpoint.getClusterId());
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
Optional<ServerName> activeMaster = regionServer.getActiveMaster();
Optional<ServerName> activeMaster = endpoint.getActiveMaster();
if (activeMaster.isPresent()) {
future.complete(activeMaster.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryType;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MetaTableAccessor;
Expand All @@ -91,6 +93,7 @@
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
Expand Down Expand Up @@ -247,8 +250,9 @@
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation"})
public class HRegionServer extends Thread implements
RegionServerServices, LastSequenceId, ConfigurationObserver {
public class HRegionServer extends Thread implements RegionServerServices, LastSequenceId,
ConnectionRegistryEndpoint, ConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

/**
Expand Down Expand Up @@ -887,7 +891,11 @@ public String getClusterId() {
*/
protected final synchronized void setupClusterConnection() throws IOException {
if (asyncClusterConnection == null) {
asyncClusterConnection = ClusterConnectionFactory.createAsyncClusterConnection(this);
InetSocketAddress localAddress =
new InetSocketAddress(rpcServices.getSocketAddress().getAddress(), 0);
User user = userProvider.getCurrent();
asyncClusterConnection =
ClusterConnectionFactory.createAsyncClusterConnection(this, conf, localAddress, user);
}
}

Expand Down Expand Up @@ -3983,23 +3991,29 @@ public long getRetryPauseTime() {
return this.retryPauseTime;
}

@Override
public Optional<ServerName> getActiveMaster() {
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
}

@Override
public List<ServerName> getBackupMasters() {
return masterAddressTracker.getBackupMasters();
}

@Override
public List<ServerName> getRegionServers() {
return regionServerAddressTracker.getRegionServers();
}

public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
@Override
public List<HRegionLocation> getMetaLocations() {
return metaRegionLocationCache.getMetaRegionLocations();
}

public UserProvider getUserProvider() {
return userProvider;
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public MetaRegionLocationCache getMetaRegionLocationCache() {
return metaRegionLocationCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
Expand Down Expand Up @@ -4114,10 +4113,8 @@ public GetMastersResponse getMasters(RpcController controller, GetMastersRequest
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
GetMetaRegionLocationsRequest request) throws ServiceException {
GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
Optional<List<HRegionLocation>> metaLocations =
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
metaLocations.ifPresent(hRegionLocations -> hRegionLocations
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
regionServer.getMetaLocations()
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public void testMasterAddressParsing() throws IOException {
public void testRegistryRPCs() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
final int size =
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
final int size = activeMaster.getMetaLocations().size();
for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
try (MasterRegistry registry = new MasterRegistry(conf)) {
Expand All @@ -124,8 +123,7 @@ public void testRegistryRPCs() throws Exception {
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
List<HRegionLocation> actualMetaLocations =
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -88,7 +89,7 @@ private List<HRegionLocation> getCurrentMetaLocations(ZKWatcher zk) throws Excep
private void verifyCachedMetaLocations(HMaster master) throws Exception {
// Wait until initial meta locations are loaded.
int retries = 0;
while (!master.getMetaRegionLocationCache().getMetaRegionLocations().isPresent()) {
while (master.getMetaRegionLocationCache().getMetaRegionLocations().isEmpty()) {
Thread.sleep(1000);
if (++retries == 10) {
break;
Expand All @@ -98,15 +99,14 @@ private void verifyCachedMetaLocations(HMaster master) throws Exception {
List<String> metaZnodes = zk.getMetaReplicaNodes();
// Wait till all replicas available.
retries = 0;
while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes
while (master.getMetaRegionLocationCache().getMetaRegionLocations().size() != metaZnodes
.size()) {
Thread.sleep(1000);
if (++retries == 10) {
break;
}
}
List<HRegionLocation> metaHRLs =
master.getMetaRegionLocationCache().getMetaRegionLocations().get();
List<HRegionLocation> metaHRLs = master.getMetaRegionLocationCache().getMetaRegionLocations();
assertFalse(metaHRLs.isEmpty());
assertEquals(metaZnodes.size(), metaHRLs.size());
List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testStandByMetaLocations() throws Exception {
try {
MetaRegionLocationCache metaCache = new MetaRegionLocationCache(zkWatcher);
// meta znodes do not exist at this point, cache should be empty.
assertFalse(metaCache.getMetaRegionLocations().isPresent());
assertTrue(metaCache.getMetaRegionLocations().isEmpty());
// Set the meta locations for a random meta replicas, simulating an active hmaster meta
// assignment.
for (int i = 0; i < 3; i++) {
Expand All @@ -177,13 +177,12 @@ public void testStandByMetaLocations() throws Exception {
// Wait until the meta cache is populated.
int iters = 0;
while (iters++ < 10) {
if (metaCache.getMetaRegionLocations().isPresent()
&& metaCache.getMetaRegionLocations().get().size() == 3) {
if (metaCache.getMetaRegionLocations().size() == 3) {
break;
}
Thread.sleep(1000);
}
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations().get();
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations();
assertEquals(3, metaLocations.size());
for (HRegionLocation location : metaLocations) {
assertEquals(sn, location.getServerName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ public void testRegistryRPCs() throws Exception {
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
List<HRegionLocation> actualMetaLocations =
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ private static HBaseRpcController getRpcController() {
*/
@Test public void TestMetaLocations() throws Exception {
HBaseRpcController rpcController = getRpcController();
List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
.getMetaRegionLocationCache().getMetaRegionLocations().get();
List<HRegionLocation> metaLocations =
TEST_UTIL.getMiniHBaseCluster().getMaster().getMetaLocations();
Collections.sort(metaLocations);
int rpcCount = 0;
for (JVMClusterUtil.MasterThread masterThread:
Expand Down