Skip to content

Commit d5334fa

Browse files
authored
YARN-6537. Running RM tests against the Router. (#5957) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
1 parent 9857fbc commit d5334fa

12 files changed

Lines changed: 729 additions & 87 deletions

File tree

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ public static boolean isAclEnabled(Configuration conf) {
517517
public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false;
518518

519519
/** The address of the RM admin interface.*/
520-
public static final String RM_ADMIN_ADDRESS =
520+
public static final String RM_ADMIN_ADDRESS =
521521
RM_PREFIX + "admin.address";
522522
public static final int DEFAULT_RM_ADMIN_PORT = 8033;
523523
public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" +

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,4 +1074,9 @@ private NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
10741074
public AsyncDispatcher getDispatcher() {
10751075
return dispatcher;
10761076
}
1077+
1078+
@VisibleForTesting
1079+
public void disableWebServer() {
1080+
removeService(((NMContext) context).webServer);
1081+
}
10771082
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,24 @@
150150
<type>test-jar</type>
151151
</dependency>
152152

153+
<dependency>
154+
<groupId>org.apache.curator</groupId>
155+
<artifactId>curator-client</artifactId>
156+
</dependency>
157+
158+
<dependency>
159+
<groupId>org.apache.curator</groupId>
160+
<artifactId>curator-test</artifactId>
161+
<scope>test</scope>
162+
</dependency>
163+
164+
<dependency>
165+
<groupId>org.apache.hadoop</groupId>
166+
<artifactId>hadoop-yarn-server-tests</artifactId>
167+
<scope>test</scope>
168+
<type>test-jar</type>
169+
</dependency>
170+
153171
</dependencies>
154172

155173
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.router.subcluster;
19+
20+
import com.sun.jersey.api.client.Client;
21+
import com.sun.jersey.api.client.ClientResponse;
22+
import com.sun.jersey.api.client.WebResource;
23+
import org.apache.commons.collections.CollectionUtils;
24+
import org.apache.curator.framework.CuratorFramework;
25+
import org.apache.curator.framework.CuratorFrameworkFactory;
26+
import org.apache.curator.framework.state.ConnectionState;
27+
import org.apache.curator.retry.RetryNTimes;
28+
import org.apache.curator.test.TestingServer;
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.fs.CommonConfigurationKeys;
31+
import org.apache.hadoop.io.retry.RetryPolicy;
32+
import org.apache.hadoop.security.UserGroupInformation;
33+
import org.apache.hadoop.test.GenericTestUtils;
34+
import org.apache.hadoop.util.Time;
35+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
36+
import org.apache.hadoop.yarn.exceptions.YarnException;
37+
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
38+
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
39+
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
40+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
41+
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
42+
import org.apache.hadoop.yarn.server.router.webapp.JavaProcess;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
import java.io.File;
47+
import java.io.IOException;
48+
import java.security.PrivilegedExceptionAction;
49+
import java.util.LinkedList;
50+
import java.util.List;
51+
import java.util.concurrent.TimeoutException;
52+
53+
import static javax.servlet.http.HttpServletResponse.SC_OK;
54+
import static javax.ws.rs.core.MediaType.APPLICATION_XML;
55+
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
56+
import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning;
57+
import static org.junit.Assert.assertEquals;
58+
59+
public class TestFederationSubCluster {
60+
61+
private static final Logger LOG = LoggerFactory.getLogger(TestFederationSubCluster.class);
62+
private static TestingServer curatorTestingServer;
63+
private static CuratorFramework curatorFramework;
64+
private static JavaProcess subCluster1;
65+
private static JavaProcess subCluster2;
66+
private static JavaProcess router;
67+
public static final String ZK_FEDERATION_STATESTORE =
68+
"org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore";
69+
private static String userName = "test";
70+
71+
public void startFederationSubCluster(int zkPort, String sc1Param,
72+
String sc2Param, String routerParam) throws IOException, InterruptedException,
73+
YarnException, TimeoutException {
74+
75+
// Step1. Initialize ZK's service.
76+
try {
77+
curatorTestingServer = new TestingServer(zkPort);
78+
curatorTestingServer.start();
79+
String connectString = curatorTestingServer.getConnectString();
80+
curatorFramework = CuratorFrameworkFactory.builder()
81+
.connectString(connectString)
82+
.retryPolicy(new RetryNTimes(100, 100))
83+
.build();
84+
curatorFramework.start();
85+
curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {
86+
if (newState == ConnectionState.CONNECTED) {
87+
System.out.println("Connected to the ZooKeeper server!");
88+
}
89+
});
90+
} catch (Exception e) {
91+
LOG.error("Cannot initialize ZooKeeper store.", e);
92+
throw new IOException(e);
93+
}
94+
95+
// Step2. Create a temporary directory output log.
96+
File baseDir = GenericTestUtils.getTestDir("processes");
97+
baseDir.mkdirs();
98+
String baseName = TestFederationSubCluster.class.getSimpleName();
99+
100+
// Step3. Initialize subCluster SC-1
101+
String sc1WebAddress = getSCWebAddress(sc1Param);
102+
File rmOutput = new File(baseDir, baseName + "-" + Time.now() + "-rm.log");
103+
rmOutput.createNewFile();
104+
List<String> addClasspath = new LinkedList<>();
105+
addClasspath.add("../hadoop-yarn-server-timelineservice/target/classes");
106+
subCluster1 = new JavaProcess(TestMockSubCluster.class, addClasspath, rmOutput, sc1Param);
107+
waitWebAppRunning(sc1WebAddress, RM_WEB_SERVICE_PATH);
108+
109+
// Step4. Initialize subCluster SC-2
110+
String sc2WebAddress = getSCWebAddress(sc2Param);
111+
File rmOutput2 = new File(baseDir, baseName + "-" + Time.now() + "-rm.log");
112+
rmOutput2.createNewFile();
113+
List<String> addClasspath2 = new LinkedList<>();
114+
addClasspath2.add("../hadoop-yarn-server-timelineservice/target/classes");
115+
subCluster2 = new JavaProcess(TestMockSubCluster.class, addClasspath2, rmOutput2, sc2Param);
116+
waitWebAppRunning(sc2WebAddress, RM_WEB_SERVICE_PATH);
117+
118+
// Step5. Confirm that subClusters have been registered to ZK.
119+
String zkAddress = getZkAddress(zkPort);
120+
verifyRegistration(zkAddress);
121+
122+
// Step6. Initialize router
123+
String routerWebAddress = getRouterWebAddress(routerParam);
124+
File routerOutput = new File(baseDir, baseName + "-" + Time.now() + "-router.log");
125+
routerOutput.createNewFile();
126+
router = new JavaProcess(TestMockRouter.class, null, routerOutput, routerParam);
127+
waitWebAppRunning(routerWebAddress, RM_WEB_SERVICE_PATH);
128+
}
129+
130+
private String getSCWebAddress(String scParam) {
131+
String[] scParams = scParam.split(",");
132+
return "http://localhost:" + scParams[3];
133+
}
134+
135+
private String getRouterWebAddress(String routerParam) {
136+
String[] routerParams = routerParam.split(",");
137+
return "http://localhost:" + routerParams[2];
138+
}
139+
140+
private String getZkAddress(int port) {
141+
return "localhost:" + port;
142+
}
143+
144+
public void stop() throws Exception {
145+
if (subCluster1 != null) {
146+
subCluster1.stop();
147+
}
148+
if (subCluster2 != null) {
149+
subCluster2.stop();
150+
}
151+
if (router != null) {
152+
router.stop();
153+
}
154+
if (curatorTestingServer != null) {
155+
curatorTestingServer.stop();
156+
}
157+
}
158+
159+
private void verifyRegistration(String zkAddress)
160+
throws YarnException, InterruptedException, TimeoutException {
161+
Configuration conf = new Configuration();
162+
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
163+
conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS, ZK_FEDERATION_STATESTORE);
164+
conf.set(CommonConfigurationKeys.ZK_ADDRESS, zkAddress);
165+
RetryPolicy retryPolicy = FederationStateStoreFacade.createRetryPolicy(conf);
166+
FederationStateStore stateStore = (FederationStateStore)
167+
FederationStateStoreFacade.createRetryInstance(conf,
168+
YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
169+
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
170+
FederationStateStore.class, retryPolicy);
171+
stateStore.init(conf);
172+
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
173+
GetSubClustersInfoRequest request = GetSubClustersInfoRequest.newInstance(true);
174+
175+
GenericTestUtils.waitFor(() -> {
176+
try {
177+
GetSubClustersInfoResponse response = stateStore.getSubClusters(request);
178+
List<SubClusterInfo> subClusters = response.getSubClusters();
179+
if (CollectionUtils.isNotEmpty(subClusters)) {
180+
return true;
181+
}
182+
} catch (Exception e) {
183+
}
184+
return false;
185+
}, 5000, 50 * 1000);
186+
}
187+
188+
public static <T> T performGetCalls(final String routerAddress, final String path,
189+
final Class<T> returnType, final String queryName,
190+
final String queryValue) throws IOException, InterruptedException {
191+
192+
Client clientToRouter = Client.create();
193+
WebResource toRouter = clientToRouter.resource(routerAddress).path(path);
194+
195+
final WebResource.Builder toRouterBuilder;
196+
197+
if (queryValue != null && queryName != null) {
198+
toRouterBuilder = toRouter.queryParam(queryName, queryValue).accept(APPLICATION_XML);
199+
} else {
200+
toRouterBuilder = toRouter.accept(APPLICATION_XML);
201+
}
202+
203+
return UserGroupInformation.createRemoteUser(userName).doAs(
204+
(PrivilegedExceptionAction<T>) () -> {
205+
ClientResponse response = toRouterBuilder.get(ClientResponse.class);
206+
assertEquals(SC_OK, response.getStatus());
207+
return response.getEntity(returnType);
208+
});
209+
}
210+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.router.subcluster;
19+
20+
import org.apache.commons.lang3.ArrayUtils;
21+
import org.apache.hadoop.fs.CommonConfigurationKeys;
22+
import org.apache.hadoop.io.retry.RetryPolicy;
23+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
24+
import org.apache.hadoop.yarn.exceptions.YarnException;
25+
import org.apache.hadoop.yarn.server.MiniYARNCluster;
26+
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
27+
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
28+
import org.apache.hadoop.yarn.server.router.Router;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
/**
33+
* Tests {@link Router}.
34+
*/
35+
public class TestMockRouter {
36+
37+
private static final Logger LOG = LoggerFactory.getLogger(TestMockRouter.class);
38+
39+
public TestMockRouter() {
40+
}
41+
42+
public static void main(String[] args) throws YarnException {
43+
if (ArrayUtils.isEmpty(args)) {
44+
return;
45+
}
46+
47+
// Step1. Parse the parameters.
48+
String[] params = args[0].split(",");
49+
int pRouterClientRMPort = Integer.parseInt(params[0]);
50+
int pRouterAdminAddressPort = Integer.parseInt(params[1]);
51+
int pRouterWebAddressPort = Integer.parseInt(params[2]);
52+
String zkAddress = params[3];
53+
54+
LOG.info("routerClientRMPort={}, routerAdminAddressPort={}, routerWebAddressPort={}, " +
55+
"zkAddress = {}.", pRouterClientRMPort, pRouterAdminAddressPort,
56+
pRouterWebAddressPort, zkAddress);
57+
58+
YarnConfiguration conf = new YarnConfiguration();
59+
Router router = new Router();
60+
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
61+
conf.set(YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
62+
"org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore");
63+
conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
64+
"org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST");
65+
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
66+
"org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor");
67+
conf.set(CommonConfigurationKeys.ZK_ADDRESS, zkAddress);
68+
conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
69+
"org.apache.hadoop.yarn.server.router.rmadmin.FederationRMAdminInterceptor");
70+
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, -1);
71+
conf.set(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS, getHostNameAndPort(pRouterClientRMPort));
72+
conf.set(YarnConfiguration.ROUTER_RMADMIN_ADDRESS, getHostNameAndPort(pRouterAdminAddressPort));
73+
conf.set(YarnConfiguration.ROUTER_WEBAPP_ADDRESS,
74+
getHostNameAndPort(pRouterWebAddressPort));
75+
76+
RetryPolicy retryPolicy = FederationStateStoreFacade.createRetryPolicy(conf);
77+
78+
router.init(conf);
79+
router.start();
80+
81+
FederationStateStore stateStore = (FederationStateStore)
82+
FederationStateStoreFacade.createRetryInstance(conf,
83+
YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
84+
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
85+
FederationStateStore.class, retryPolicy);
86+
stateStore.init(conf);
87+
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
88+
}
89+
90+
private static String getHostNameAndPort(int port) {
91+
return MiniYARNCluster.getHostname() + ":" + port;
92+
}
93+
}

0 commit comments

Comments
 (0)