Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ public class AppSchedulingInfo {
private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
private final RMContext rmContext;
private final int retryAttempts;
private boolean unmanagedAM;

public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
ResourceUsage appResourceUsage,
Map<String, String> applicationSchedulingEnvs, RMContext rmContext) {
Map<String, String> applicationSchedulingEnvs, RMContext rmContext,
boolean unmanagedAM) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
Expand All @@ -120,6 +122,7 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
this.retryAttempts = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
this.unmanagedAM = unmanagedAM;

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
updateContext = new ContainerUpdateContext(this);
Expand Down Expand Up @@ -156,6 +159,14 @@ public boolean isPending() {
return pending;
}

public void setUnmanagedAM(boolean unmanagedAM) {
this.unmanagedAM = unmanagedAM;
}

public boolean isUnmanagedAM() {
return unmanagedAM;
}

public Set<String> getRequestedPartitions() {
return requestedPartitions;
}
Expand Down Expand Up @@ -617,8 +628,10 @@ public void move(Queue newQueue) {
ap.getPrimaryRequestedNodePartition(), delta);
}
}
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);

oldMetrics.moveAppFrom(this, isUnmanagedAM());
newMetrics.moveAppTo(this, isUnmanagedAM());

abstractUsersManager.deactivateApplication(user, applicationId);
abstractUsersManager = newQueue.getAbstractUsersManager();
if (!schedulerKeys.isEmpty()) {
Expand Down Expand Up @@ -649,7 +662,8 @@ public void stop() {
ask.getCount()));
}
}
metrics.finishAppAttempt(applicationId, pending, user);

metrics.finishAppAttempt(applicationId, pending, user, unmanagedAM);

// Clear requests themselves
clearRequests();
Expand Down Expand Up @@ -695,7 +709,7 @@ public void recoverContainer(RMContainer rmContainer, String partition) {
// If there was any container to recover, the application was
// running from scheduler's POV.
pending = false;
metrics.runAppAttempt(applicationId, user);
metrics.runAppAttempt(applicationId, user, isUnmanagedAM());
}

// Container is completed. Skip recovering resources.
Expand Down Expand Up @@ -736,7 +750,7 @@ private void updateMetricsForAllocatedContainer(NodeType type,
// once an allocation is done we assume the application is
// running from scheduler's POV.
pending = false;
metrics.runAppAttempt(applicationId, user);
metrics.runAppAttempt(applicationId, user, isUnmanagedAM());
}

