|
76 | 76 | import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; |
77 | 77 | import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; |
78 | 78 | import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; |
| 79 | +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; |
79 | 80 | import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; |
80 | 81 | import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; |
81 | 82 | import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; |
@@ -1102,6 +1103,157 @@ public void testMoveAllAppsInvalidSource() throws Exception { |
1102 | 1103 | rm.stop(); |
1103 | 1104 | } |
1104 | 1105 |
|
| 1106 | + @Test |
| 1107 | + public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception { |
| 1108 | + YarnConfiguration conf = new YarnConfiguration(); |
| 1109 | + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, |
| 1110 | + ResourceScheduler.class); |
| 1111 | + |
| 1112 | + CapacitySchedulerConfiguration newConf = |
| 1113 | + new CapacitySchedulerConfiguration(conf); |
| 1114 | + |
| 1115 | + // Define top-level queues |
| 1116 | + newConf.setQueues(CapacitySchedulerConfiguration.ROOT, |
| 1117 | + new String[]{"a", "b"}); |
| 1118 | + |
| 1119 | + newConf.setCapacity(A, 50); |
| 1120 | + newConf.setCapacity(B, 50); |
| 1121 | + |
| 1122 | + // Define 2nd-level queues |
| 1123 | + newConf.setQueues(A, new String[]{"a1"}); |
| 1124 | + newConf.setCapacity(A1, 100); |
| 1125 | + newConf.setUserLimitFactor(A1, 2.0f); |
| 1126 | + newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f); |
| 1127 | + |
| 1128 | + newConf.setQueues(B, new String[]{"b1"}); |
| 1129 | + newConf.setCapacity(B1, 100); |
| 1130 | + newConf.setUserLimitFactor(B1, 2.0f); |
| 1131 | + |
| 1132 | + MockRM rm = new MockRM(newConf); |
| 1133 | + rm.start(); |
| 1134 | + |
| 1135 | + CapacityScheduler scheduler = |
| 1136 | + (CapacityScheduler) rm.getResourceScheduler(); |
| 1137 | + |
| 1138 | + MockNM nm1 = rm.registerNode("h1:1234", 16 * GB); |
| 1139 | + |
| 1140 | + // submit an app |
| 1141 | + MockRMAppSubmissionData data3 = |
| 1142 | + MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) |
| 1143 | + .withAppName("test-move-1") |
| 1144 | + .withUser("u1") |
| 1145 | + .withAcls(null) |
| 1146 | + .withQueue("a1") |
| 1147 | + .withUnmanagedAM(false) |
| 1148 | + .build(); |
| 1149 | + RMApp app = MockRMAppSubmitter.submit(rm, data3); |
| 1150 | + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1); |
| 1151 | + |
| 1152 | + ApplicationAttemptId appAttemptId = |
| 1153 | + rm.getApplicationReport(app.getApplicationId()) |
| 1154 | + .getCurrentApplicationAttemptId(); |
| 1155 | + |
| 1156 | + MockRMAppSubmissionData data2 = |
| 1157 | + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| 1158 | + .withAppName("app") |
| 1159 | + .withUser("u2") |
| 1160 | + .withAcls(null) |
| 1161 | + .withQueue("a1") |
| 1162 | + .withUnmanagedAM(false) |
| 1163 | + .build(); |
| 1164 | + RMApp app2 = MockRMAppSubmitter.submit(rm, data2); |
| 1165 | + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); |
| 1166 | + |
| 1167 | + MockRMAppSubmissionData data1 = |
| 1168 | + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| 1169 | + .withAppName("app") |
| 1170 | + .withUser("u3") |
| 1171 | + .withAcls(null) |
| 1172 | + .withQueue("a1") |
| 1173 | + .withUnmanagedAM(false) |
| 1174 | + .build(); |
| 1175 | + RMApp app3 = MockRMAppSubmitter.submit(rm, data1); |
| 1176 | + |
| 1177 | + MockRMAppSubmissionData data = |
| 1178 | + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) |
| 1179 | + .withAppName("app") |
| 1180 | + .withUser("u4") |
| 1181 | + .withAcls(null) |
| 1182 | + .withQueue("a1") |
| 1183 | + .withUnmanagedAM(false) |
| 1184 | + .build(); |
| 1185 | + RMApp app4 = MockRMAppSubmitter.submit(rm, data); |
| 1186 | + |
| 1187 | + // Each application asks 50 * 1GB containers |
| 1188 | + am1.allocate("*", 1 * GB, 50, null); |
| 1189 | + am2.allocate("*", 1 * GB, 50, null); |
| 1190 | + |
| 1191 | + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); |
| 1192 | + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); |
| 1193 | + |
| 1194 | + // check preconditions |
| 1195 | + assertApps(scheduler, "root", |
| 1196 | + app3.getCurrentAppAttempt().getAppAttemptId(), |
| 1197 | + app4.getCurrentAppAttempt().getAppAttemptId(), |
| 1198 | + appAttemptId, |
| 1199 | + app2.getCurrentAppAttempt().getAppAttemptId()); |
| 1200 | + assertApps(scheduler, "a", |
| 1201 | + app3.getCurrentAppAttempt().getAppAttemptId(), |
| 1202 | + app4.getCurrentAppAttempt().getAppAttemptId(), |
| 1203 | + appAttemptId, |
| 1204 | + app2.getCurrentAppAttempt().getAppAttemptId()); |
| 1205 | + assertApps(scheduler, "a1", |
| 1206 | + app3.getCurrentAppAttempt().getAppAttemptId(), |
| 1207 | + app4.getCurrentAppAttempt().getAppAttemptId(), |
| 1208 | + appAttemptId, |
| 1209 | + app2.getCurrentAppAttempt().getAppAttemptId()); |
| 1210 | + assertApps(scheduler, "b"); |
| 1211 | + assertApps(scheduler, "b1"); |
| 1212 | + |
| 1213 | + UsersManager um = |
| 1214 | + (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager(); |
| 1215 | + |
| 1216 | + assertEquals(4, um.getNumActiveUsers()); |
| 1217 | + assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); |
| 1218 | + |
| 1219 | + // now move the app |
| 1220 | + scheduler.moveAllApps("a1", "b1"); |
| 1221 | + |
| 1222 | + //Triggering this event so that user limit computation can |
| 1223 | + //happen again |
| 1224 | + for (int i = 0; i < 10; i++) { |
| 1225 | + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); |
| 1226 | + Thread.sleep(500); |
| 1227 | + } |
| 1228 | + |
| 1229 | + // check post conditions |
| 1230 | + assertApps(scheduler, "root", |
| 1231 | + appAttemptId, |
| 1232 | + app2.getCurrentAppAttempt().getAppAttemptId(), |
| 1233 | + app3.getCurrentAppAttempt().getAppAttemptId(), |
| 1234 | + app4.getCurrentAppAttempt().getAppAttemptId()); |
| 1235 | + assertApps(scheduler, "a"); |
| 1236 | + assertApps(scheduler, "a1"); |
| 1237 | + assertApps(scheduler, "b", |
| 1238 | + appAttemptId, |
| 1239 | + app2.getCurrentAppAttempt().getAppAttemptId(), |
| 1240 | + app3.getCurrentAppAttempt().getAppAttemptId(), |
| 1241 | + app4.getCurrentAppAttempt().getAppAttemptId()); |
| 1242 | + assertApps(scheduler, "b1", |
| 1243 | + appAttemptId, |
| 1244 | + app2.getCurrentAppAttempt().getAppAttemptId(), |
| 1245 | + app3.getCurrentAppAttempt().getAppAttemptId(), |
| 1246 | + app4.getCurrentAppAttempt().getAppAttemptId()); |
| 1247 | + |
| 1248 | + UsersManager umB1 = |
| 1249 | + (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager(); |
| 1250 | + |
| 1251 | + assertEquals(2, umB1.getNumActiveUsers()); |
| 1252 | + assertEquals(2, umB1.getNumActiveUsersWithOnlyPendingApps()); |
| 1253 | + |
| 1254 | + rm.close(); |
| 1255 | + } |
| 1256 | + |
1105 | 1257 | @Test(timeout = 60000) |
1106 | 1258 | public void testMoveAttemptNotAdded() throws Exception { |
1107 | 1259 | Configuration conf = new Configuration(); |
|
0 commit comments