-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11290. Improve Query Condition of FederationStateStore#getApplicationsHomeSubCluster. #4846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ba324a5
a1629b7
9c499ca
354b8b3
eecc588
499986a
e1ee2a3
023bb5c
d309ac5
e96f796
7de66bf
40f6d99
f6df901
2a09fa7
152300c
c7620af
358977b
a536e30
1d00020
411d4d2
a12ea7e
923622c
490cf3c
32042dc
bd195f2
b948fab
d3ecfec
27be255
da64b19
794f79b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,10 +24,12 @@ | |
| import java.util.Map.Entry; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationId; | ||
| import org.apache.hadoop.yarn.api.records.ReservationId; | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration; | ||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; | ||
|
|
@@ -90,6 +92,7 @@ public class MemoryFederationStateStore implements FederationStateStore { | |
| private Map<ApplicationId, SubClusterId> applications; | ||
| private Map<ReservationId, SubClusterId> reservations; | ||
| private Map<String, SubClusterPolicyConfiguration> policies; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| private final MonotonicClock clock = new MonotonicClock(); | ||
|
|
||
|
|
@@ -102,6 +105,9 @@ public void init(Configuration conf) { | |
| applications = new ConcurrentHashMap<ApplicationId, SubClusterId>(); | ||
| reservations = new ConcurrentHashMap<ReservationId, SubClusterId>(); | ||
| policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>(); | ||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -255,14 +261,33 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
| List<ApplicationHomeSubCluster> result = | ||
| new ArrayList<ApplicationHomeSubCluster>(); | ||
| for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) { | ||
| result | ||
| .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| List<ApplicationHomeSubCluster> result = new ArrayList<>(); | ||
| List<ApplicationId> applicationIdList = | ||
| applications.keySet().stream().collect(Collectors.toList()); | ||
|
|
||
| SubClusterId requestSubClusterId = request.getSubClusterId(); | ||
| int appCount = 0; | ||
| for (int i = 0; i < applicationIdList.size(); i++) { | ||
| if (appCount >= maxAppsInStateStore) { | ||
| break; | ||
| } | ||
| ApplicationId applicationId = applicationIdList.get(i); | ||
| SubClusterId subClusterId = applications.get(applicationId); | ||
| // If the requestSubClusterId that needs to be filtered in the request | ||
| // is inconsistent with the SubClusterId in the data, continue to the next round | ||
| if (requestSubClusterId != null && !requestSubClusterId.equals(subClusterId)){ | ||
| continue; | ||
| } | ||
| result.add(ApplicationHomeSubCluster.newInstance(applicationId, subClusterId)); | ||
|
||
| appCount++; | ||
| } | ||
|
|
||
| GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was just overlooked?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line of code should be an extra line of code, which has no practical significance. The following is directly constructed and returned. |
||
| LOG.info("requestSubClusterId = {}, appCount = {}.", requestSubClusterId, appCount); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,7 +129,7 @@ public class SQLFederationStateStore implements FederationStateStore { | |
| "{call sp_getApplicationHomeSubCluster(?, ?)}"; | ||
|
|
||
| private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = | ||
| "{call sp_getApplicationsHomeSubCluster()}"; | ||
| "{call sp_getApplicationsHomeSubCluster(?, ?)}"; | ||
|
|
||
| private static final String CALL_SP_SET_POLICY_CONFIGURATION = | ||
| "{call sp_setPolicyConfiguration(?, ?, ?, ?)}"; | ||
|
|
@@ -154,6 +154,7 @@ public class SQLFederationStateStore implements FederationStateStore { | |
| private final Clock clock = new MonotonicClock(); | ||
| @VisibleForTesting | ||
| Connection conn = null; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| @Override | ||
| public void init(Configuration conf) throws YarnException { | ||
|
|
@@ -193,6 +194,10 @@ public void init(Configuration conf) throws YarnException { | |
| FederationStateStoreUtils.logAndThrowRetriableException(LOG, | ||
| "Not able to get Connection", e); | ||
| } | ||
|
|
||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -726,13 +731,23 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| CallableStatement cstmt = null; | ||
| ResultSet rs = null; | ||
| List<ApplicationHomeSubCluster> appsHomeSubClusters = | ||
| new ArrayList<ApplicationHomeSubCluster>(); | ||
| List<ApplicationHomeSubCluster> appsHomeSubClusters = new ArrayList<>(); | ||
|
|
||
| try { | ||
| cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); | ||
| cstmt.setInt("limit_IN", maxAppsInStateStore); | ||
| String homeSubClusterIN = null;; | ||
|
||
| if (request.getSubClusterId() != null) { | ||
| homeSubClusterIN = request.getSubClusterId().toString(); | ||
| } | ||
| cstmt.setString("homeSubCluster_IN", homeSubClusterIN); | ||
|
|
||
| // Execute the query | ||
| long startTime = clock.getTime(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -133,6 +133,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { | |
| private String membershipZNode; | ||
| private String policiesZNode; | ||
| private String reservationsZNode; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| private volatile Clock clock = SystemClock.getInstance(); | ||
|
|
||
|
|
@@ -144,6 +145,10 @@ public class ZookeeperFederationStateStore implements FederationStateStore { | |
| public void init(Configuration conf) throws YarnException { | ||
| LOG.info("Initializing ZooKeeper connection"); | ||
|
|
||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
|
|
||
| baseZNode = conf.get( | ||
| YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH); | ||
|
|
@@ -255,23 +260,41 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| long start = clock.getTime(); | ||
| List<ApplicationHomeSubCluster> result = new ArrayList<>(); | ||
| SubClusterId requestSubClusterId = request.getSubClusterId(); | ||
| int appCount = 0; | ||
|
|
||
| try { | ||
| for (String child : zkManager.getChildren(appsZNode)) { | ||
| List<String> childrens = zkManager.getChildren(appsZNode); | ||
| for (String child : childrens) { | ||
| if (appCount >= maxAppsInStateStore) { | ||
| break; | ||
| } | ||
| ApplicationId appId = ApplicationId.fromString(child); | ||
| SubClusterId homeSubCluster = getApp(appId); | ||
| ApplicationHomeSubCluster app = | ||
| ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); | ||
| // If the requestSubClusterId that needs to be filtered in the request | ||
| // is inconsistent with the SubClusterId in the data, continue to the next round | ||
| if (requestSubClusterId != null && !requestSubClusterId.equals(homeSubCluster)) { | ||
| continue; | ||
|
||
| } | ||
| ApplicationHomeSubCluster app = ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); | ||
| result.add(app); | ||
| appCount ++; | ||
| } | ||
| } catch (Exception e) { | ||
| String errMsg = "Cannot get apps: " + e.getMessage(); | ||
| FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); | ||
| } | ||
| long end = clock.getTime(); | ||
| opDurations.addGetAppsHomeSubClusterDuration(start, end); | ||
|
|
||
| LOG.info("requestSubClusterId = {}, appCount = {}.", requestSubClusterId, appCount); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.hadoop.yarn.server.federation.store.records; | ||
|
|
||
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceAudience.Private; | ||
| import org.apache.hadoop.classification.InterfaceStability.Unstable; | ||
| import org.apache.hadoop.yarn.util.Records; | ||
|
|
@@ -37,4 +38,33 @@ public static GetApplicationsHomeSubClusterRequest newInstance() { | |
| return request; | ||
| } | ||
|
|
||
| @Private | ||
| @Unstable | ||
| public static GetApplicationsHomeSubClusterRequest | ||
| newInstance(SubClusterId subClusterId) { | ||
| GetApplicationsHomeSubClusterRequest request = | ||
| Records.newRecord(GetApplicationsHomeSubClusterRequest.class); | ||
| request.setSubClusterId(subClusterId); | ||
| return request; | ||
| } | ||
|
|
||
| /** | ||
| * Get the {@link SubClusterId} representing the unique identifier of the | ||
| * subcluster. | ||
| * | ||
| * @return the subcluster identifier | ||
| */ | ||
| @InterfaceAudience.Public | ||
|
||
| @Unstable | ||
| public abstract SubClusterId getSubClusterId(); | ||
|
|
||
| /** | ||
| * Set the {@link SubClusterId} representing the unique identifier of the | ||
| * subcluster. | ||
| * | ||
| * @param subClusterId the subcluster identifier | ||
| */ | ||
| @InterfaceAudience.Public | ||
| @Unstable | ||
| public abstract void setSubClusterId(SubClusterId subClusterId); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to the for condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually it would be good to do a foreach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will modify the code!