Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used?

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
Expand All @@ -49,6 +51,7 @@

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
* Zookeeper based registry implementation.
Expand All @@ -70,6 +73,11 @@ class ZKConnectionRegistry implements ConnectionRegistry {

private final ZNodePaths znodePaths;

private static final long expectedTimeout = 120000;
private static final int maxAttempts = 5;
private static final long pauseNs = 100000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be configs, not constants.


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these extra empty lines.


// User not used, but for rpc based registry we need it
ZKConnectionRegistry(Configuration conf, User ignored) {
this.znodePaths = new ZNodePaths(conf);
Expand All @@ -85,23 +93,54 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
}

public ZNodePaths getZNodePaths() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to add this method?

return znodePaths;
}

private interface Converter<T> {
T convert(byte[] data) throws Exception;
}

private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
addListener(zk.get(path), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
try {
future.complete(converter.convert(data));
} catch (Exception e) {
future.completeExceptionally(e);
}
});
TimerTask pollingTask = new TimerTask() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we need to do this here?

We have timeout in ReadOnlyZKClient, here we just need to schedule retries?

int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;

@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
addListener(zk.get(path), (data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (data != null && data.length > 0) {
try {
future.complete(converter.convert(data));
} catch (Exception e) {
future.completeExceptionally(e);
}
} else {
// retry again after pauseTime.
long pauseTime =
ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
TimeUnit.MICROSECONDS);
}
});
} else {
future.completeExceptionally(new IOException("Procedure wasn't completed in "
+ "expectedTime:" + expectedTimeout + " ms"));
}
}
};
// Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);

return future;
}

Expand Down Expand Up @@ -217,16 +256,43 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(() -> {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;

@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
addListener(
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
.filter(c -> getZNodePaths().isMetaZNodePrefix(c)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (metaReplicaZNodes != null && !metaReplicaZNodes.isEmpty()) {
getMetaRegionLocation(future, metaReplicaZNodes);
} else {
// retry again after pauseTime.
long pauseTime =
ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
TimeUnit.MICROSECONDS);
}
});
} else {
future.completeExceptionally(new IOException("Procedure wasn't completed in "
+ "expectedTime:" + expectedTimeout + " ms"));
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
}
};
// Queue the polling task into RETRY_TIMER to poll procedure state asynchronously.
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);

return future;
}, "ZKConnectionRegistry.getMetaRegionLocations");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private ClusterConnectionFactory() {
}

private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
ConnectionRegistry registry, SocketAddress localAddress, User user) throws IOException {
ConnectionRegistry registry, SocketAddress localAddress, User user ) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used?

String clusterId = FutureUtils.get(registry.getClusterId());
Class<? extends AsyncClusterConnection> clazz =
conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) {
@Override
public RegionLocations getRegionLocations(TableName tableName, int replicaId,
boolean reload) throws Exception {
final Configuration conf = HBaseConfiguration.create();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this useless code?

return locator.getRegionLocations(replicaId, reload).get();
}
});
Expand Down