Skip to content

Commit a533ab1

Browse files
recon
1 parent 6cc4db1 commit a533ab1

1 file changed

Lines changed: 50 additions & 33 deletions

File tree

hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.IOException;
2222
import java.time.Clock;
2323
import java.time.ZoneOffset;
24+
import java.util.ArrayList;
2425
import java.util.List;
2526
import java.util.stream.Collectors;
2627

@@ -99,35 +100,49 @@ public static ReconPipelineManager newReconPipelineManager(
99100
*/
100101
void initializePipelines(List<Pipeline> pipelinesFromScm)
101102
throws IOException {
102-
103-
104-
List<Pipeline> pipelinesInHouse = getPipelines();
105-
LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
106-
for (Pipeline pipeline : pipelinesFromScm) {
107-
108-
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
109-
acquireWriteLock(replicationConfig);
110-
try {
111-
// New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
112-
if (addPipeline(pipeline)) {
113-
LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
114-
} else {
115-
LOG.warn("Pipeline {} already exists in Recon pipeline metadata.",
116-
pipeline.getId());
117-
// Recon already has this pipeline. Just update state and creation
118-
// time.
119-
getStateManager().updatePipelineState(pipeline.getId().getProtobuf(),
120-
Pipeline.PipelineState.getProtobuf(pipeline.getPipelineState()));
121-
getPipeline(pipeline.getId()).setCreationTimestamp(
122-
pipeline.getCreationTimestamp());
103+
List<HddsProtos.ReplicationType> replicationTypes = new ArrayList<>();
104+
replicationTypes.add(HddsProtos.ReplicationType.EC);
105+
replicationTypes.add(HddsProtos.ReplicationType.RATIS);
106+
107+
for (HddsProtos.ReplicationType replicationType : replicationTypes) {
108+
List<Pipeline> pipelines = pipelinesFromScm.stream().filter(
109+
p -> p.getReplicationConfig().getReplicationType()
110+
.equals(replicationType)).collect(Collectors.toList());
111+
112+
if (!pipelines.isEmpty()) {
113+
ReplicationConfig replicationConfig =
114+
pipelines.iterator().next().getReplicationConfig();
115+
116+
acquireWriteLock(replicationConfig);
117+
try {
118+
List<Pipeline> pipelinesInHouse = getPipelines().stream().filter(
119+
p -> p.getReplicationConfig().getReplicationType()
120+
.equals(replicationType)).collect(Collectors.toList());
121+
122+
LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
123+
for (Pipeline pipeline : pipelines) {
124+
// New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
125+
if (addPipeline(pipeline)) {
126+
LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
127+
} else {
128+
LOG.warn("Pipeline {} already exists in Recon pipeline metadata.",
129+
pipeline.getId());
130+
// Recon already has this pipeline. Just update state and creation
131+
// time.
132+
getStateManager().updatePipelineState(
133+
pipeline.getId().getProtobuf(),
134+
Pipeline.PipelineState.getProtobuf(
135+
pipeline.getPipelineState()));
136+
getPipeline(pipeline.getId()).setCreationTimestamp(
137+
pipeline.getCreationTimestamp());
138+
}
139+
}
140+
removeInvalidPipelines(pipelines);
141+
} finally {
142+
releaseWriteLock(replicationConfig);
123143
}
124-
} finally {
125-
releaseWriteLock(replicationConfig);
126144
}
127-
128145
}
129-
removeInvalidPipelines(pipelinesFromScm);
130-
131146
}
132147

133148
public void removeInvalidPipelines(List<Pipeline> pipelinesFromScm) {
@@ -141,20 +156,22 @@ public void removeInvalidPipelines(List<Pipeline> pipelinesFromScm) {
141156
.collect(Collectors.toList());
142157
invalidPipelines.forEach(p -> {
143158
PipelineID pipelineID = p.getId();
144-
ReplicationConfig replicationConfig = p.getReplicationConfig();
145-
acquireWriteLock(replicationConfig);
146-
try {
147-
if (!p.getPipelineState().equals(CLOSED)) {
148-
getStateManager().updatePipelineState(pipelineID.getProtobuf(),
159+
if (!p.getPipelineState().equals(CLOSED)) {
160+
try {
161+
getStateManager().updatePipelineState(
162+
pipelineID.getProtobuf(),
149163
HddsProtos.PipelineState.PIPELINE_CLOSED);
164+
} catch (IOException e) {
165+
LOG.warn("Pipeline {} not found while updating state. ",
166+
p.getId(), e);
150167
}
168+
}
169+
try {
151170
LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
152171
closePipeline(p.getId());
153172
deletePipeline(p.getId());
154173
} catch (IOException e) {
155174
LOG.warn("Unable to remove pipeline {}", pipelineID, e);
156-
} finally {
157-
releaseWriteLock(replicationConfig);
158175
}
159176
});
160177
}

0 commit comments

Comments
 (0)