Skip to content

Commit cd44e91

Browse files
YARN-10476. Queue metrics for Unmanaged applications (#2674). Contributed by Cyrus Jackson
1 parent 4781761 commit cd44e91

18 files changed

Lines changed: 445 additions & 113 deletions

File tree

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,13 @@ public class AppSchedulingInfo {
102102
private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
103103
private final RMContext rmContext;
104104
private final int retryAttempts;
105+
private boolean unmanagedAM;
105106

106107
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
107108
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
108109
ResourceUsage appResourceUsage,
109-
Map<String, String> applicationSchedulingEnvs, RMContext rmContext) {
110+
Map<String, String> applicationSchedulingEnvs, RMContext rmContext,
111+
boolean unmanagedAM) {
110112
this.applicationAttemptId = appAttemptId;
111113
this.applicationId = appAttemptId.getApplicationId();
112114
this.queue = queue;
@@ -120,6 +122,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
120122
this.retryAttempts = rmContext.getYarnConfiguration().getInt(
121123
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
122124
YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
125+
this.unmanagedAM = unmanagedAM;
123126

124127
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
125128
updateContext = new ContainerUpdateContext(this);
@@ -156,6 +159,14 @@ public boolean isPending() {
156159
return pending;
157160
}
158161

162+
public void setUnmanagedAM(boolean unmanagedAM) {
163+
this.unmanagedAM = unmanagedAM;
164+
}
165+
166+
public boolean isUnmanagedAM() {
167+
return unmanagedAM;
168+
}
169+
159170
public Set<String> getRequestedPartitions() {
160171
return requestedPartitions;
161172
}
@@ -617,8 +628,10 @@ public void move(Queue newQueue) {
617628
ap.getPrimaryRequestedNodePartition(), delta);
618629
}
619630
}
620-
oldMetrics.moveAppFrom(this);
621-
newMetrics.moveAppTo(this);
631+
632+
oldMetrics.moveAppFrom(this, isUnmanagedAM());
633+
newMetrics.moveAppTo(this, isUnmanagedAM());
634+
622635
abstractUsersManager.deactivateApplication(user, applicationId);
623636
abstractUsersManager = newQueue.getAbstractUsersManager();
624637
if (!schedulerKeys.isEmpty()) {
@@ -649,7 +662,8 @@ public void stop() {
649662
ask.getCount()));
650663
}
651664
}
652-
metrics.finishAppAttempt(applicationId, pending, user);
665+
666+
metrics.finishAppAttempt(applicationId, pending, user, unmanagedAM);
653667

654668
// Clear requests themselves
655669
clearRequests();
@@ -695,7 +709,7 @@ public void recoverContainer(RMContainer rmContainer, String partition) {
695709
// If there was any container to recover, the application was
696710
// running from scheduler's POV.
697711
pending = false;
698-
metrics.runAppAttempt(applicationId, user);
712+
metrics.runAppAttempt(applicationId, user, isUnmanagedAM());
699713
}
700714

701715
// Container is completed. Skip recovering resources.
@@ -736,7 +750,7 @@ private void updateMetricsForAllocatedContainer(NodeType type,
736750
// once an allocation is done we assume the application is
737751
// running from scheduler's POV.
738752
pending = false;
739-
metrics.runAppAttempt(applicationId, user);
753+
metrics.runAppAttempt(applicationId, user, isUnmanagedAM());
740754
}
741755

742756
updateMetrics(applicationId, type, node, containerAllocated, user, queue);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java