updateMetrics(applicationId, type, node, containerAllocated, user, queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps completed") MutableCounterInt appsCompleted;
@Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed;

@Metric("# of Unmanaged apps submitted")
private MutableCounterInt unmanagedAppsSubmitted;
@Metric("# of Unmanaged running apps")
private MutableGaugeInt unmanagedAppsRunning;
@Metric("# of Unmanaged pending apps")
private MutableGaugeInt unmanagedAppsPending;
@Metric("# of Unmanaged apps completed")
private MutableCounterInt unmanagedAppsCompleted;
@Metric("# of Unmanaged apps killed")
private MutableCounterInt unmanagedAppsKilled;
@Metric("# of Unmanaged apps failed")
private MutableCounterInt unmanagedAppsFailed;

@Metric("Aggregate # of allocated node-local containers")
MutableCounterLong aggregateNodeLocalContainersAllocated;
@Metric("Aggregate # of allocated rack-local containers")
Expand Down Expand Up @@ -401,103 +415,158 @@ public void getMetrics(MetricsCollector collector, boolean all) {
registry.snapshot(collector.addRecord(registry.info()), all);
}

public void submitApp(String user) {
public void submitApp(String user, boolean unmanagedAM) {
appsSubmitted.incr();
if(unmanagedAM) {
unmanagedAppsSubmitted.incr();
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitApp(user);
userMetrics.submitApp(user, unmanagedAM);
}
if (parent != null) {
parent.submitApp(user);
parent.submitApp(user, unmanagedAM);
}
}

public void submitAppAttempt(String user) {

public void submitAppAttempt(String user, boolean unmanagedAM) {
appsPending.incr();
if(unmanagedAM) {
unmanagedAppsPending.incr();
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.submitAppAttempt(user);
userMetrics.submitAppAttempt(user, unmanagedAM);
}
if (parent != null) {
parent.submitAppAttempt(user);
parent.submitAppAttempt(user, unmanagedAM);
}
}

public void runAppAttempt(ApplicationId appId, String user) {
public void runAppAttempt(ApplicationId appId, String user,
boolean unmanagedAM) {
runBuckets.add(appId, System.currentTimeMillis());
appsRunning.incr();
appsPending.decr();

if(unmanagedAM) {
unmanagedAppsRunning.incr();
unmanagedAppsPending.decr();
}

QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.runAppAttempt(appId, user);
userMetrics.runAppAttempt(appId, user, unmanagedAM);
}
if (parent != null) {
parent.runAppAttempt(appId, user);
parent.runAppAttempt(appId, user, unmanagedAM);
}
}

public void finishAppAttempt(
ApplicationId appId, boolean isPending, String user) {
public void finishAppAttempt(ApplicationId appId, boolean isPending,
String user, boolean unmanagedAM) {
runBuckets.remove(appId);
if (isPending) {
appsPending.decr();
} else {
appsRunning.decr();
}

if(unmanagedAM) {
if (isPending) {
unmanagedAppsPending.decr();
} else {
unmanagedAppsRunning.decr();
}
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishAppAttempt(appId, isPending, user);
userMetrics.finishAppAttempt(appId, isPending, user, unmanagedAM);
}
if (parent != null) {
parent.finishAppAttempt(appId, isPending, user);
parent.finishAppAttempt(appId, isPending, user, unmanagedAM);
}
}

public void finishApp(String user, RMAppState rmAppFinalState) {
public void finishApp(String user, RMAppState rmAppFinalState,
boolean unmanagedAM) {
switch (rmAppFinalState) {
case KILLED: appsKilled.incr(); break;
case FAILED: appsFailed.incr(); break;
default: appsCompleted.incr(); break;
}

if(unmanagedAM) {
switch (rmAppFinalState) {
case KILLED:
unmanagedAppsKilled.incr();
break;
case FAILED:
unmanagedAppsFailed.incr();
break;
default:
unmanagedAppsCompleted.incr();
break;
}
}

QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.finishApp(user, rmAppFinalState);
userMetrics.finishApp(user, rmAppFinalState, unmanagedAM);
}
if (parent != null) {
parent.finishApp(user, rmAppFinalState);
parent.finishApp(user, rmAppFinalState, unmanagedAM);
}
}

public void moveAppFrom(AppSchedulingInfo app) {


public void moveAppFrom(AppSchedulingInfo app, boolean unmanagedAM) {
if (app.isPending()) {
appsPending.decr();
} else {
appsRunning.decr();
}
if(unmanagedAM) {
if (app.isPending()) {
unmanagedAppsPending.decr();
} else {
unmanagedAppsRunning.decr();
}
}

QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppFrom(app);
userMetrics.moveAppFrom(app, unmanagedAM);
}
if (parent != null) {
parent.moveAppFrom(app);
parent.moveAppFrom(app, unmanagedAM);
}
}
public void moveAppTo(AppSchedulingInfo app) {

public void moveAppTo(AppSchedulingInfo app, boolean unmanagedAM) {
if (app.isPending()) {
appsPending.incr();
} else {
appsRunning.incr();
}
if(unmanagedAM) {
if (app.isPending()) {
unmanagedAppsPending.incr();
} else {
unmanagedAppsRunning.incr();
}
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppTo(app);
userMetrics.moveAppTo(app, unmanagedAM);
}
if (parent != null) {
parent.moveAppTo(app);
parent.moveAppTo(app, unmanagedAM);
}
}


/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
Expand Down Expand Up @@ -1024,18 +1093,34 @@ public int getAppsSubmitted() {
return appsSubmitted.value();
}

public int getUnmanagedAppsSubmitted() {
return unmanagedAppsSubmitted.value();
}

public int getAppsRunning() {
return appsRunning.value();
}

public int getUnmanagedAppsRunning() {
return unmanagedAppsRunning.value();
}

public int getAppsPending() {
return appsPending.value();
}

public int getUnmanagedAppsPending() {
return unmanagedAppsPending.value();
}

public int getAppsCompleted() {
return appsCompleted.value();
}

public int getUnmanagedAppsCompleted() {
return unmanagedAppsCompleted.value();
}

public int getAppsKilled() {
return appsKilled.value();
}
Expand All @@ -1044,6 +1129,10 @@ public int getAppsFailed() {
return appsFailed.value();
}

public int getUnmanagedAppsFailed() {
return unmanagedAppsFailed.value();
}

public Resource getAllocatedResources() {
if (queueMetricsForCustomResources != null) {
return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private final String user;
private volatile T currentAttempt;
private volatile Priority priority;
private boolean unmanagedAM;

public SchedulerApplication(Queue queue, String user) {
public SchedulerApplication(Queue queue, String user, boolean unmanagedAM) {
this.queue = queue;
this.user = user;
this.unmanagedAM = unmanagedAM;
this.priority = null;
}

public SchedulerApplication(Queue queue, String user, Priority priority) {
public SchedulerApplication(Queue queue, String user, Priority priority,
boolean unmanagedAM) {
this.queue = queue;
this.user = user;
this.unmanagedAM = unmanagedAM;
this.priority = priority;
}

Expand All @@ -64,7 +68,7 @@ public void setCurrentAppAttempt(T currentAttempt) {
}

public void stop(RMAppState rmAppFinalState) {
queue.getMetrics().finishApp(user, rmAppFinalState);
queue.getMetrics().finishApp(user, rmAppFinalState, isUnmanagedAM());
}

public Priority getPriority() {
Expand All @@ -80,4 +84,7 @@ public void setPriority(Priority priority) {
}
}

public boolean isUnmanagedAM() {
return unmanagedAM;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,

this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
applicationSchedulingEnvs, rmContext);
applicationSchedulingEnvs, rmContext, unmanagedAM);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
Expand Down
Loading