Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4267,7 +4267,7 @@ public static boolean isAclEnabled(Configuration conf) {
*/
public static final String ROUTER_CLIENTRM_SUBMIT_RETRY =
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 1;

/**
* GetNewApplication and SubmitApplication request retry interval time.
Expand Down Expand Up @@ -4367,7 +4367,7 @@ public static boolean isAclEnabled(Configuration conf) {
* the default value is 0s.
*/
public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
TimeUnit.SECONDS.toMillis(0); // 0s
TimeUnit.SECONDS.toMillis(30); // 0s
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment needs to be updated.
Also not sure what's the agreement on changing default values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for discovering this issue! It was my oversight, and I introduced this problem while conducting integration testing. I will fix it.


/** The address of the Router web application. */
public static final String ROUTER_WEBAPP_ADDRESS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5132,10 +5132,10 @@

<property>
<name>yarn.router.interceptor.user-thread-pool.keep-alive-time</name>
<value>0s</value>
<value>30s</value>
<description>
This configurable is used to set the keepAliveTime of the thread pool of the interceptor.
Default is 0s.
Default is 30s.
</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.time.DurationFormatUtils;
Expand Down Expand Up @@ -95,6 +94,10 @@ public GPGContext getGPGContext() {
return this.gpgContext;
}

public FederationRegistryClient getRegistryClient() {
return this.registryClient;
}

/**
* Query router for applications.
*
Expand Down Expand Up @@ -152,18 +155,6 @@ public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
+ " success Router queries after " + totalAttemptCount + " retries");
}

protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
List<String> allApps = this.registryClient.getAllApplications();
LOG.info("Got {} existing apps in registry.", allApps.size());
for (String app : allApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!knownApps.contains(appId)) {
LOG.info("removing finished application entry for {}", app);
this.registryClient.removeAppFromRegistry(appId, true);
}
}
}

@Override
public abstract void run();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
Expand All @@ -45,33 +46,49 @@ public void run() {
LOG.info("Application cleaner run at time {}", now);

FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
Set<ApplicationId> candidates = new HashSet<>();
try {
// Get the candidate list from StateStore before calling router
Set<ApplicationId> allStateStoreApps = new HashSet<>();
List<ApplicationHomeSubCluster> response =
facade.getApplicationsHomeSubCluster();
for (ApplicationHomeSubCluster app : response) {
candidates.add(app.getApplicationId());
allStateStoreApps.add(app.getApplicationId());
}
LOG.info("{} app entries in FederationStateStore", candidates.size());
LOG.info("{} app entries in FederationStateStore", allStateStoreApps.size());

// Get the candidate list from Registry before calling router
List<String> allRegistryApps = getRegistryClient().getAllApplications();
LOG.info("{} app entries in FederationRegistry", allStateStoreApps.size());

// Get the list of known apps from Router
Set<ApplicationId> routerApps = getRouterKnownApplications();
LOG.info("{} known applications from Router", routerApps.size());

candidates.removeAll(routerApps);
LOG.info("Deleting {} applications from statestore", candidates.size());
if (LOG.isDebugEnabled()) {
LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString)
.collect(Collectors.joining(",")));
}
for (ApplicationId appId : candidates) {
// Clean up StateStore entries
Set<ApplicationId> toDelete =
Sets.difference(allStateStoreApps, routerApps);

LOG.info("Deleting {} applications from statestore", toDelete.size());
LOG.debug("Apps to delete: {}.",
toDelete.stream().map(Object::toString).collect(Collectors.joining(",")));

for (ApplicationId appId : toDelete) {
try {
LOG.debug("Deleting {} from statestore ", appId);
facade.deleteApplicationHomeSubCluster(appId);
} catch (Exception e) {
LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
}
}
// Clean up registry entries
cleanupAppRecordInRegistry(routerApps);

// Clean up Registry entries
for (String app : allRegistryApps) {
ApplicationId appId = ApplicationId.fromString(app);
if (!routerApps.contains(appId)) {
LOG.debug("removing finished application entry for {}", app);
getRegistryClient().removeAppFromRegistry(appId, true);
}
}
} catch (Throwable e) {
LOG.error("Application cleaner started at time {} fails. ", now, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
Expand All @@ -63,6 +64,8 @@ public class TestDefaultApplicationCleaner {
// The list of applications returned by mocked router
private Set<ApplicationId> routerAppIds;

private ApplicationId appIdToAddConcurrently;

@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
Expand Down Expand Up @@ -111,6 +114,7 @@ public void setup() throws Exception {
new Token<AMRMTokenIdentifier>());
}
Assert.assertEquals(3, registryClient.getAllApplications().size());
appIdToAddConcurrently = null;
}

@After
Expand Down Expand Up @@ -159,7 +163,39 @@ public class TestableDefaultApplicationCleaner
extends DefaultApplicationCleaner {
@Override
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
if (appIdToAddConcurrently != null) {
SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
try {
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(appIdToAddConcurrently, scId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
stateStore.addApplicationHomeSubCluster(request);
} catch (YarnException e) {
throw new YarnRuntimeException(e);
}
registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently, scId.toString(),
new Token<>());
}
return routerAppIds;
}
}

@Test
public void testConcurrentNewApp() throws YarnException {
appIdToAddConcurrently = ApplicationId.newInstance(1, 1);

appCleaner.run();

// The concurrently added app should be still there
GetApplicationsHomeSubClusterRequest appHomeSubClusterRequest =
GetApplicationsHomeSubClusterRequest.newInstance();
GetApplicationsHomeSubClusterResponse applicationsHomeSubCluster =
stateStore.getApplicationsHomeSubCluster(appHomeSubClusterRequest);
int size = applicationsHomeSubCluster.getAppsHomeSubClusters().size();
Assert.assertEquals(1, size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, either extract both or none.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will improve it.


// The concurrently added app should be still there
Assert.assertEquals(1, registryClient.getAllApplications().size());
}
}