Lines changed: 113 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ public class QueueMetrics implements MetricsSource {
6262
@Metric("# of apps completed") MutableCounterInt appsCompleted;
6363
@Metric("# of apps killed") MutableCounterInt appsKilled;
6464
@Metric("# of apps failed") MutableCounterInt appsFailed;
65+
66+
@Metric("# of Unmanaged apps submitted")
67+
private MutableCounterInt unmanagedAppsSubmitted;
68+
@Metric("# of Unmanaged running apps")
69+
private MutableGaugeInt unmanagedAppsRunning;
70+
@Metric("# of Unmanaged pending apps")
71+
private MutableGaugeInt unmanagedAppsPending;
72+
@Metric("# of Unmanaged apps completed")
73+
private MutableCounterInt unmanagedAppsCompleted;
74+
@Metric("# of Unmanaged apps killed")
75+
private MutableCounterInt unmanagedAppsKilled;
76+
@Metric("# of Unmanaged apps failed")
77+
private MutableCounterInt unmanagedAppsFailed;
78+
6579
@Metric("Aggregate # of allocated node-local containers")
6680
MutableCounterLong aggregateNodeLocalContainersAllocated;
6781
@Metric("Aggregate # of allocated rack-local containers")
@@ -401,103 +415,158 @@ public void getMetrics(MetricsCollector collector, boolean all) {
401415
registry.snapshot(collector.addRecord(registry.info()), all);
402416
}
403417

404-
public void submitApp(String user) {
418+
public void submitApp(String user, boolean unmanagedAM) {
405419
appsSubmitted.incr();
420+
if(unmanagedAM) {
421+
unmanagedAppsSubmitted.incr();
422+
}
406423
QueueMetrics userMetrics = getUserMetrics(user);
407424
if (userMetrics != null) {
408-
userMetrics.submitApp(user);
425+
userMetrics.submitApp(user, unmanagedAM);
409426
}
410427
if (parent != null) {
411-
parent.submitApp(user);
428+
parent.submitApp(user, unmanagedAM);
412429
}
413430
}
414431

415-
public void submitAppAttempt(String user) {
432+
433+
public void submitAppAttempt(String user, boolean unmanagedAM) {
416434
appsPending.incr();
435+
if(unmanagedAM) {
436+
unmanagedAppsPending.incr();
437+
}
417438
QueueMetrics userMetrics = getUserMetrics(user);
418439
if (userMetrics != null) {
419-
userMetrics.submitAppAttempt(user);
440+
userMetrics.submitAppAttempt(user, unmanagedAM);
420441
}
421442
if (parent != null) {
422-
parent.submitAppAttempt(user);
443+
parent.submitAppAttempt(user, unmanagedAM);
423444
}
424445
}
425446

426-
public void runAppAttempt(ApplicationId appId, String user) {
447+
public void runAppAttempt(ApplicationId appId, String user,
448+
boolean unmanagedAM) {
427449
runBuckets.add(appId, System.currentTimeMillis());
428450
appsRunning.incr();
429451
appsPending.decr();
452+
453+
if(unmanagedAM) {
454+
unmanagedAppsRunning.incr();
455+
unmanagedAppsPending.decr();
456+
}
457+
430458
QueueMetrics userMetrics = getUserMetrics(user);
431459
if (userMetrics != null) {
432-
userMetrics.runAppAttempt(appId, user);
460+
userMetrics.runAppAttempt(appId, user, unmanagedAM);
433461
}
434462
if (parent != null) {
435-
parent.runAppAttempt(appId, user);
463+
parent.runAppAttempt(appId, user, unmanagedAM);
436464
}
437465
}
438466

439-
public void finishAppAttempt(
440-
ApplicationId appId, boolean isPending, String user) {
467+
public void finishAppAttempt(ApplicationId appId, boolean isPending,
468+
String user, boolean unmanagedAM) {
441469
runBuckets.remove(appId);
442470
if (isPending) {
443471
appsPending.decr();
444472
} else {
445473
appsRunning.decr();
446474
}
475+
476+
if(unmanagedAM) {
477+
if (isPending) {
478+
unmanagedAppsPending.decr();
479+
} else {
480+
unmanagedAppsRunning.decr();
481+
}
482+
}
447483
QueueMetrics userMetrics = getUserMetrics(user);
448484
if (userMetrics != null) {
449-
userMetrics.finishAppAttempt(appId, isPending, user);
485+
userMetrics.finishAppAttempt(appId, isPending, user, unmanagedAM);
450486
}
451487
if (parent != null) {
452-
parent.finishAppAttempt(appId, isPending, user);
488+
parent.finishAppAttempt(appId, isPending, user, unmanagedAM);
453489
}
454490
}
455491

456-
public void finishApp(String user, RMAppState rmAppFinalState) {
492+
public void finishApp(String user, RMAppState rmAppFinalState,
493+
boolean unmanagedAM) {
457494
switch (rmAppFinalState) {
458495
case KILLED: appsKilled.incr(); break;
459496
case FAILED: appsFailed.incr(); break;
460497
default: appsCompleted.incr(); break;
461498
}
499+
500+
if(unmanagedAM) {
501+
switch (rmAppFinalState) {
502+
case KILLED:
503+
unmanagedAppsKilled.incr();
504+
break;
505+
case FAILED:
506+
unmanagedAppsFailed.incr();
507+
break;
508+
default:
509+
unmanagedAppsCompleted.incr();
510+
break;
511+
}
512+
}
513+
462514
QueueMetrics userMetrics = getUserMetrics(user);
463515
if (userMetrics != null) {
464-
userMetrics.finishApp(user, rmAppFinalState);
516+
userMetrics.finishApp(user, rmAppFinalState, unmanagedAM);
465517
}
466518
if (parent != null) {
467-
parent.finishApp(user, rmAppFinalState);
519+
parent.finishApp(user, rmAppFinalState, unmanagedAM);
468520
}
469521
}
470-
471-
public void moveAppFrom(AppSchedulingInfo app) {
522+
523+
524+
public void moveAppFrom(AppSchedulingInfo app, boolean unmanagedAM) {
472525
if (app.isPending()) {
473526
appsPending.decr();
474527
} else {
475528
appsRunning.decr();
476529
}
530+
if(unmanagedAM) {
531+
if (app.isPending()) {
532+
unmanagedAppsPending.decr();
533+
} else {
534+
unmanagedAppsRunning.decr();
535+
}
536+
}
537+
477538
QueueMetrics userMetrics = getUserMetrics(app.getUser());
478539
if (userMetrics != null) {
479-
userMetrics.moveAppFrom(app);
540+
userMetrics.moveAppFrom(app, unmanagedAM);
480541
}
481542
if (parent != null) {
482-
parent.moveAppFrom(app);
543+
parent.moveAppFrom(app, unmanagedAM);
483544
}
484545
}
485-
486-
public void moveAppTo(AppSchedulingInfo app) {
546+
547+
public void moveAppTo(AppSchedulingInfo app, boolean unmanagedAM) {
487548
if (app.isPending()) {
488549
appsPending.incr();
489550
} else {
490551
appsRunning.incr();
491552
}
553+
if(unmanagedAM) {
554+
if (app.isPending()) {
555+
unmanagedAppsPending.incr();
556+
} else {
557+
unmanagedAppsRunning.incr();
558+
}
559+
}
492560
QueueMetrics userMetrics = getUserMetrics(app.getUser());
493561
if (userMetrics != null) {
494-
userMetrics.moveAppTo(app);
562+
userMetrics.moveAppTo(app, unmanagedAM);
495563
}
496564
if (parent != null) {
497-
parent.moveAppTo(app);
565+
parent.moveAppTo(app, unmanagedAM);
498566
}
499567
}
500568

569+
501570
/**
502571
* Set available resources. To be called by scheduler periodically as
503572
* resources become available.
@@ -1024,18 +1093,34 @@ public int getAppsSubmitted() {
10241093
return appsSubmitted.value();
10251094
}
10261095

1096+
public int getUnmanagedAppsSubmitted() {
1097+
return unmanagedAppsSubmitted.value();
1098+
}
1099+
10271100
public int getAppsRunning() {
10281101
return appsRunning.value();
10291102
}
10301103

1104+
public int getUnmanagedAppsRunning() {
1105+
return unmanagedAppsRunning.value();
1106+
}
1107+
10311108
public int getAppsPending() {
10321109
return appsPending.value();
10331110
}
10341111

1112+
public int getUnmanagedAppsPending() {
1113+
return unmanagedAppsPending.value();
1114+
}
1115+
10351116
public int getAppsCompleted() {
10361117
return appsCompleted.value();
10371118
}
10381119

1120+
public int getUnmanagedAppsCompleted() {
1121+
return unmanagedAppsCompleted.value();
1122+
}
1123+
10391124
public int getAppsKilled() {
10401125
return appsKilled.value();
10411126
}
@@ -1044,6 +1129,10 @@ public int getAppsFailed() {
10441129
return appsFailed.value();
10451130
}
10461131

1132+
public int getUnmanagedAppsFailed() {
1133+
return unmanagedAppsFailed.value();
1134+
}
1135+
10471136
public Resource getAllocatedResources() {
10481137
if (queueMetricsForCustomResources != null) {
10491138
return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,20 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
3030
private final String user;
3131
private volatile T currentAttempt;
3232
private volatile Priority priority;
33+
private boolean unmanagedAM;
3334

34-
public SchedulerApplication(Queue queue, String user) {
35+
public SchedulerApplication(Queue queue, String user, boolean unmanagedAM) {
3536
this.queue = queue;
3637
this.user = user;
38+
this.unmanagedAM = unmanagedAM;
3739
this.priority = null;
3840
}
3941

40-
public SchedulerApplication(Queue queue, String user, Priority priority) {
42+
public SchedulerApplication(Queue queue, String user, Priority priority,
43+
boolean unmanagedAM) {
4144
this.queue = queue;
4245
this.user = user;
46+
this.unmanagedAM = unmanagedAM;
4347
this.priority = priority;
4448
}
4549

@@ -64,7 +68,7 @@ public void setCurrentAppAttempt(T currentAttempt) {
6468
}
6569

6670
public void stop(RMAppState rmAppFinalState) {
67-
queue.getMetrics().finishApp(user, rmAppFinalState);
71+
queue.getMetrics().finishApp(user, rmAppFinalState, isUnmanagedAM());
6872
}
6973

7074
public Priority getPriority() {
@@ -80,4 +84,7 @@ public void setPriority(Priority priority) {
8084
}
8185
}
8286

87+
public boolean isUnmanagedAM() {
88+
return unmanagedAM;
89+
}
8390
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
241241

242242
this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
243243
queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
244-
applicationSchedulingEnvs, rmContext);
244+
applicationSchedulingEnvs, rmContext, unmanagedAM);
245245
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
246246
readLock = lock.readLock();
247247
writeLock = lock.writeLock();

0 commit comments

Comments
 (0)