-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11290. Improve Query Condition of FederationStateStore#getApplicationsHomeSubCluster. #4846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
ba324a5
a1629b7
9c499ca
354b8b3
eecc588
499986a
e1ee2a3
023bb5c
d309ac5
e96f796
7de66bf
40f6d99
f6df901
2a09fa7
152300c
c7620af
358977b
a536e30
1d00020
411d4d2
a12ea7e
923622c
490cf3c
32042dc
bd195f2
b948fab
d3ecfec
27be255
da64b19
794f79b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,10 +24,14 @@ | |
| import java.util.Map.Entry; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.stream.Collectors; | ||
| import java.util.Comparator; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.util.Time; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationId; | ||
| import org.apache.hadoop.yarn.api.records.ReservationId; | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration; | ||
| import org.apache.hadoop.yarn.exceptions.YarnException; | ||
| import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; | ||
|
|
@@ -90,6 +94,7 @@ public class MemoryFederationStateStore implements FederationStateStore { | |
| private Map<ApplicationId, SubClusterId> applications; | ||
| private Map<ReservationId, SubClusterId> reservations; | ||
| private Map<String, SubClusterPolicyConfiguration> policies; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| private final MonotonicClock clock = new MonotonicClock(); | ||
|
|
||
|
|
@@ -102,6 +107,9 @@ public void init(Configuration conf) { | |
| applications = new ConcurrentHashMap<ApplicationId, SubClusterId>(); | ||
| reservations = new ConcurrentHashMap<ReservationId, SubClusterId>(); | ||
| policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>(); | ||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -255,17 +263,46 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
| List<ApplicationHomeSubCluster> result = | ||
| new ArrayList<ApplicationHomeSubCluster>(); | ||
| for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) { | ||
| result | ||
| .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was just overlooked?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line of code should be an extra line of code, which has no practical significance. The following is directly constructed and returned. |
||
| SubClusterId requestSC = request.getSubClusterId(); | ||
| List<ApplicationHomeSubCluster> result = applications.keySet().stream() | ||
goiri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .map(applicationId -> generateAppHomeSC(applicationId)) | ||
| .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed()) | ||
| .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster())) | ||
| .limit(maxAppsInStateStore) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size()); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
| } | ||
|
|
||
| private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) { | ||
| SubClusterId subClusterId = applications.get(applicationId); | ||
| return ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), subClusterId); | ||
| } | ||
|
|
||
| private boolean filterHomeSubCluster(SubClusterId filterSubCluster, | ||
| SubClusterId homeSubCluster) { | ||
|
|
||
| // If the filter condition is empty, | ||
| // it means that homeSubCluster needs to be added | ||
| if (filterSubCluster == null) { | ||
| return true; | ||
| } | ||
|
|
||
| // If the filter condition filterSubCluster is not empty, | ||
| // and filterSubCluster is equal to homeSubCluster, it needs to be added | ||
| if (filterSubCluster.equals(homeSubCluster)) { | ||
| return true; | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( | ||
| DeleteApplicationHomeSubClusterRequest request) throws YarnException { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,9 +24,12 @@ | |
| import java.util.Calendar; | ||
| import java.util.List; | ||
| import java.util.TimeZone; | ||
| import java.util.Comparator; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.hadoop.classification.VisibleForTesting; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.util.Time; | ||
| import org.apache.hadoop.util.curator.ZKCuratorManager; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationId; | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration; | ||
|
|
@@ -133,6 +136,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { | |
| private String membershipZNode; | ||
| private String policiesZNode; | ||
| private String reservationsZNode; | ||
| private int maxAppsInStateStore; | ||
|
|
||
| private volatile Clock clock = SystemClock.getInstance(); | ||
|
|
||
|
|
@@ -144,6 +148,10 @@ public class ZookeeperFederationStateStore implements FederationStateStore { | |
| public void init(Configuration conf) throws YarnException { | ||
| LOG.info("Initializing ZooKeeper connection"); | ||
|
|
||
| maxAppsInStateStore = conf.getInt( | ||
| YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); | ||
|
|
||
| baseZNode = conf.get( | ||
| YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH, | ||
| YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH); | ||
|
|
@@ -255,24 +263,54 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( | |
| @Override | ||
| public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( | ||
| GetApplicationsHomeSubClusterRequest request) throws YarnException { | ||
| long start = clock.getTime(); | ||
| List<ApplicationHomeSubCluster> result = new ArrayList<>(); | ||
|
|
||
| if (request == null) { | ||
| throw new YarnException("Missing getApplicationsHomeSubCluster request"); | ||
| } | ||
|
|
||
| try { | ||
| for (String child : zkManager.getChildren(appsZNode)) { | ||
| ApplicationId appId = ApplicationId.fromString(child); | ||
| SubClusterId homeSubCluster = getApp(appId); | ||
| ApplicationHomeSubCluster app = | ||
| ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); | ||
| result.add(app); | ||
| } | ||
| long start = clock.getTime(); | ||
| SubClusterId requestSC = request.getSubClusterId(); | ||
| List<String> children = zkManager.getChildren(appsZNode); | ||
| List<ApplicationHomeSubCluster> result = | ||
goiri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| children.stream().map(child -> generateAppHomeSC(child)) | ||
| .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed()) | ||
goiri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster())) | ||
| .limit(maxAppsInStateStore) | ||
| .collect(Collectors.toList()); | ||
| long end = clock.getTime(); | ||
| opDurations.addGetAppsHomeSubClusterDuration(start, end); | ||
| LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size()); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
| } catch (Exception e) { | ||
| String errMsg = "Cannot get apps: " + e.getMessage(); | ||
| FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); | ||
| } | ||
| long end = clock.getTime(); | ||
| opDurations.addGetAppsHomeSubClusterDuration(start, end); | ||
| return GetApplicationsHomeSubClusterResponse.newInstance(result); | ||
|
|
||
| throw new YarnException("Cannot get app by request"); | ||
| } | ||
|
|
||
| private ApplicationHomeSubCluster generateAppHomeSC(String appId) { | ||
| try { | ||
| ApplicationId applicationId = ApplicationId.fromString(appId); | ||
| SubClusterId homeSubCluster = getApp(applicationId); | ||
| ApplicationHomeSubCluster app = | ||
| ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), homeSubCluster); | ||
| return app; | ||
| } catch (Exception ex) { | ||
| LOG.error("get homeSubCluster by appId = {}.", appId); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| private boolean filterHomeSubCluster(SubClusterId filterSubCluster, | ||
|
||
| SubClusterId homeSubCluster) { | ||
| if (filterSubCluster == null) { | ||
| return true; | ||
| } else if (filterSubCluster.equals(homeSubCluster)) { | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the other one we use AS applicationshomesubcluster; let's try to be consistent with the names as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for helping to review the code, I will modify the code.