-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubCluster use JCache. #4701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
d45e599
6ad9607
1454a06
0c9d648
12d77ad
0eefb19
f6096e2
ce5ca51
fb42275
7a9c9a5
0ed4dd4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,8 @@ public final class FederationStateStoreFacade { | |
| private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters"; | ||
| private static final String GET_POLICIES_CONFIGURATIONS_CACHEID = | ||
| "getPoliciesConfigurations"; | ||
| private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID = | ||
| "getApplicationHomeSubCluster"; | ||
|
|
||
| private static final FederationStateStoreFacade FACADE = | ||
| new FederationStateStoreFacade(); | ||
|
|
@@ -376,10 +378,23 @@ public void updateApplicationHomeSubCluster( | |
| */ | ||
| public SubClusterId getApplicationHomeSubCluster(ApplicationId appId) | ||
| throws YarnException { | ||
| GetApplicationHomeSubClusterResponse response = | ||
| stateStore.getApplicationHomeSubCluster( | ||
| try { | ||
| if (isCachingEnabled()) { | ||
| Object value = | ||
| cache.get(buildGetApplicationHomeSubClusterRequest(appId)); | ||
| if (value instanceof SubClusterId) { | ||
| return (SubClusterId) value; | ||
| } else { | ||
| throw new YarnException("Cannot be converted to SubClusterId."); | ||
|
||
| } | ||
| } else { | ||
| GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster( | ||
| GetApplicationHomeSubClusterRequest.newInstance(appId)); | ||
| return response.getApplicationHomeSubCluster().getHomeSubCluster(); | ||
| return response.getApplicationHomeSubCluster().getHomeSubCluster(); | ||
| } | ||
| } catch (Throwable ex) { | ||
| throw new YarnException(ex); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -513,6 +528,26 @@ public Map<String, SubClusterPolicyConfiguration> invoke( | |
| return cacheRequest; | ||
| } | ||
|
|
||
| private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) { | ||
| final String cacheKey = buildCacheKey(getClass().getSimpleName(), | ||
| GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString()); | ||
| CacheRequest<String, SubClusterId> cacheRequest = | ||
| new CacheRequest<>( | ||
| cacheKey, | ||
| input -> { | ||
| GetApplicationHomeSubClusterResponse response = | ||
| stateStore.getApplicationHomeSubCluster( | ||
| GetApplicationHomeSubClusterRequest.newInstance(applicationId)); | ||
|
||
|
|
||
| ApplicationHomeSubCluster applicationHomeSubCluster = | ||
| response.getApplicationHomeSubCluster(); | ||
| SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster(); | ||
|
|
||
| return subClusterId; | ||
| }); | ||
| return cacheRequest; | ||
| } | ||
|
|
||
| protected String buildCacheKey(String typeName, String methodName, | ||
| String argName) { | ||
| StringBuilder buffer = new StringBuilder(); | ||
|
|
@@ -560,7 +595,7 @@ private static class CacheRequest<K, V> { | |
| private K key; | ||
| private Func<K, V> func; | ||
|
|
||
| public CacheRequest(K key, Func<K, V> func) { | ||
| CacheRequest(K key, Func<K, V> func) { | ||
| this.key = key; | ||
| this.func = func; | ||
| } | ||
|
|
@@ -609,4 +644,14 @@ public boolean equals(Object obj) { | |
| protected interface Func<T, TResult> { | ||
| TResult invoke(T input) throws Exception; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public Cache<Object, Object> getCache() { | ||
| return cache; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) { | ||
| return buildGetApplicationHomeSubClusterRequest(applicationId); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,8 @@ | |
| import org.junit.runners.Parameterized; | ||
| import org.junit.runners.Parameterized.Parameters; | ||
|
|
||
| import javax.cache.Cache; | ||
|
|
||
| /** | ||
| * Unit tests for FederationStateStoreFacade. | ||
| */ | ||
|
|
@@ -64,12 +66,14 @@ public static Collection<Boolean[]> getParameters() { | |
| private FederationStateStoreTestUtil stateStoreTestUtil; | ||
| private FederationStateStoreFacade facade = | ||
| FederationStateStoreFacade.getInstance(); | ||
| private Boolean isCachingEnabled; | ||
|
|
||
| public TestFederationStateStoreFacade(Boolean isCachingEnabled) { | ||
| conf = new Configuration(); | ||
| if (!(isCachingEnabled.booleanValue())) { | ||
| conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); | ||
| } | ||
| this.isCachingEnabled = isCachingEnabled; | ||
| } | ||
|
|
||
| @Before | ||
|
|
@@ -206,4 +210,27 @@ public void testAddApplicationHomeSubCluster() throws YarnException { | |
| Assert.assertEquals(subClusterId1, result); | ||
| } | ||
|
|
||
| @Test | ||
| public void testGetApplicationHomeSubClusterCache() throws YarnException { | ||
| ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1); | ||
| SubClusterId subClusterId1 = SubClusterId.newInstance("Home1"); | ||
|
|
||
| ApplicationHomeSubCluster appHomeSubCluster = | ||
| ApplicationHomeSubCluster.newInstance(appId, subClusterId1); | ||
| SubClusterId subClusterIdAdd = | ||
|
||
| facade.addApplicationHomeSubCluster(appHomeSubCluster); | ||
|
|
||
| SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId); | ||
| Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd); | ||
| Assert.assertEquals(subClusterId1, subClusterIdAdd); | ||
|
|
||
| if (isCachingEnabled.booleanValue()) { | ||
| Cache<Object, Object> cache = facade.getCache(); | ||
| Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId); | ||
| Object subClusterIdByCache = cache.get(cacheKey); | ||
| Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache); | ||
| Assert.assertEquals(subClusterId1, subClusterIdByCache); | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fits in one line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your help reviewing the code, I will fix it.