Skip to content

Commit a0d60ad

Browse files
Divneet18 authored and virajjasani committed
HBASE-28428 : Zookeeper ConnectionRegistry APIs should have timeout (#6095) (#5837)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Pankaj Kumar <pankajkumar@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent 290162c commit a0d60ad

3 files changed

Lines changed: 74 additions & 6 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -62,10 +62,17 @@ class ZKConnectionRegistry implements ConnectionRegistry {
6262
private final ReadOnlyZKClient zk;
6363

6464
private final ZNodePaths znodePaths;
65+
private final Configuration conf;
66+
private final int zkRegistryAsyncTimeout;
67+
public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout";
68+
public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min
6569

6670
ZKConnectionRegistry(Configuration conf) {
6771
this.znodePaths = new ZNodePaths(conf);
68-
this.zk = new ReadOnlyZKClient(conf);
72+
this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER);
73+
this.conf = conf;
74+
this.zkRegistryAsyncTimeout =
75+
conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT);
6976
}
7077

7178
private interface Converter<T> {
@@ -74,7 +81,7 @@ private interface Converter<T> {
7481

7582
private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
7683
CompletableFuture<T> future = new CompletableFuture<>();
77-
addListener(zk.get(path), (data, error) -> {
84+
addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> {
7885
if (error != null) {
7986
future.completeExceptionally(error);
8087
return;
@@ -208,8 +215,8 @@ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
208215
return tracedFuture(() -> {
209216
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
210217
addListener(
211-
zk.list(znodePaths.baseZNode).thenApply(children -> children.stream()
212-
.filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
218+
zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children
219+
.stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())),
213220
(metaReplicaZNodes, error) -> {
214221
if (error != null) {
215222
future.completeExceptionally(error);

hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java

Lines changed: 38 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,9 @@
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
4545

46+
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
47+
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
48+
4649
/**
4750
* A very simple read only zookeeper implementation without watcher support.
4851
*/
@@ -76,6 +79,8 @@ public final class ReadOnlyZKClient implements Closeable {
7679

7780
private final int keepAliveTimeMs;
7881

82+
private HashedWheelTimer retryTimer;
83+
7984
private final ZKClientConfig zkClientConfig;
8085

8186
private static abstract class Task implements Delayed {
@@ -126,7 +131,7 @@ private String getId() {
126131
return String.format("0x%08x", System.identityHashCode(this));
127132
}
128133

129-
public ReadOnlyZKClient(Configuration conf) {
134+
public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) {
130135
// We might use a different ZK for client access
131136
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
132137
if (clientZkQuorumServers != null) {
@@ -139,6 +144,7 @@ public ReadOnlyZKClient(Configuration conf) {
139144
this.retryIntervalMs =
140145
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
141146
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
147+
this.retryTimer = retryTimer;
142148
this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
143149
LOG.debug(
144150
"Connect {} to {} with session timeout={}ms, retries={}, "
@@ -258,6 +264,23 @@ public void closed(IOException e) {
258264
}
259265
}
260266

267+
private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture<?> future,
268+
final String api) {
269+
return timeout -> {
270+
if (!future.isDone()) {
271+
future.completeExceptionally(new DoNotRetryIOException(
272+
"Zookeeper " + api + " could not be completed in " + timeoutMs + " ms"));
273+
}
274+
};
275+
}
276+
277+
public CompletableFuture<byte[]> get(final String path, final long timeoutMs) {
278+
CompletableFuture<byte[]> future = get(path);
279+
TimerTask timerTask = getTimerTask(timeoutMs, future, "GET");
280+
retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
281+
return future;
282+
}
283+
261284
public CompletableFuture<byte[]> get(String path) {
262285
if (closed.get()) {
263286
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
@@ -274,6 +297,13 @@ protected void doExec(ZooKeeper zk) {
274297
return future;
275298
}
276299

300+
public CompletableFuture<Stat> exists(String path, long timeoutMs) {
301+
CompletableFuture<Stat> future = exists(path);
302+
TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS");
303+
retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
304+
return future;
305+
}
306+
277307
public CompletableFuture<Stat> exists(String path) {
278308
if (closed.get()) {
279309
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
@@ -289,6 +319,13 @@ protected void doExec(ZooKeeper zk) {
289319
return future;
290320
}
291321

322+
public CompletableFuture<List<String>> list(String path, long timeoutMs) {
323+
CompletableFuture<List<String>> future = list(path);
324+
TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST");
325+
retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS);
326+
return future;
327+
}
328+
292329
public CompletableFuture<List<String>> list(String path) {
293330
if (closed.get()) {
294331
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));

hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.CompletableFuture;
4444
import java.util.concurrent.Exchanger;
4545
import java.util.concurrent.ExecutionException;
46+
import java.util.concurrent.TimeUnit;
4647
import org.apache.hadoop.conf.Configuration;
4748
import org.apache.hadoop.hbase.HBaseClassTestRule;
4849
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
@@ -51,6 +52,7 @@
5152
import org.apache.hadoop.hbase.testclassification.MediumTests;
5253
import org.apache.hadoop.hbase.testclassification.ZKTests;
5354
import org.apache.hadoop.hbase.util.Bytes;
55+
import org.apache.hadoop.hbase.util.Threads;
5456
import org.apache.zookeeper.AsyncCallback;
5557
import org.apache.zookeeper.CreateMode;
5658
import org.apache.zookeeper.KeeperException;
@@ -63,6 +65,9 @@
6365
import org.junit.Test;
6466
import org.junit.experimental.categories.Category;
6567

68+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
69+
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
70+
6671
@Category({ ZKTests.class, MediumTests.class })
6772
public class TestReadOnlyZKClient {
6873

@@ -79,6 +84,10 @@ public class TestReadOnlyZKClient {
7984
private static int CHILDREN = 5;
8085

8186
private static ReadOnlyZKClient RO_ZK;
87+
private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
88+
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
89+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
90+
10, TimeUnit.MILLISECONDS);
8291

8392
@BeforeClass
8493
public static void setUp() throws Exception {
@@ -98,13 +107,14 @@ public static void setUp() throws Exception {
98107
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3);
99108
conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100);
100109
conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
101-
RO_ZK = new ReadOnlyZKClient(conf);
110+
RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER);
102111
// only connect when necessary
103112
assertNull(RO_ZK.zookeeper);
104113
}
105114

106115
@AfterClass
107116
public static void tearDown() throws IOException {
117+
RETRY_TIMER.stop();
108118
RO_ZK.close();
109119
UTIL.shutdownMiniZKCluster();
110120
UTIL.cleanupTestDir();
@@ -204,4 +214,18 @@ public void testNotCloseZkWhenPending() throws Exception {
204214
waitForIdleConnectionClosed();
205215
verify(mockedZK, times(1)).close();
206216
}
217+
218+
@Test
219+
public void testReadWithTimeout() throws Exception {
220+
assertArrayEquals(DATA, RO_ZK.get(PATH, 60000).get());
221+
assertEquals(CHILDREN, RO_ZK.exists(PATH, 60000).get().getNumChildren());
222+
List<String> children = RO_ZK.list(PATH, 60000).get();
223+
assertEquals(CHILDREN, children.size());
224+
Collections.sort(children);
225+
for (int i = 0; i < CHILDREN; i++) {
226+
assertEquals("c" + i, children.get(i));
227+
}
228+
assertNotNull(RO_ZK.zookeeper);
229+
waitForIdleConnectionClosed();
230+
}
207231
}

0 commit comments

Comments
 (0)