Skip to content

Commit b252181

Browse files
authored
HDDS-10804. Include only limited set of ports in Pipeline proto (#6655)
1 parent 79ca956 commit b252181

File tree

6 files changed

+71
-18
lines changed

6 files changed

+71
-18
lines changed

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.hdds.protocol;
2020

2121
import java.util.ArrayList;
22+
import java.util.Collections;
2223
import java.util.EnumSet;
2324
import java.util.List;
2425
import java.util.Set;
@@ -487,11 +488,24 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
487488
}
488489

489490
public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
490-
return toProtoBuilder(clientVersion).build();
491+
return toProtoBuilder(clientVersion, Collections.emptySet()).build();
491492
}
492493

494+
public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> filterPorts) {
495+
return toProtoBuilder(clientVersion, filterPorts).build();
496+
}
497+
498+
/**
499+
* Converts the current DatanodeDetails instance into a proto {@link HddsProtos.DatanodeDetailsProto.Builder} object.
500+
*
501+
* @param clientVersion - The client version.
502+
* @param filterPorts - A set of {@link Port.Name} specifying ports to include.
503+
* If empty, no port filtering is applied and all ports supported by the client version will be included.
504+
* @return A {@link HddsProtos.DatanodeDetailsProto.Builder} object.
505+
*/
506+
493507
public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
494-
int clientVersion) {
508+
int clientVersion, Set<Port.Name> filterPorts) {
495509

496510
HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
497511
.setMostSigBits(uuid.getMostSignificantBits())
@@ -530,15 +544,25 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
530544
final boolean handlesUnknownPorts =
531545
ClientVersion.fromProtoValue(clientVersion)
532546
.compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
547+
final int requestedPortCount = filterPorts.size();
548+
final boolean maySkip = requestedPortCount > 0;
533549
for (Port port : ports) {
534-
if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
550+
if (maySkip && !filterPorts.contains(port.getName())) {
551+
if (LOG.isDebugEnabled()) {
552+
LOG.debug("Skip adding {} port {} to proto message",
553+
port.getName(), port.getValue());
554+
}
555+
} else if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
535556
builder.addPorts(port.toProto());
536557
} else {
537558
if (LOG.isDebugEnabled()) {
538559
LOG.debug("Skip adding {} port {} to proto message for client v{}",
539-
port.getName(), port.getValue(), clientVersion);
560+
port.getName(), port.getValue(), clientVersion);
540561
}
541562
}
563+
if (maySkip && builder.getPortsCount() == requestedPortCount) {
564+
break;
565+
}
542566
}
543567

544568
builder.setCurrentVersion(currentVersion);
@@ -960,6 +984,9 @@ public enum Name {
960984
Name.values());
961985
public static final Set<Name> V0_PORTS = ImmutableSet.copyOf(
962986
EnumSet.of(STANDALONE, RATIS, REST));
987+
988+
public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
989+
EnumSet.of(STANDALONE, RATIS, RATIS_DATASTREAM));
963990
}
964991

965992
private final Name name;
@@ -1109,7 +1136,7 @@ public void setRevision(String rev) {
11091136
public HddsProtos.NetworkNode toProtobuf(
11101137
int clientVersion) {
11111138
return HddsProtos.NetworkNode.newBuilder()
1112-
.setDatanodeDetails(toProtoBuilder(clientVersion).build())
1139+
.setDatanodeDetails(toProtoBuilder(clientVersion, Collections.emptySet()).build())
11131140
.build();
11141141
}
11151142
}

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Comparator;
2222
import org.apache.commons.lang3.builder.EqualsBuilder;
2323
import org.apache.commons.lang3.builder.HashCodeBuilder;
24+
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
2425
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
2526
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
2627
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,7 +62,7 @@ public HddsProtos.ContainerWithPipeline getProtobuf(int clientVersion)
6162
HddsProtos.ContainerWithPipeline.Builder builder =
6263
HddsProtos.ContainerWithPipeline.newBuilder();
6364
builder.setContainerInfo(getContainerInfo().getProtobuf())
64-
.setPipeline(getPipeline().getProtobufMessage(clientVersion));
65+
.setPipeline(getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS));
6566

