Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public final class FederationStateStoreFacade {
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";

private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
Expand Down Expand Up @@ -376,10 +378,23 @@ public void updateApplicationHomeSubCluster(
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
try {
if (isCachingEnabled()) {
Object value =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fits in one line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your help reviewing the code, I will fix it.

cache.get(buildGetApplicationHomeSubClusterRequest(appId.toString()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add ApplicationId as the argument and tostring inside.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will modify the code.

if(value instanceof SubClusterId){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion, I will fix it.

return (SubClusterId) value;
} else {
throw new YarnException("Cannot be converted to a map.");
}
} else {
GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
return response.getApplicationHomeSubCluster().getHomeSubCluster();
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}

/**
Expand Down Expand Up @@ -513,6 +528,28 @@ public Map<String, SubClusterPolicyConfiguration> invoke(
return cacheRequest;
}

private Object buildGetApplicationHomeSubClusterRequest(String appId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we cast it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion, I will modify the code.

final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, appId);
CacheRequest<String, SubClusterId> cacheRequest =
new CacheRequest<>(
cacheKey,
input -> {
ApplicationId applicationId = ApplicationId.fromString(appId);

GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(applicationId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract the request and probably do indentation like:

    CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
        cacheKey,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.


ApplicationHomeSubCluster applicationHomeSubCluster =
response.getApplicationHomeSubCluster();
SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();

return subClusterId;
});
return cacheRequest;
}

protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
Expand Down Expand Up @@ -560,7 +597,7 @@ private static class CacheRequest<K, V> {
private K key;
private Func<K, V> func;

public CacheRequest(K key, Func<K, V> func) {
CacheRequest(K key, Func<K, V> func) {
this.key = key;
this.func = func;
}
Expand Down Expand Up @@ -609,4 +646,14 @@ public boolean equals(Object obj) {
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}

@VisibleForTesting
public Cache<Object, Object> getCache() {
return cache;
}

@VisibleForTesting
protected Object getAppHomeSubClusterCacheRequest(String appId) {
return buildGetApplicationHomeSubClusterRequest(appId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import javax.cache.Cache;

/**
* Unit tests for FederationStateStoreFacade.
*/
Expand All @@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() {
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
private Boolean isCachingEnabled;

public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
this.isCachingEnabled = isCachingEnabled;
}

@Before
Expand Down Expand Up @@ -206,4 +210,27 @@ public void testAddApplicationHomeSubCluster() throws YarnException {
Assert.assertEquals(subClusterId1, result);
}

@Test
public void testGetApplicationHomeSubClusterCache() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");

ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
SubClusterId subClusterIdAdd =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it.

facade.addApplicationHomeSubCluster(appHomeSubCluster);

SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId);
Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd);
Assert.assertEquals(subClusterId1, subClusterIdAdd);

if (isCachingEnabled.booleanValue()) {
Cache<Object, Object> cache = facade.getCache();
Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId.toString());
Object subClusterIdByCache = cache.get(cacheKey);
Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
Assert.assertEquals(subClusterId1, subClusterIdByCache);
}
}

}