Skip to content

Commit 39b4a37

Browse files
committed
YARN-9341. Fixed reentrant lock usage in YARN project.
Contributed by Prabhu Joseph
1 parent 1bc282e commit 39b4a37

File tree

52 files changed

+388
-412
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+388
-412
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ public ServiceManager(ServiceContext context) {
117117

118118
@Override
119119
public void handle(ServiceEvent event) {
120+
writeLock.lock();
120121
try {
121-
writeLock.lock();
122122
State oldState = getState();
123123
try {
124124
stateMachine.doTransition(event.getType(), event);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,8 +1090,8 @@ public ServiceScheduler getScheduler() {
10901090

10911091
@Override
10921092
public void handle(ComponentEvent event) {
1093+
writeLock.lock();
10931094
try {
1094-
writeLock.lock();
10951095
ComponentState oldState = getState();
10961096
try {
10971097
stateMachine.doTransition(event.getType(), event);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -751,8 +751,8 @@ public void setContainerState(ContainerState state) {
751751

752752
@Override
753753
public void handle(ComponentInstanceEvent event) {
754+
writeLock.lock();
754755
try {
755-
writeLock.lock();
756756
ComponentInstanceState oldState = getState();
757757
try {
758758
stateMachine.doTransition(event.getType(), event);
@@ -782,8 +782,8 @@ public String getCompInstanceName() {
782782
void updateLocalizationStatuses(
783783
List<org.apache.hadoop.yarn.api.records.LocalizationStatus> statuses) {
784784
Map<String, String> resourcesCpy = new HashMap<>();
785+
readLock.lock();
785786
try {
786-
readLock.lock();
787787
if (resolvedParams == null || resolvedParams.didLaunchFail() ||
788788
resolvedParams.getResolvedRsrcPaths() == null ||
789789
resolvedParams.getResolvedRsrcPaths().isEmpty()) {
@@ -823,8 +823,8 @@ void updateLocalizationStatuses(
823823

824824
public void updateResolvedLaunchParams(
825825
Future<ProviderService.ResolvedLaunchParams> future) {
826+
writeLock.lock();
826827
try {
827-
writeLock.lock();
828828
this.resolvedParams = future.get();
829829
} catch (InterruptedException | ExecutionException e) {
830830
LOG.error("{} updating resolved params", getCompInstanceId(), e);
@@ -834,8 +834,8 @@ public void updateResolvedLaunchParams(
834834
}
835835

836836
public ContainerStatus getContainerStatus() {
837+
readLock.lock();
837838
try {
838-
readLock.lock();
839839
return status;
840840
} finally {
841841
readLock.unlock();
@@ -844,8 +844,8 @@ public ContainerStatus getContainerStatus() {
844844

845845
private void setContainerStatus(ContainerId containerId,
846846
ContainerStatus latestStatus) {
847+
writeLock.lock();
847848
try {
848-
writeLock.lock();
849849
this.status = latestStatus;
850850
org.apache.hadoop.yarn.service.api.records.Container containerRec =
851851
getCompSpec().getContainer(containerId.toString());

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,8 @@ public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
478478

479479
@Override
480480
public void flush() throws IOException {
481+
this.domainFDLocker.lock();
481482
try {
482-
this.domainFDLocker.lock();
483483
if (domainLogFD != null) {
484484
domainLogFD.flush();
485485
}
@@ -494,8 +494,8 @@ public void flush() throws IOException {
494494

495495
private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
496496
Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) {
497+
summaryTableCopyLocker.lock();
497498
try {
498-
summaryTableCopyLocker.lock();
499499
return new HashMap<ApplicationAttemptId, EntityLogFD>(
500500
summanyLogFDsToCopy);
501501
} finally {
@@ -506,8 +506,8 @@ private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
506506
private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
507507
EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
508508
HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
509+
entityTableCopyLocker.lock();
509510
try {
510-
entityTableCopyLocker.lock();
511511
return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
512512
EntityLogFD>>(entityLogFDsToCopy);
513513
} finally {
@@ -521,8 +521,8 @@ private void flushSummaryFDMap(Map<ApplicationAttemptId,
521521
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
522522
.entrySet()) {
523523
EntityLogFD logFD = logFDEntry.getValue();
524+
logFD.lock();
524525
try {
525-
logFD.lock();
526526
logFD.flush();
527527
} finally {
528528
logFD.unlock();
@@ -541,8 +541,8 @@ private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
541541
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
542542
: logFDMap.entrySet()) {
543543
EntityLogFD logFD = logFDEntry.getValue();
544+
logFD.lock();
544545
try {
545-
logFD.lock();
546546
logFD.flush();
547547
} finally {
548548
logFD.unlock();
@@ -567,8 +567,8 @@ public void run() {
567567

568568
private void cleanInActiveFDs() {
569569
long currentTimeStamp = Time.monotonicNow();
570+
this.domainFDLocker.lock();
570571
try {
571-
this.domainFDLocker.lock();
572572
if (domainLogFD != null) {
573573
if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
574574
domainLogFD.close();
@@ -593,8 +593,8 @@ private void cleanInActiveSummaryFDsforMap(
593593
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
594594
.entrySet()) {
595595
EntityLogFD logFD = logFDEntry.getValue();
596+
logFD.lock();
596597
try {
597-
logFD.lock();
598598
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
599599
logFD.close();
600600
}
@@ -617,8 +617,8 @@ private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
617617
for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
618618
: logFDMap.entrySet()) {
619619
EntityLogFD logFD = logFDEntry.getValue();
620+
logFD.lock();
620621
try {
621-
logFD.lock();
622622
if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
623623
logFD.close();
624624
}
@@ -644,8 +644,8 @@ public void run() {
644644
private class TimerMonitorTask extends TimerTask {
645645
@Override
646646
public void run() {
647+
timerTasksMonitorWriteLock.lock();
647648
try {
648-
timerTasksMonitorWriteLock.lock();
649649
monitorTimerTasks();
650650
} finally {
651651
timerTasksMonitorWriteLock.unlock();
@@ -691,8 +691,8 @@ private void cancelAndCloseTimerTasks() {
691691
monitorTaskTimer = null;
692692
}
693693

694+
this.domainFDLocker.lock();
694695
try {
695-
this.domainFDLocker.lock();
696696
if (domainLogFD != null) {
697697
domainLogFD.close();
698698
domainLogFD = null;
@@ -708,8 +708,8 @@ private void cancelAndCloseTimerTasks() {
708708

709709
private void closeEntityFDs(Map<ApplicationAttemptId,
710710
HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
711+
entityTableLocker.lock();
711712
try {
712-
entityTableLocker.lock();
713713
if (!logFDs.isEmpty()) {
714714
for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
715715
EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
@@ -734,8 +734,8 @@ private void closeEntityFDs(Map<ApplicationAttemptId,
734734

735735
private void closeSummaryFDs(
736736
Map<ApplicationAttemptId, EntityLogFD> logFDs) {
737+
summaryTableLocker.lock();
737738
try {
738-
summaryTableLocker.lock();
739739
if (!logFDs.isEmpty()) {
740740
for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
741741
: logFDs.entrySet()) {
@@ -757,8 +757,8 @@ public void writeDomainLog(FileSystem fs, Path logPath,
757757
ObjectMapper objMapper, TimelineDomain domain,
758758
boolean isAppendSupported) throws IOException {
759759
checkAndStartTimeTasks();
760+
this.domainFDLocker.lock();
760761
try {
761-
this.domainFDLocker.lock();
762762
if (this.domainLogFD != null) {
763763
this.domainLogFD.writeDomain(domain);
764764
} else {
@@ -790,8 +790,8 @@ private void writeEntityLogs(FileSystem fs, Path logPath,
790790
if (logMapFD != null) {
791791
EntityLogFD logFD = logMapFD.get(groupId);
792792
if (logFD != null) {
793+
logFD.lock();
793794
try {
794-
logFD.lock();
795795
if (serviceStopped) {
796796
return;
797797
}
@@ -814,8 +814,8 @@ private void createEntityFDandWrite(FileSystem fs, Path logPath,
814814
TimelineEntityGroupId groupId, List<TimelineEntity> entities,
815815
boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
816816
TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException{
817+
entityTableLocker.lock();
817818
try {
818-
entityTableLocker.lock();
819819
if (serviceStopped) {
820820
return;
821821
}
@@ -828,11 +828,11 @@ private void createEntityFDandWrite(FileSystem fs, Path logPath,
828828
if (logFD == null) {
829829
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
830830
}
831+
logFD.lock();
831832
try {
832-
logFD.lock();
833833
logFD.writeEntities(entities);
834+
entityTableCopyLocker.lock();
834835
try {
835-
entityTableCopyLocker.lock();
836836
logFDMap.put(groupId, logFD);
837837
logFDs.put(attemptId, logFDMap);
838838
} finally {
@@ -862,8 +862,8 @@ private void writeSummmaryEntityLogs(FileSystem fs, Path logPath,
862862
EntityLogFD logFD = null;
863863
logFD = logFDs.get(attemptId);
864864
if (logFD != null) {
865+
logFD.lock();
865866
try {
866-
logFD.lock();
867867
if (serviceStopped) {
868868
return;
869869
}
@@ -881,20 +881,20 @@ private void createSummaryFDAndWrite(FileSystem fs, Path logPath,
881881
ObjectMapper objMapper, ApplicationAttemptId attemptId,
882882
List<TimelineEntity> entities, boolean isAppendSupported,
883883
Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
884+
summaryTableLocker.lock();
884885
try {
885-
summaryTableLocker.lock();
886886
if (serviceStopped) {
887887
return;
888888
}
889889
EntityLogFD logFD = logFDs.get(attemptId);
890890
if (logFD == null) {
891891
logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
892892
}
893+
logFD.lock();
893894
try {
894-
logFD.lock();
895895
logFD.writeEntities(entities);
896+
summaryTableCopyLocker.lock();
896897
try {
897-
summaryTableCopyLocker.lock();
898898
logFDs.put(attemptId, logFD);
899899
} finally {
900900
summaryTableCopyLocker.unlock();
@@ -928,12 +928,12 @@ private void createAndStartTimerTasks() {
928928
}
929929

930930
private void checkAndStartTimeTasks() {
931+
this.timerTasksMonitorReadLock.lock();
931932
try {
932-
this.timerTasksMonitorReadLock.lock();
933933
this.timeStampOfLastWrite = Time.monotonicNow();
934934
if(!timerTaskStarted) {
935+
timerTaskLocker.lock();
935936
try {
936-
timerTaskLocker.lock();
937937
if (!timerTaskStarted) {
938938
createAndStartTimerTasks();
939939
timerTaskStarted = true;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -766,8 +766,8 @@ public Map<NodeId, Set<NodeLabel>> getNodeLabelsInfo() {
766766

767767
@SuppressWarnings("unchecked")
768768
private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
769+
readLock.lock();
769770
try {
770-
readLock.lock();
771771
Map<NodeId, Set<T>> nodeToLabels = new HashMap<>();
772772
for (Entry<String, Host> entry : nodeCollections.entrySet()) {
773773
String hostName = entry.getKey();
@@ -809,8 +809,8 @@ private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
809809
* @return set of nodes with no labels
810810
*/
811811
public Set<NodeId> getNodesWithoutALabel() {
812+
readLock.lock();
812813
try {
813-
readLock.lock();
814814
Set<NodeId> nodes = new HashSet<>();
815815
for (Host host : nodeCollections.values()) {
816816
for (NodeId nodeId : host.nms.keySet()) {
@@ -832,8 +832,8 @@ public Set<NodeId> getNodesWithoutALabel() {
832832
* @return labels to nodes map
833833
*/
834834
public Map<String, Set<NodeId>> getLabelsToNodes() {
835+
readLock.lock();
835836
try {
836-
readLock.lock();
837837
return getLabelsToNodes(labelCollections.keySet());
838838
} finally {
839839
readLock.unlock();
@@ -848,8 +848,8 @@ public Map<String, Set<NodeId>> getLabelsToNodes() {
848848
* @return labels to nodes map
849849
*/
850850
public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) {
851+
readLock.lock();
851852
try {
852-
readLock.lock();
853853
Map<String, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(labels,
854854
String.class);
855855
return Collections.unmodifiableMap(labelsToNodes);
@@ -865,8 +865,8 @@ public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels) {
865865
* @return labels to nodes map
866866
*/
867867
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes() {
868+
readLock.lock();
868869
try {
869-
readLock.lock();
870870
return getLabelsInfoToNodes(labelCollections.keySet());
871871
} finally {
872872
readLock.unlock();
@@ -882,8 +882,8 @@ public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes() {
882882
* @return labels to nodes map
883883
*/
884884
public Map<NodeLabel, Set<NodeId>> getLabelsInfoToNodes(Set<String> labels) {
885+
readLock.lock();
885886
try {
886-
readLock.lock();
887887
Map<NodeLabel, Set<NodeId>> labelsToNodes = getLabelsToNodesMapping(
888888
labels, NodeLabel.class);
889889
return Collections.unmodifiableMap(labelsToNodes);
@@ -922,8 +922,8 @@ private <T> Map<T, Set<NodeId>> getLabelsToNodesMapping(Set<String> labels,
922922
* @return existing valid labels in repository
923923
*/
924924
public Set<String> getClusterNodeLabelNames() {
925+
readLock.lock();
925926
try {
926-
readLock.lock();
927927
Set<String> labels = new HashSet<String>(labelCollections.keySet());
928928
labels.remove(NO_LABEL);
929929
return Collections.unmodifiableSet(labels);
@@ -933,8 +933,8 @@ public Set<String> getClusterNodeLabelNames() {
933933
}
934934

935935
public List<NodeLabel> getClusterNodeLabels() {
936+
readLock.lock();
936937
try {
937-
readLock.lock();
938938
List<NodeLabel> nodeLabels = new ArrayList<>();
939939
for (RMNodeLabel label : labelCollections.values()) {
940940
if (!label.getLabelName().equals(NO_LABEL)) {
@@ -952,8 +952,8 @@ public boolean isExclusiveNodeLabel(String nodeLabel) throws IOException {
952952
if (nodeLabel.equals(NO_LABEL)) {
953953
return noNodeLabel.getIsExclusive();
954954
}
955+
readLock.lock();
955956
try {
956-
readLock.lock();
957957
RMNodeLabel label = labelCollections.get(nodeLabel);
958958
if (label == null) {
959959
String message =
@@ -1048,8 +1048,8 @@ protected Set<String> getLabelsByNode(NodeId nodeId, Map<String, Host> map) {
10481048
}
10491049

10501050
public Set<NodeLabel> getLabelsInfoByNode(NodeId nodeId) {
1051+
readLock.lock();
10511052
try {
1052-
readLock.lock();
10531053
Set<String> labels = getLabelsByNode(nodeId, nodeCollections);
10541054
if (labels.isEmpty()) {
10551055
return EMPTY_NODELABEL_SET;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NonAppendableFSNodeLabelStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ public void removeClusterNodeLabels(Collection<String> labels)
9393

9494
private void writeNewMirror() throws IOException {
9595
ReentrantReadWriteLock.ReadLock readLock = manager.readLock;
96+
// Acquire readlock to make sure we get cluster node labels and
97+
// node-to-labels mapping atomically.
98+
readLock.lock();
9699
try {
97-
// Acquire readlock to make sure we get cluster node labels and
98-
// node-to-labels mapping atomically.
99-
readLock.lock();
100100
// Write mirror to mirror.new.tmp file
101101
Path newTmpPath = new Path(fsWorkingPath, MIRROR_FILENAME + ".new.tmp");
102102
try (FSDataOutputStream os = fs.create(newTmpPath, true)) {

0 commit comments

Comments
 (0)