6667
return builder.build();
6768
}

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,12 +362,17 @@ public ReplicationConfig getReplicationConfig() {
362362

363363
public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
364364
throws UnknownPipelineStateException {
365+
return getProtobufMessage(clientVersion, Collections.emptySet());
366+
}
367+
368+
public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<DatanodeDetails.Port.Name> filterPorts)
369+
throws UnknownPipelineStateException {
365370

366371
List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
367372
List<Integer> memberReplicaIndexes = new ArrayList<>();
368373

369374
for (DatanodeDetails dn : nodeStatus.keySet()) {
370-
members.add(dn.toProto(clientVersion));
375+
members.add(dn.toProto(clientVersion, filterPorts));
371376
memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
372377
}
373378

hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
*/
1818
package org.apache.hadoop.hdds.protocol;
1919

20+
import com.google.common.collect.ImmutableSet;
2021
import org.apache.hadoop.hdds.DatanodeVersion;
2122
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
23+
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
2224
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
2325
import org.junit.jupiter.api.Test;
2426

2527
import java.util.Set;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
2630

2731
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
2832
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
@@ -48,21 +52,36 @@ void protoIncludesNewPortsOnlyForV1() {
4852
subject.toProto(VERSION_HANDLES_UNKNOWN_DN_PORTS.toProtoValue());
4953
assertPorts(protoV1, ALL_PORTS);
5054
}
55+
@Test
56+
void testRequiredPortsProto() {
57+
DatanodeDetails subject = MockDatanodeDetails.randomDatanodeDetails();
58+
Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
59+
.collect(Collectors.toSet());
60+
HddsProtos.DatanodeDetailsProto proto =
61+
subject.toProto(subject.getCurrentVersion(), requiredPorts);
62+
assertPorts(proto, ImmutableSet.copyOf(requiredPorts));
63+
64+
HddsProtos.DatanodeDetailsProto ioPortProto =
65+
subject.toProto(subject.getCurrentVersion(), Name.IO_PORTS);
66+
assertPorts(ioPortProto, ImmutableSet.copyOf(Name.IO_PORTS));
67+
}
5168

5269
@Test
5370
public void testNewBuilderCurrentVersion() {
5471
// test that if the current version is not set (Ozone 1.4.0 and earlier),
5572
// it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
5673
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
74+
Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
75+
.collect(Collectors.toSet());
5776
HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
58-
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
77+
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
5978
protoBuilder.clearCurrentVersion();
6079
DatanodeDetails dn2 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
6180
assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(), dn2.getCurrentVersion());
6281

6382
// test that if the current version is set, it is used
6483
protoBuilder =
65-
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
84+
dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
6685
DatanodeDetails dn3 = DatanodeDetails.newBuilder(protoBuilder.build()).build();
6786
assertEquals(DatanodeVersion.CURRENT.toProtoValue(), dn3.getCurrentVersion());
6887
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
2525
import org.apache.hadoop.hdds.client.ReplicationConfig;
2626
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
27+
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
2728
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
2829
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
2930
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
@@ -216,7 +217,7 @@ public AllocateScmBlockResponseProto allocateScmBlock(
216217
for (AllocatedBlock block : allocatedBlocks) {
217218
builder.addBlocks(AllocateBlockResponse.newBuilder()
218219
.setContainerBlockID(block.getBlockID().getProtobuf())
219-
.setPipeline(block.getPipeline().getProtobufMessage(clientVersion)));
220+
.setPipeline(block.getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS)));
220221
}
221222

222223
return builder.build();

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -409,9 +409,9 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
409409

410410
// Do some transactions so that the log index increases
411411
List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
412-
80);
412+
100);
413413

414-
SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
414+
SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
415415
followerOM.getConfiguration().setInt(
416416
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
417417
KeyManagerImpl.DISABLE_VALUE);
@@ -424,9 +424,9 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
424424
}, 1000, 30_000);
425425

426426
// Get two incremental tarballs, adding new keys/snapshot for each.
427-
IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
427+
IncrementData firstIncrement = getNextIncrementalTarball(200, 2, leaderOM,
428428
leaderRatisServer, faultInjector, followerOM, tempDir);
429-
IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
429+
IncrementData secondIncrement = getNextIncrementalTarball(300, 3, leaderOM,
430430
leaderRatisServer, faultInjector, followerOM, tempDir);
431431

432432
// Resume the follower thread, it would download the incremental snapshot.
@@ -501,10 +501,10 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
501501
assertNotNull(filesInCandidate);
502502
assertEquals(0, filesInCandidate.length);
503503

504-
checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
505-
checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
504+
checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);
505+
checkSnapshot(leaderOM, followerOM, "snap200", firstIncrement.getKeys(),
506506
firstIncrement.getSnapshotInfo());
507-
checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
507+
checkSnapshot(leaderOM, followerOM, "snap300", secondIncrement.getKeys(),
508508
secondIncrement.getSnapshotInfo());
509509
assertEquals(
510510
followerOM.getOmSnapshotProvider().getInitCount(), 2,
@@ -618,7 +618,7 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception {
618618

619619
// Do some transactions so that the log index increases
620620
List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
621-
80);
621+
100);
622622

623623
// Start the inactive OM. Checkpoint installation will happen spontaneously.
624624
cluster.startInactiveOM(followerNodeId);

0 commit comments

Comments
 (0)