Skip to content

Commit 7e10e84

Browse files
juncevichsarvekshayr
authored andcommitted
HDDS-11394. Fix pipeline close --all command (apache#7138)
1 parent 7221c80 commit 7e10e84

2 files changed

Lines changed: 192 additions & 4 deletions

File tree

hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,22 @@ public void execute(ScmClient scmClient) throws IOException {
5959

6060
List<Pipeline> pipelineList = new ArrayList<>();
6161
Predicate<? super Pipeline> predicate = replicationFilter.orElse(null);
62-
for (Pipeline pipeline : scmClient.listPipelines()) {
63-
boolean filterPassed = (predicate != null) && predicate.test(pipeline);
64-
if (pipeline.getPipelineState() != Pipeline.PipelineState.CLOSED && filterPassed) {
65-
pipelineList.add(pipeline);
62+
List<Pipeline> pipelines = scmClient.listPipelines();
63+
if (predicate == null) {
64+
for (Pipeline pipeline : pipelines) {
65+
if (pipeline.getPipelineState() != Pipeline.PipelineState.CLOSED) {
66+
pipelineList.add(pipeline);
67+
}
68+
}
69+
} else {
70+
for (Pipeline pipeline : pipelines) {
71+
boolean filterPassed = predicate.test(pipeline);
72+
if (pipeline.getPipelineState() != Pipeline.PipelineState.CLOSED && filterPassed) {
73+
pipelineList.add(pipeline);
74+
}
6675
}
6776
}
77+
6878
System.out.println("Sending close command for " + pipelineList.size() + " pipelines...");
6979
pipelineList.forEach(pipeline -> {
7080
try {
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdds.scm.cli.pipeline;
19+
20+
import org.apache.hadoop.hdds.client.ECReplicationConfig;
21+
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
22+
import org.apache.hadoop.hdds.client.ReplicationConfig;
23+
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
24+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
25+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
26+
import org.apache.hadoop.hdds.scm.client.ScmClient;
27+
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
28+
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.BeforeEach;
31+
import org.junit.jupiter.params.ParameterizedTest;
32+
import org.junit.jupiter.params.provider.Arguments;
33+
import org.junit.jupiter.params.provider.MethodSource;
34+
import picocli.CommandLine;
35+
36+
import java.io.ByteArrayOutputStream;
37+
import java.io.IOException;
38+
import java.io.PrintStream;
39+
import java.nio.charset.StandardCharsets;
40+
import java.util.ArrayList;
41+
import java.util.List;
42+
import java.util.UUID;
43+
import java.util.stream.Stream;
44+
45+
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
46+
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
47+
import static org.junit.jupiter.api.Assertions.assertEquals;
48+
import static org.junit.jupiter.params.provider.Arguments.arguments;
49+
import static org.mockito.Mockito.mock;
50+
import static org.mockito.Mockito.when;
51+
52+
/**
53+
* Tests for the ClosePipelineSubcommand class.
54+
*/
55+
class TestClosePipelinesSubCommand {
56+
57+
private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
58+
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
59+
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
60+
private final PrintStream originalOut = System.out;
61+
private final PrintStream originalErr = System.err;
62+
private ClosePipelineSubcommand cmd;
63+
private ScmClient scmClient;
64+
65+
public static Stream<Arguments> values() {
66+
return Stream.of(
67+
arguments(
68+
new String[]{"--all"},
69+
"Sending close command for 2 pipelines...\n",
70+
"with empty parameters"
71+
),
72+
arguments(
73+
new String[]{"--all", "-ffc", "THREE"},
74+
"Sending close command for 1 pipelines...\n",
75+
"by filter factor, opened"
76+
),
77+
arguments(
78+
new String[]{"--all", "-ffc", "ONE"},
79+
"Sending close command for 0 pipelines...\n",
80+
"by filter factor, closed"
81+
),
82+
arguments(
83+
new String[]{"--all", "-r", "rs-3-2-1024k", "-t", "EC"},
84+
"Sending close command for 1 pipelines...\n",
85+
"by replication and type, opened"
86+
),
87+
arguments(
88+
new String[]{"--all", "-r", "rs-6-3-1024k", "-t", "EC"},
89+
"Sending close command for 0 pipelines...\n",
90+
"by replication and type, closed"
91+
),
92+
arguments(
93+
new String[]{"--all", "-t", "EC"},
94+
"Sending close command for 1 pipelines...\n",
95+
"by type, opened"
96+
),
97+
arguments(
98+
new String[]{"--all", "-t", "RS"},
99+
"Sending close command for 0 pipelines...\n",
100+
"by type, closed"
101+
)
102+
);
103+
}
104+
105+
@BeforeEach
106+
public void setup() throws IOException {
107+
cmd = new ClosePipelineSubcommand();
108+
System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
109+
System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
110+
111+
scmClient = mock(ScmClient.class);
112+
when(scmClient.listPipelines()).thenAnswer(invocation -> createPipelines());
113+
}
114+
115+
@AfterEach
116+
public void tearDown() {
117+
System.setOut(originalOut);
118+
System.setErr(originalErr);
119+
}
120+
121+
@ParameterizedTest(name = "{index}. {2}")
122+
@MethodSource("values")
123+
void testCloseAllPipelines(String[] commands, String expectedOutput, String testName) throws IOException {
124+
CommandLine c = new CommandLine(cmd);
125+
c.parseArgs(commands);
126+
cmd.execute(scmClient);
127+
assertEquals(expectedOutput, outContent.toString(DEFAULT_ENCODING));
128+
}
129+
130+
private List<Pipeline> createPipelines() {
131+
List<Pipeline> pipelines = new ArrayList<>();
132+
pipelines.add(createPipeline(StandaloneReplicationConfig.getInstance(ONE),
133+
Pipeline.PipelineState.CLOSED));
134+
pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
135+
Pipeline.PipelineState.OPEN));
136+
pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
137+
Pipeline.PipelineState.CLOSED));
138+
139+
pipelines.add(createPipeline(
140+
new ECReplicationConfig(3, 2), Pipeline.PipelineState.OPEN));
141+
pipelines.add(createPipeline(
142+
new ECReplicationConfig(3, 2), Pipeline.PipelineState.CLOSED));
143+
pipelines.add(createPipeline(
144+
new ECReplicationConfig(6, 3), Pipeline.PipelineState.CLOSED));
145+
pipelines.add(createPipeline(
146+
RatisReplicationConfig.getInstance(THREE), Pipeline.PipelineState.CLOSED));
147+
return pipelines;
148+
}
149+
150+
private Pipeline createPipeline(ReplicationConfig repConfig,
151+
Pipeline.PipelineState state) {
152+
return new Pipeline.Builder()
153+
.setId(PipelineID.randomId())
154+
.setCreateTimestamp(System.currentTimeMillis())
155+
.setState(state)
156+
.setReplicationConfig(repConfig)
157+
.setNodes(createDatanodeDetails(1))
158+
.build();
159+
}
160+
161+
private List<DatanodeDetails> createDatanodeDetails(int count) {
162+
List<DatanodeDetails> dns = new ArrayList<>();
163+
for (int i = 0; i < count; i++) {
164+
HddsProtos.DatanodeDetailsProto dnd =
165+
HddsProtos.DatanodeDetailsProto.newBuilder()
166+
.setHostName("host" + i)
167+
.setIpAddress("1.2.3." + i + 1)
168+
.setNetworkLocation("/default")
169+
.setNetworkName("host" + i)
170+
.addPorts(HddsProtos.Port.newBuilder()
171+
.setName("ratis").setValue(5678).build())
172+
.setUuid(UUID.randomUUID().toString())
173+
.build();
174+
dns.add(DatanodeDetails.getFromProtoBuf(dnd));
175+
}
176+
return dns;
177+
}
178+
}

0 commit comments

Comments
 (0)