Skip to content

Commit a6c2526

Browse files
authored
YARN-11435. [Router] FederationStateStoreFacade is not reinitialized with Router conf. (#5967) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
1 parent d5334fa commit a6c2526

44 files changed

Lines changed: 129 additions & 63 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void setUp() throws IOException, YarnException {
7777

7878
stateStore = spy(new MemoryFederationStateStore());
7979
stateStore.init(conf);
80-
FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
80+
FederationStateStoreFacade.getInstance(conf).reinitialize(stateStore, conf);
8181
verify(stateStore, times(0))
8282
.getSubClusters(any(GetSubClustersInfoRequest.class));
8383
}
@@ -180,7 +180,7 @@ public void run() {
180180
.getSubClusters(any(GetSubClustersInfoRequest.class));
181181

182182
// Force flush cache, so that it will pick up the new RM address
183-
FederationStateStoreFacade.getInstance().getSubCluster(subClusterId,
183+
FederationStateStoreFacade.getInstance(conf).getSubCluster(subClusterId,
184184
true);
185185
}
186186

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void init(Configuration configuration, RMProxy<T> proxy,
7676
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
7777
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
7878
this.subClusterId = SubClusterId.newInstance(clusterId);
79-
this.facade = FederationStateStoreFacade.getInstance();
79+
this.facade = FederationStateStoreFacade.getInstance(configuration);
8080
if (configuration instanceof YarnConfiguration) {
8181
this.conf = (YarnConfiguration) configuration;
8282
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ public final class FederationStateStoreFacade {
105105
private static final Logger LOG =
106106
LoggerFactory.getLogger(FederationStateStoreFacade.class);
107107

108-
private static final FederationStateStoreFacade FACADE =
109-
new FederationStateStoreFacade();
108+
private static volatile FederationStateStoreFacade facade;
110109

111110
private static Random rand = new Random(System.currentTimeMillis());
112111

@@ -115,8 +114,8 @@ public final class FederationStateStoreFacade {
115114
private SubClusterResolver subclusterResolver;
116115
private FederationCache federationCache;
117116

118-
private FederationStateStoreFacade() {
119-
initializeFacadeInternal(new Configuration());
117+
private FederationStateStoreFacade(Configuration conf) {
118+
initializeFacadeInternal(conf);
120119
}
121120

122121
private void initializeFacadeInternal(Configuration config) {
@@ -199,7 +198,50 @@ public static RetryPolicy createRetryPolicy(Configuration conf) {
199198
* @return the singleton {@link FederationStateStoreFacade} instance
200199
*/
201200
public static FederationStateStoreFacade getInstance() {
202-
return FACADE;
201+
return getInstanceInternal(new Configuration());
202+
}
203+
204+
/**
205+
* Returns the singleton instance of the FederationStateStoreFacade object.
206+
*
207+
* @param conf configuration.
208+
* @return the singleton {@link FederationStateStoreFacade} instance
209+
*/
210+
public static FederationStateStoreFacade getInstance(Configuration conf) {
211+
return getInstanceInternal(conf);
212+
}
213+
214+
/**
215+
* Returns the singleton instance of the FederationStateStoreFacade object.
216+
*
217+
* @param conf configuration.
218+
* @return the singleton {@link FederationStateStoreFacade} instance
219+
*/
220+
private static FederationStateStoreFacade getInstanceInternal(Configuration conf){
221+
if (facade != null) {
222+
return facade;
223+
}
224+
generateStateStoreFacade(conf);
225+
return facade;
226+
}
227+
228+
/**
229+
* Generate the singleton instance of the FederationStateStoreFacade object.
230+
*
231+
* @param conf configuration.
232+
*/
233+
private static void generateStateStoreFacade(Configuration conf){
234+
if (facade == null) {
235+
synchronized (FederationStateStoreFacade.class) {
236+
if (facade == null) {
237+
Configuration yarnConf = new Configuration();
238+
if (conf != null) {
239+
yarnConf = conf;
240+
}
241+
facade = new FederationStateStoreFacade(yarnConf);
242+
}
243+
}
244+
}
203245
}
204246

205247
/**

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/cache/TestFederationCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,14 @@ public static Collection<Class[]> getParameters() {
6060
private Configuration conf;
6161
private FederationStateStore stateStore;
6262
private FederationStateStoreTestUtil stateStoreTestUtil;
63-
private FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance();
63+
private FederationStateStoreFacade facade;
6464

6565
public TestFederationCache(Class cacheClassName) {
6666
conf = new Configuration();
6767
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 1);
6868
conf.setClass(YarnConfiguration.FEDERATION_FACADE_CACHE_CLASS,
6969
cacheClassName, FederationCache.class);
70+
facade = FederationStateStoreFacade.getInstance(conf);
7071
}
7172

7273
@Before

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Map;
2828
import java.util.Random;
2929

30+
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
3132
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
3233
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -75,7 +76,8 @@ public void testReinitilialize() throws YarnException {
7576
.newInstance("queue1", getPolicy().getClass().getCanonicalName(), buf));
7677
fpc.setFederationSubclusterResolver(
7778
FederationPoliciesTestUtil.initResolver());
78-
fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
79+
Configuration conf = new Configuration();
80+
fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade(conf));
7981
getPolicy().reinitialize(fpc);
8082
}
8183

@@ -100,7 +102,8 @@ public void testReinitilializeBad3() throws YarnException {
100102
.newInstance("queue1", "WrongPolicyName", buf));
101103
fpc.setFederationSubclusterResolver(
102104
FederationPoliciesTestUtil.initResolver());
103-
fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
105+
Configuration conf = new Configuration();
106+
fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade(conf));
104107
getPolicy().reinitialize(fpc);
105108
}
106109

@@ -212,9 +215,9 @@ public String generateClusterMetricsInfo(int id) {
212215
public FederationStateStoreFacade getMemoryFacade() throws YarnException {
213216

214217
// setting up a store and its facade (with caching off)
215-
FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance();
216218
YarnConfiguration conf = new YarnConfiguration();
217219
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
220+
FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance(conf);
218221
FederationStateStore store = new MemoryFederationStateStore();
219222
store.init(conf);
220223
fedFacade.reinitialize(store, conf);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.nio.ByteBuffer;
2222

23+
import org.apache.hadoop.conf.Configuration;
2324
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
2425
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
2526
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
@@ -45,7 +46,8 @@ public class TestFederationPolicyInitializationContextValidator {
4546

4647
@Before
4748
public void setUp() throws Exception {
48-
goodFacade = FederationPoliciesTestUtil.initFacade();
49+
Configuration conf = new Configuration();
50+
goodFacade = FederationPoliciesTestUtil.initFacade(conf);
4951
goodConfig = new MockPolicyManager().serializeConf();
5052
goodSR = FederationPoliciesTestUtil.initResolver();
5153
goodHome = SubClusterId.newInstance("homesubcluster");

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,9 @@ public class TestRouterPolicyFacade {
6262
public void setup() throws YarnException {
6363

6464
// setting up a store and its facade (with caching off)
65-
FederationStateStoreFacade fedFacade =
66-
FederationStateStoreFacade.getInstance();
6765
YarnConfiguration conf = new YarnConfiguration();
6866
conf.set(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, "0");
67+
FederationStateStoreFacade fedFacade = FederationStateStoreFacade.getInstance(conf);
6968
store = new MemoryFederationStateStore();
7069
store.init(conf);
7170
fedFacade.reinitialize(store, conf);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.assertj.core.api.Assertions.assertThat;
2121

22+
import org.apache.hadoop.conf.Configuration;
2223
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
2324
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
2425
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@@ -72,13 +73,14 @@ protected static void serializeAndDeserializePolicyManager(
7273
Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
7374

7475
// serializeConf it in a context
76+
Configuration conf = new Configuration();
7577
SubClusterPolicyConfiguration fpc = wfp.serializeConf();
7678
fpc.setType(policyManagerType.getCanonicalName());
7779
FederationPolicyInitializationContext context =
7880
new FederationPolicyInitializationContext();
7981
context.setSubClusterPolicyConfiguration(fpc);
8082
context
81-
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
83+
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade(conf));
8284
context.setFederationSubclusterResolver(
8385
FederationPoliciesTestUtil.initResolver());
8486
context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public static void initializePolicyContext(
123123
fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
124124
.newInstance("queue1", policy.getClass().getCanonicalName(), buf));
125125
FederationStateStoreFacade facade = FederationStateStoreFacade
126-
.getInstance();
126+
.getInstance(conf);
127127
FederationStateStore fss = mock(FederationStateStore.class);
128128

129129
if (activeSubclusters == null) {
@@ -242,9 +242,8 @@ public static SubClusterResolver initResolver() {
242242

243243
public static FederationStateStoreFacade initFacade(
244244
List<SubClusterInfo> subClusterInfos, SubClusterPolicyConfiguration
245-
policyConfiguration) throws YarnException {
246-
FederationStateStoreFacade goodFacade = FederationStateStoreFacade
247-
.getInstance();
245+
policyConfiguration, Configuration conf) throws YarnException {
246+
FederationStateStoreFacade goodFacade = FederationStateStoreFacade.getInstance(conf);
248247
FederationStateStore fss = mock(FederationStateStore.class);
249248
GetSubClustersInfoResponse response = GetSubClustersInfoResponse
250249
.newInstance(subClusterInfos);
@@ -276,8 +275,20 @@ public static FederationStateStoreFacade initFacade(
276275
* @throws YarnException in case the initialization is not successful.
277276
*/
278277
public static FederationStateStoreFacade initFacade() throws YarnException {
278+
return initFacade(new Configuration());
279+
}
280+
281+
/**
282+
* Initialiaze a main-memory {@link FederationStateStoreFacade} used for
283+
* testing, wiht a mock resolver.
284+
*
285+
* @param conf Configuration.
286+
* @return the facade.
287+
* @throws YarnException in case the initialization is not successful.
288+
*/
289+
public static FederationStateStoreFacade initFacade(Configuration conf) throws YarnException {
279290
SubClusterPolicyConfiguration policyConfiguration =
280291
SubClusterPolicyConfiguration.newInstance(null, null, null);
281-
return initFacade(new ArrayList<>(), policyConfiguration);
292+
return initFacade(new ArrayList<>(), policyConfiguration, conf);
282293
}
283294
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ public static Collection<Boolean[]> getParameters() {
8080
private Configuration conf;
8181
private FederationStateStore stateStore;
8282
private FederationStateStoreTestUtil stateStoreTestUtil;
83-
private FederationStateStoreFacade facade =
84-
FederationStateStoreFacade.getInstance();
83+
private FederationStateStoreFacade facade;
84+
8585
private Boolean isCachingEnabled;
8686

8787
public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
@@ -90,6 +90,7 @@ public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
9090
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
9191
}
9292
this.isCachingEnabled = isCachingEnabled;
93+
facade = FederationStateStoreFacade.getInstance(conf);
9394
}
9495

9596
@Before

0 commit comments

Comments
 (0)