Skip to content

Commit 8e72842

Browse files
committed
HDFS-17751. [ARR] Add unit tests using asynchronous router rpc for all in org.apache.hadoop.hdfs.server.federation.router.
1 parent 4452980 commit 8e72842

12 files changed

Lines changed: 1206 additions & 280 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2004,10 +2004,11 @@ private static boolean isReadCall(Method method) {
20042004
* Checks and sets last refresh time for a namespace's stateId.
20052005
* Returns true if refresh time is newer than threshold.
20062006
* Otherwise, return false and call should be handled by active namenode.
2007-
* @param nsId namespaceID
2007+
* @param nsId namespaceID.
2008+
* @return true if refresh time is newer than threshold. Otherwise, return false.
20082009
*/
20092010
@VisibleForTesting
2010-
boolean isNamespaceStateIdFresh(String nsId) {
2011+
public boolean isNamespaceStateIdFresh(String nsId) {
20112012
if (activeNNStateIdRefreshPeriodMs < 0) {
20122013
return true;
20132014
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class RouterStateIdContext implements AlignmentContext {
6464
/** Nameservice specific overrides of the default setting for enabling observer reads. */
6565
private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
6666

67-
RouterStateIdContext(Configuration conf) {
67+
public RouterStateIdContext(Configuration conf) {
6868
this.coordinatedMethods = new HashSet<>();
6969
// For now, only ClientProtocol methods can be coordinated, so only checking
7070
// against ClientProtocol.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.federation.router;
20+
21+
public final class TestRouterConstants {
22+
23+
private TestRouterConstants() {
24+
25+
}
26+
27+
public static final String ASYNC_MODE = "ASYNC";
28+
public static final String SYNC_MODE = "SYNC";
29+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java

Lines changed: 175 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,24 @@
3838
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
3939
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
4040
import org.apache.hadoop.test.LambdaTestUtils;
41-
import org.junit.jupiter.api.AfterEach;
42-
import org.junit.jupiter.api.AfterAll;
43-
import org.junit.jupiter.api.BeforeAll;
44-
import org.junit.jupiter.api.Test;
41+
import org.junit.jupiter.api.Nested;
42+
import org.junit.jupiter.api.extension.AfterAllCallback;
43+
import org.junit.jupiter.api.extension.AfterEachCallback;
44+
import org.junit.jupiter.api.extension.BeforeEachCallback;
45+
import org.junit.jupiter.api.extension.ExtendWith;
46+
import org.junit.jupiter.api.extension.ExtensionContext;
47+
import org.junit.jupiter.params.ParameterizedTest;
48+
import org.junit.jupiter.params.provider.ValueSource;
4549

4650
import java.io.IOException;
51+
import java.lang.reflect.Method;
4752
import java.util.Collections;
4853
import java.util.List;
4954
import java.util.Map;
5055

56+
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterConstants.ASYNC_MODE;
57+
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterConstants.SYNC_MODE;
58+
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
5159
import static org.junit.jupiter.api.Assertions.assertEquals;
5260
import static org.junit.jupiter.api.Assertions.assertNotNull;
5361
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -64,8 +72,23 @@ public class TestRouterMountTableWithoutDefaultNS {
6472
private static FileSystem nnFs0;
6573
private static FileSystem nnFs1;
6674

67-
@BeforeAll
68-
public static void globalSetUp() throws Exception {
75+
public static StateStoreDFSCluster getCluster() {
76+
return cluster;
77+
}
78+
79+
public static void setCluster(StateStoreDFSCluster cluster) {
80+
TestRouterMountTableWithoutDefaultNS.cluster = cluster;
81+
}
82+
83+
public static RouterContext getRouterContext() {
84+
return routerContext;
85+
}
86+
87+
public static void setRouterContext(RouterContext routerContext) {
88+
TestRouterMountTableWithoutDefaultNS.routerContext = routerContext;
89+
}
90+
91+
public static void globalSetUp(String rpcMode) throws Exception {
6992
// Build and start a federated cluster
7093
cluster = new StateStoreDFSCluster(false, 2);
7194
Configuration conf = new RouterConfigBuilder()
@@ -75,6 +98,9 @@ public static void globalSetUp() throws Exception {
7598
.build();
7699
conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20);
77100
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE, false);
101+
if (rpcMode.equals(ASYNC_MODE)) {
102+
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
103+
}
78104
cluster.addRouterOverrides(conf);
79105
cluster.startCluster();
80106
cluster.startRouters();
@@ -89,17 +115,7 @@ public static void globalSetUp() throws Exception {
89115
mountTable = (MountTableResolver) router.getSubclusterResolver();
90116
}
91117

92-
@AfterAll
93-
public static void tearDown() {
94-
if (cluster != null) {
95-
cluster.stopRouter(routerContext);
96-
cluster.shutdown();
97-
cluster = null;
98-
}
99-
}
100-
101-
@AfterEach
102-
public void clearMountTable() throws IOException {
118+
public static void clearMountTable() throws IOException {
103119
RouterClient client = routerContext.getAdminClient();
104120
MountTableManager mountTableManager = client.getMountTableManager();
105121
GetMountTableEntriesRequest req1 = GetMountTableEntriesRequest.newInstance("/");
@@ -129,11 +145,90 @@ private boolean addMountTable(final MountTable entry) throws IOException {
129145
return addResponse.getStatus();
130146
}
131147

148+
@Nested
149+
@ExtendWith(RouterServerHelperInTestRouterMountTableWithoutDefaultNS.class)
150+
class TestWithAsyncRouterRpc {
151+
@ParameterizedTest
152+
@ValueSource(strings = {ASYNC_MODE})
153+
public void testGetFileInfoWithSubMountPointAsync() throws IOException {
154+
testGetFileInfoWithSubMountPoint();
155+
}
156+
157+
@ParameterizedTest
158+
@ValueSource(strings = {ASYNC_MODE})
159+
public void testGetFileInfoWithoutSubMountPointAsync() throws Exception {
160+
testGetFileInfoWithoutSubMountPoint(ASYNC_MODE);
161+
}
162+
163+
@ParameterizedTest
164+
@ValueSource(strings = {ASYNC_MODE})
165+
public void testGetContentSummaryWithSubMountPointAsync() throws Exception {
166+
testGetContentSummaryWithSubMountPoint(ASYNC_MODE);
167+
}
168+
169+
@ParameterizedTest
170+
@ValueSource(strings = {ASYNC_MODE})
171+
public void testGetAllLocationsAsync() throws IOException {
172+
testGetAllLocations();
173+
}
174+
175+
@ParameterizedTest
176+
@ValueSource(strings = {ASYNC_MODE})
177+
public void testGetLocationsForContentSummaryAsync() throws Exception {
178+
testGetLocationsForContentSummary();
179+
}
180+
181+
@ParameterizedTest
182+
@ValueSource(strings = {ASYNC_MODE})
183+
public void testGetContentSummaryAsync() throws Exception {
184+
testGetContentSummary(ASYNC_MODE);
185+
}
186+
}
187+
188+
@Nested
189+
@ExtendWith(RouterServerHelperInTestRouterMountTableWithoutDefaultNS.class)
190+
class TestWithSyncRouterRpc {
191+
@ParameterizedTest
192+
@ValueSource(strings = {SYNC_MODE})
193+
public void testGetFileInfoWithSubMountPointSync() throws IOException {
194+
testGetFileInfoWithSubMountPoint();
195+
}
196+
197+
@ParameterizedTest
198+
@ValueSource(strings = {SYNC_MODE})
199+
public void testGetFileInfoWithoutSubMountPointSync() throws Exception {
200+
testGetFileInfoWithoutSubMountPoint(SYNC_MODE);
201+
}
202+
203+
@ParameterizedTest
204+
@ValueSource(strings = {SYNC_MODE})
205+
public void testGetContentSummaryWithSubMountPointSync() throws Exception {
206+
testGetContentSummaryWithSubMountPoint(SYNC_MODE);
207+
}
208+
209+
@ParameterizedTest
210+
@ValueSource(strings = {SYNC_MODE})
211+
public void testGetAllLocationsSync() throws IOException {
212+
testGetAllLocations();
213+
}
214+
215+
@ParameterizedTest
216+
@ValueSource(strings = {SYNC_MODE})
217+
public void testGetLocationsForContentSummarySync() throws Exception {
218+
testGetLocationsForContentSummary();
219+
}
220+
221+
@ParameterizedTest
222+
@ValueSource(strings = {SYNC_MODE})
223+
public void testGetContentSummarySync() throws Exception {
224+
testGetContentSummary(SYNC_MODE);
225+
}
226+
}
227+
132228
/**
133229
* Verify that RBF that disable default nameservice should support
134230
* get information about ancestor mount points.
135231
*/
136-
@Test
137232
public void testGetFileInfoWithSubMountPoint() throws IOException {
138233
MountTable addEntry = MountTable.newInstance("/testdir/1",
139234
Collections.singletonMap("ns0", "/testdir/1"));
@@ -148,21 +243,26 @@ public void testGetFileInfoWithSubMountPoint() throws IOException {
148243
* Verify that RBF doesn't support get the file information
149244
* with no location and sub mount points.
150245
*/
151-
@Test
152-
public void testGetFileInfoWithoutSubMountPoint() throws Exception {
246+
public void testGetFileInfoWithoutSubMountPoint(String rpcMode) throws Exception {
153247
MountTable addEntry = MountTable.newInstance("/testdir/1",
154248
Collections.singletonMap("ns0", "/testdir/1"));
155249
assertTrue(addMountTable(addEntry));
156250
LambdaTestUtils.intercept(RouterResolveException.class,
157-
() -> routerContext.getRouter().getRpcServer().getFileInfo("/testdir2"));
251+
() -> {
252+
if (rpcMode.equals(ASYNC_MODE)) {
253+
routerContext.getRouter().getRpcServer().getFileInfo("/testdir2");
254+
syncReturn(HdfsFileStatus.class);
255+
} else {
256+
routerContext.getRouter().getRpcServer().getFileInfo("/testdir2");
257+
}
258+
});
158259
}
159260

160261
/**
161262
* Verify that RBF that disable default nameservice should support
162263
* get information about ancestor mount points.
163264
*/
164-
@Test
165-
public void testGetContentSummaryWithSubMountPoint() throws IOException {
265+
public void testGetContentSummaryWithSubMountPoint(String rpcMode) throws Exception {
166266
MountTable addEntry = MountTable.newInstance("/testdir/1/2",
167267
Collections.singletonMap("ns0", "/testdir/1/2"));
168268
assertTrue(addMountTable(addEntry));
@@ -171,7 +271,13 @@ public void testGetContentSummaryWithSubMountPoint() throws IOException {
171271
writeData(nnFs0, new Path("/testdir/1/2/3"), 10 * 1024 * 1024);
172272

173273
RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer();
174-
ContentSummary summaryFromRBF = routerRpcServer.getContentSummary("/testdir");
274+
ContentSummary summaryFromRBF;
275+
if (rpcMode.equals(ASYNC_MODE)) {
276+
routerRpcServer.getContentSummary("/testdir");
277+
summaryFromRBF = syncReturn(ContentSummary.class);
278+
} else {
279+
summaryFromRBF = routerRpcServer.getContentSummary("/testdir");
280+
}
175281
assertNotNull(summaryFromRBF);
176282
assertEquals(1, summaryFromRBF.getFileCount());
177283
assertEquals(10 * 1024 * 1024, summaryFromRBF.getLength());
@@ -180,7 +286,6 @@ public void testGetContentSummaryWithSubMountPoint() throws IOException {
180286
}
181287
}
182288

183-
@Test
184289
public void testGetAllLocations() throws IOException {
185290
// Add mount table entry.
186291
MountTable addEntry = MountTable.newInstance("/testA",
@@ -198,7 +303,6 @@ public void testGetAllLocations() throws IOException {
198303
assertEquals(3, locations.size());
199304
}
200305

201-
@Test
202306
public void testGetLocationsForContentSummary() throws Exception {
203307
// Add mount table entry.
204308
MountTable addEntry = MountTable.newInstance("/testA/testB",
@@ -227,8 +331,7 @@ public void testGetLocationsForContentSummary() throws Exception {
227331
() -> protocol.getLocationsForContentSummary("/testB"));
228332
}
229333

230-
@Test
231-
public void testGetContentSummary() throws Exception {
334+
public void testGetContentSummary(String rpcMode) throws Exception {
232335
try {
233336
// Add mount table entry.
234337
MountTable addEntry = MountTable.newInstance("/testA",
@@ -246,7 +349,13 @@ public void testGetContentSummary() throws Exception {
246349
writeData(nnFs1, new Path("/testA/testB/testC/file3"), 1024 * 1024);
247350

248351
RouterRpcServer routerRpcServer = routerContext.getRouterRpcServer();
249-
ContentSummary summary = routerRpcServer.getContentSummary("/testA");
352+
ContentSummary summary;
353+
if (rpcMode.equals(ASYNC_MODE)) {
354+
routerRpcServer.getContentSummary("/testA");
355+
summary = syncReturn(ContentSummary.class);
356+
} else {
357+
summary = routerRpcServer.getContentSummary("/testA");
358+
}
250359
assertEquals(3, summary.getFileCount());
251360
assertEquals(1024 * 1024 * 3, summary.getLength());
252361

@@ -266,3 +375,40 @@ void writeData(FileSystem fs, Path path, int fileLength) throws IOException {
266375
}
267376
}
268377
}
378+
379+
class RouterServerHelperInTestRouterMountTableWithoutDefaultNS implements
380+
AfterAllCallback, BeforeEachCallback, AfterEachCallback {
381+
public static final ThreadLocal<RouterServerHelperInTestRouterMountTableWithoutDefaultNS>
382+
TEST_ROUTER_SERVER_TL = new InheritableThreadLocal<>();
383+
384+
@Override
385+
public void afterAll(ExtensionContext context) {
386+
if (TestRouterMountTableWithoutDefaultNS.getCluster() != null) {
387+
TestRouterMountTableWithoutDefaultNS.getCluster().stopRouter(
388+
TestRouterMountTableWithoutDefaultNS.getRouterContext());
389+
TestRouterMountTableWithoutDefaultNS.getCluster().shutdown();
390+
TestRouterMountTableWithoutDefaultNS.setCluster(null);
391+
}
392+
TEST_ROUTER_SERVER_TL.remove();
393+
}
394+
395+
@Override
396+
public void afterEach(ExtensionContext context) throws IOException {
397+
TestRouterMountTableWithoutDefaultNS.clearMountTable();
398+
}
399+
400+
@Override
401+
public void beforeEach(ExtensionContext context) throws Exception {
402+
Method testMethod = context.getRequiredTestMethod();
403+
ValueSource enumAnnotation = testMethod.getAnnotation(ValueSource.class);
404+
if (enumAnnotation != null) {
405+
String[] strings = enumAnnotation.strings();
406+
for (String rpcMode : strings) {
407+
if (TEST_ROUTER_SERVER_TL.get() == null) {
408+
TestRouterMountTableWithoutDefaultNS.globalSetUp(rpcMode);
409+
}
410+
}
411+
}
412+
TEST_ROUTER_SERVER_TL.set(RouterServerHelperInTestRouterMountTableWithoutDefaultNS.this);
413+
}
414+
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNetworkTopologyServlet.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.Map;
4242

4343
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE;
44+
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterConstants.ASYNC_MODE;
45+
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterConstants.SYNC_MODE;
4446
import static org.junit.jupiter.api.Assertions.assertEquals;
4547
import static org.junit.jupiter.api.Assertions.assertTrue;
4648
import static org.apache.hadoop.hdfs.server.federation.router.TestRouterNetworkTopologyServlet.clusterNoDatanodes;
@@ -53,16 +55,13 @@ public class TestRouterNetworkTopologyServlet {
5355
public static StateStoreDFSCluster clusterWithDatanodes;
5456
public static StateStoreDFSCluster clusterNoDatanodes;
5557

56-
public static final String ASYNC_MODE = "ASYNC";
57-
public static final String SYNC_MODE = "SYNC";
58-
5958
public static void setUp(String rpcMode) throws Exception {
6059
// Builder configuration.
6160
Configuration routerConf =
6261
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
6362
routerConf.set(DFS_ROUTER_HTTP_ENABLE, "true");
6463
// Use async router rpc.
65-
if (rpcMode.equals("ASYNC")) {
64+
if (rpcMode.equals(ASYNC_MODE)) {
6665
routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
6766
}
6867
Configuration hdfsConf = new Configuration(false);

0 commit comments

Comments
 (0)