-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11290. Improve Query Condition of FederationStateStore#getApplicationsHomeSubCluster. #4846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
ba324a5
a1629b7
9c499ca
354b8b3
eecc588
499986a
e1ee2a3
023bb5c
d309ac5
e96f796
7de66bf
40f6d99
f6df901
2a09fa7
152300c
c7620af
358977b
a536e30
1d00020
411d4d2
a12ea7e
923622c
490cf3c
32042dc
bd195f2
b948fab
d3ecfec
27be255
da64b19
794f79b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -111,12 +111,27 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL | |
| GO | ||
|
|
||
| CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] | ||
| @limit int, | ||
| @homeSubCluster VARCHAR(256) | ||
| AS BEGIN | ||
| DECLARE @errorMessage nvarchar(4000) | ||
|
|
||
| BEGIN TRY | ||
| SELECT [applicationId], [homeSubCluster], [createTime] | ||
| FROM [dbo].[applicationsHomeSubCluster] | ||
| SELECT | ||
| [applicationId], | ||
| [homeSubCluster], | ||
| [createTime] | ||
| FROM | ||
| (SELECT | ||
| [applicationId], | ||
| [homeSubCluster], | ||
| [createTime], | ||
| row_number() over(partition by [homeSubCluster] order by [createTime] desc) as row_num | ||
|
||
| FROM [dbo].[applicationsHomeSubCluster]) AS t | ||
| WHERE row_num <= @limit | ||
| AND (CASE WHEN @homeSubCluster IS NULL THEN 1 | ||
|
||
| WHEN @homeSubCluster IS NOT NULL AND [homeSubCluster] = @homeSubCluster THEN 1 | ||
| ELSE 0 END) = 1 | ||
| END TRY | ||
|
|
||
| BEGIN CATCH | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4963,4 +4963,14 @@ | |
| </description> | ||
| </property> | ||
|
|
||
| <property> | ||
| <name>yarn.federation.state-store.max-applications</name> | ||
| <value>1000</value> | ||
| <description> | ||
| yarn federation state-store supports | ||
|
||
| querying the maximum number of apps | ||
| Default is 1000 | ||
| </description> | ||
| </property> | ||
|
|
||
| </configuration> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,10 +24,12 @@ | |
| import java.util.Map.Entry; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationId; | ||
| import org.apache.hadoop.yarn.api.records.ReservationId; | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration; | ||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; | ||
|
|
@@ -90,6 +92,7 @@ public class MemoryFederationStateStore implements FederationStateStore { | |
| private Map<ApplicationId, SubClusterId> applications; | ||
| private Map<ReservationId, SubClusterId> reservations; | ||
| private Map<String, SubClusterPolicyConfiguration> policies; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| private final MonotonicClock clock = new MonotonicClock(); | ||
|
|
||
|
|
@@ -102,6 +105,9 @@ public void init(Configuration conf) { | |
| applications = new ConcurrentHashMap<ApplicationId, SubClusterId>(); | ||
| reservations = new ConcurrentHashMap<ReservationId, SubClusterId>(); | ||
| policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>(); | ||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -255,17 +261,36 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
| List<ApplicationHomeSubCluster> result = | ||
| new ArrayList<ApplicationHomeSubCluster>(); | ||
| for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) { | ||
| result | ||
| .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was just overlooked?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line of code should be an extra line of code, which has no practical significance. The following is directly constructed and returned. |
||
| SubClusterId requestSC = request.getSubClusterId(); | ||
| List<ApplicationHomeSubCluster> result = applications.keySet().stream() | ||
goiri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .map(applicationId -> generateAppHomeSC(applicationId)) | ||
| .filter(appHomeSC -> judgeAdd(requestSC, appHomeSC.getHomeSubCluster())) | ||
| .limit(maxAppsInStateStore) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size()); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
| } | ||
|
|
||
| private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) { | ||
| SubClusterId subClusterId = applications.get(applicationId); | ||
| return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); | ||
| } | ||
|
|
||
| private boolean judgeAdd(SubClusterId filterSubCluster, SubClusterId homeSubCluster) { | ||
| if (filterSubCluster == null) { | ||
| return true; | ||
| } else if (filterSubCluster.equals(homeSubCluster)) { | ||
|
||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( | ||
| DeleteApplicationHomeSubClusterRequest request) throws YarnException { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import java.util.Calendar; | ||
| import java.util.List; | ||
| import java.util.TimeZone; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.hadoop.classification.VisibleForTesting; | ||
| import org.apache.hadoop.conf.Configuration; | ||
|
|
@@ -133,6 +134,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { | |
| private String membershipZNode; | ||
| private String policiesZNode; | ||
| private String reservationsZNode; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| private volatile Clock clock = SystemClock.getInstance(); | ||
|
|
||
|
|
@@ -144,6 +146,10 @@ public class ZookeeperFederationStateStore implements FederationStateStore { | |
| public void init(Configuration conf) throws YarnException { | ||
| LOG.info("Initializing ZooKeeper connection"); | ||
|
|
||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
|
|
||
| baseZNode = conf.get( | ||
| YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH); | ||
|
|
@@ -255,24 +261,52 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
| long start = clock.getTime(); | ||
| List<ApplicationHomeSubCluster> result = new ArrayList<>(); | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| try { | ||
| for (String child : zkManager.getChildren(appsZNode)) { | ||
| ApplicationId appId = ApplicationId.fromString(child); | ||
| SubClusterId homeSubCluster = getApp(appId); | ||
| ApplicationHomeSubCluster app = | ||
| ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); | ||
| result.add(app); | ||
| } | ||
| long start = clock.getTime(); | ||
| SubClusterId requestSC = request.getSubClusterId(); | ||
| List<String> children = zkManager.getChildren(appsZNode); | ||
| List<ApplicationHomeSubCluster> result = | ||
goiri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| children.stream().map(child -> generateAppHomeSC(child)) | ||
| .filter(appHomeSC -> judgeAdd(requestSC, appHomeSC.getHomeSubCluster())) | ||
| .limit(maxAppsInStateStore) | ||
| .collect(Collectors.toList()); | ||
| long end = clock.getTime(); | ||
| opDurations.addGetAppsHomeSubClusterDuration(start, end); | ||
| LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size()); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
| } catch (Exception e) { | ||
| String errMsg = "Cannot get apps: " + e.getMessage(); | ||
| FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); | ||
| } | ||
| long end = clock.getTime(); | ||
| opDurations.addGetAppsHomeSubClusterDuration(start, end); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
|
|
||
| throw new YarnException("Cannot get app by request"); | ||
| } | ||
|
|
||
| private ApplicationHomeSubCluster generateAppHomeSC(String appId) { | ||
| try { | ||
| ApplicationId applicationId = ApplicationId.fromString(appId); | ||
| SubClusterId homeSubCluster = getApp(applicationId); | ||
| ApplicationHomeSubCluster app = | ||
| ApplicationHomeSubCluster.newInstance(applicationId, homeSubCluster); | ||
| return app; | ||
| } catch (Exception ex) { | ||
| LOG.error("get homeSubCluster by appId = {}.", appId); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| private boolean judgeAdd(SubClusterId filterSubCluster, SubClusterId homeSubCluster) { | ||
|
||
| if (filterSubCluster == null) { | ||
| return true; | ||
| } else if (filterSubCluster.equals(homeSubCluster)) { | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.hadoop.yarn.server.federation.store.records; | ||
|
|
||
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceAudience.Private; | ||
| import org.apache.hadoop.classification.InterfaceStability.Unstable; | ||
| import org.apache.hadoop.yarn.util.Records; | ||
|
|
@@ -37,4 +38,33 @@ public static GetApplicationsHomeSubClusterRequest newInstance() { | |
| return request; | ||
| } | ||
|
|
||
| @Private | ||
| @Unstable | ||
| public static GetApplicationsHomeSubClusterRequest | ||
| newInstance(SubClusterId subClusterId) { | ||
| GetApplicationsHomeSubClusterRequest request = | ||
| Records.newRecord(GetApplicationsHomeSubClusterRequest.class); | ||
| request.setSubClusterId(subClusterId); | ||
| return request; | ||
| } | ||
|
|
||
| /** | ||
| * Get the {@link SubClusterId} representing the unique identifier of the | ||
| * subcluster. | ||
| * | ||
| * @return the subcluster identifier | ||
| */ | ||
| @InterfaceAudience.Public | ||
|
||
| @Unstable | ||
| public abstract SubClusterId getSubClusterId(); | ||
|
|
||
| /** | ||
| * Set the {@link SubClusterId} representing the unique identifier of the | ||
| * subcluster. | ||
| * | ||
| * @param subClusterId the subcluster identifier | ||
| */ | ||
| @InterfaceAudience.Public | ||
| @Unstable | ||
| public abstract void setSubClusterId(SubClusterId subClusterId); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we don't order, aren't we getting random apps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for your suggestion, we should sort the apps and get the topN apps, I will modify the code.