Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public final class RouterMetrics {
private MutableGaugeInt numListReservationsFailedRetrieved;
@Metric("# of getResourceTypeInfo failed to be retrieved")
private MutableGaugeInt numGetResourceTypeInfo;
@Metric("# of failApplicationAttempt failed to be retrieved")
private MutableGaugeInt numFailAppAttemptFailedRetrieved;
@Metric("# of updateApplicationPriority failed to be retrieved")
private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
@Metric("# of updateApplicationPriority failed to be retrieved")
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -114,6 +120,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededListReservationsRetrieved;
@Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)")
private MutableRate totalSucceededGetResourceTypeInfoRetrieved;
@Metric("Total number of successful Retrieved failApplicationAttempt and latency(ms)")
private MutableRate totalSucceededFailAppAttemptRetrieved;
@Metric("Total number of successful Retrieved updateApplicationPriority and latency(ms)")
private MutableRate totalSucceededUpdateAppPriorityRetrieved;
@Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;

/**
* Provide quantile counters for all latencies.
Expand All @@ -135,8 +147,11 @@ public final class RouterMetrics {
private MutableQuantiles getContainerLatency;
private MutableQuantiles listReservationsLatency;
private MutableQuantiles listResourceTypeInfoLatency;
private MutableQuantiles failAppAttemptLatency;
private MutableQuantiles updateAppPriorityLatency;
private MutableQuantiles updateAppTimeoutsLatency;

private static volatile RouterMetrics INSTANCE = null;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;

private RouterMetrics() {
Expand Down Expand Up @@ -201,25 +216,37 @@ private RouterMetrics() {
listResourceTypeInfoLatency =
registry.newQuantiles("getResourceTypeInfoLatency",
"latency of get resource type info", "ops", "latency", 10);

failAppAttemptLatency =
registry.newQuantiles("failApplicationAttemptLatency",
"latency of fail application attempt", "ops", "latency", 10);

updateAppPriorityLatency =
registry.newQuantiles("updateApplicationPriorityLatency",
"latency of update application priority", "ops", "latency", 10);

updateAppTimeoutsLatency =
registry.newQuantiles("updateApplicationTimeoutsLatency",
"latency of update application timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
if (!isInitialized.get()) {
synchronized (RouterMetrics.class) {
if (INSTANCE == null) {
INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
if (instance == null) {
instance = DefaultMetricsSystem.instance().register("RouterMetrics",
"Metrics for the Yarn Router", new RouterMetrics());
isInitialized.set(true);
}
}
}
return INSTANCE;
return instance;
}

@VisibleForTesting
synchronized static void destroy() {
isInitialized.set(false);
INSTANCE = null;
instance = null;
}

@VisibleForTesting
Expand Down Expand Up @@ -307,6 +334,21 @@ public long getNumSucceededGetResourceTypeInfoRetrieved() {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededFailAppAttemptRetrieved() {
return totalSucceededFailAppAttemptRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededUpdateAppPriorityRetrieved() {
return totalSucceededUpdateAppPriorityRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededUpdateAppTimeoutsRetrieved() {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
Expand Down Expand Up @@ -392,6 +434,21 @@ public double getLatencySucceededGetResourceTypeInfoRetrieved() {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededFailAppAttemptRetrieved() {
return totalSucceededFailAppAttemptRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededUpdateAppPriorityRetrieved() {
return totalSucceededUpdateAppPriorityRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededUpdateAppTimeoutsRetrieved() {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
}

@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
Expand Down Expand Up @@ -477,6 +534,21 @@ public int getGetResourceTypeInfoRetrieved() {
return numGetResourceTypeInfo.value();
}

@VisibleForTesting
public int getFailApplicationAttemptFailedRetrieved() {
return numFailAppAttemptFailedRetrieved.value();
}

@VisibleForTesting
public int getUpdateApplicationPriorityFailedRetrieved() {
return numUpdateAppPriorityFailedRetrieved.value();
}

@VisibleForTesting
public int getUpdateApplicationTimeoutsFailedRetrieved() {
return numUpdateAppTimeoutsFailedRetrieved.value();
}

public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
Expand Down Expand Up @@ -562,6 +634,21 @@ public void succeededGetResourceTypeInfoRetrieved(long duration) {
listResourceTypeInfoLatency.add(duration);
}

public void succeededFailAppAttemptRetrieved(long duration) {
totalSucceededFailAppAttemptRetrieved.add(duration);
failAppAttemptLatency.add(duration);
}

public void succeededUpdateAppPriorityRetrieved(long duration) {
totalSucceededUpdateAppPriorityRetrieved.add(duration);
updateAppPriorityLatency.add(duration);
}

public void succeededUpdateAppTimeoutsRetrieved(long duration) {
totalSucceededUpdateAppTimeoutsRetrieved.add(duration);
updateAppTimeoutsLatency.add(duration);
}

public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
Expand Down Expand Up @@ -629,4 +716,16 @@ public void incrListReservationsFailedRetrieved() {
public void incrResourceTypeInfoFailedRetrieved() {
numGetResourceTypeInfo.incr();
}

public void incrFailAppAttemptFailedRetrieved() {
numFailAppAttemptFailedRetrieved.incr();
}

public void incrUpdateAppPriorityFailedRetrieved() {
numUpdateAppPriorityFailedRetrieved.incr();
}

public void incrUpdateApplicationTimeoutsRetrieved() {
numUpdateAppTimeoutsFailedRetrieved.incr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1213,14 +1213,92 @@ public CancelDelegationTokenResponse cancelDelegationToken(
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getApplicationAttemptId() == null
|| request.getApplicationAttemptId().getApplicationId() == null) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing failApplicationAttempt request or applicationId " +
"or applicationAttemptId information.", null);
}
long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId();

try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("ApplicationAttempt " +
request.getApplicationAttemptId() + " belongs to Application " +
request.getApplicationAttemptId().getApplicationId() +
" does not exist in FederationStateStore.", e);
}

ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
FailApplicationAttemptResponse response = null;
try {
response = clientRMProxy.failApplicationAttempt(request);
} catch (Exception e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get the applicationAttempt report for " +
request.getApplicationAttemptId() + " to SubCluster " + subClusterId.getId(), e);
}

if (response == null) {
LOG.error("No response when attempting to retrieve the report of " +
"the applicationAttempt {} to SubCluster {}.",
request.getApplicationAttemptId(), subClusterId.getId());
}

long stopTime = clock.getTime();
routerMetrics.succeededFailAppAttemptRetrieved(stopTime - startTime);
return response;
}

@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getApplicationId() == null
|| request.getApplicationPriority() == null) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing updateApplicationPriority request or applicationId " +
"or applicationPriority information.", null);
}

long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationId();

try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " +
request.getApplicationId() +
" does not exist in FederationStateStore.", e);
}

ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
UpdateApplicationPriorityResponse response = null;
try {
response = clientRMProxy.updateApplicationPriority(request);
} catch (Exception e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to update application priority for " +
request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
}

if (response == null) {
LOG.error("No response when update application priority of " +
"the applicationId {} to SubCluster {}.",
applicationId, subClusterId.getId());
}

long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime);
return response;
}

@Override
Expand All @@ -1233,7 +1311,45 @@ public SignalContainerResponse signalToContainer(
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getApplicationId() == null
|| request.getApplicationTimeouts() == null) {
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
RouterServerUtil.logAndThrowException(
"Missing updateApplicationTimeouts request or applicationId " +
"or applicationTimeouts information.", null);
}

long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationId();
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " +
request.getApplicationId() +
" does not exist in FederationStateStore.", e);
}

ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
UpdateApplicationTimeoutsResponse response = null;
try {
response = clientRMProxy.updateApplicationTimeouts(request);
} catch (Exception e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to update application timeout for " +
request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
}

if (response == null) {
LOG.error("No response when update application timeout of " +
"the applicationId {} to SubCluster {}.",
applicationId, subClusterId.getId());
}

long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime);
return response;
}

@Override
Expand Down
Loading