This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b25218172d HDDS-10804. Include only limited set of ports in Pipeline proto (#6655)
b25218172d is described below
commit b25218172df8cc9fae2df771bfbe5e985fd07a58
Author: XiChen <[email protected]>
AuthorDate: Mon Nov 18 15:23:41 2024 +0800
HDDS-10804. Include only limited set of ports in Pipeline proto (#6655)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 37 +++++++++++++++++++---
.../common/helpers/ContainerWithPipeline.java | 3 +-
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 7 +++-
.../hadoop/hdds/protocol/TestDatanodeDetails.java | 23 ++++++++++++--
...lockLocationProtocolServerSideTranslatorPB.java | 3 +-
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 16 +++++-----
6 files changed, 71 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index f762b4cc70..f3137749b9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.protocol;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
@@ -487,11 +488,24 @@ public class DatanodeDetails extends NodeImpl implements
}
public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
- return toProtoBuilder(clientVersion).build();
+ return toProtoBuilder(clientVersion, Collections.emptySet()).build();
}
+ public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, Set<Port.Name> filterPorts) {
+ return toProtoBuilder(clientVersion, filterPorts).build();
+ }
+
+ /**
+ * Converts the current DatanodeDetails instance into a proto {@link HddsProtos.DatanodeDetailsProto.Builder} object.
+ *
+ * @param clientVersion - The client version.
+ * @param filterPorts - A set of {@link Port.Name} specifying ports to include.
+ * If empty, all available ports will be included.
+ * @return A {@link HddsProtos.DatanodeDetailsProto.Builder} Object.
+ */
+
public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
- int clientVersion) {
+ int clientVersion, Set<Port.Name> filterPorts) {
HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
.setMostSigBits(uuid.getMostSignificantBits())
@@ -530,15 +544,25 @@ public class DatanodeDetails extends NodeImpl implements
final boolean handlesUnknownPorts =
ClientVersion.fromProtoValue(clientVersion)
.compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
+ final int requestedPortCount = filterPorts.size();
+ final boolean maySkip = requestedPortCount > 0;
for (Port port : ports) {
- if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
+ if (maySkip && !filterPorts.contains(port.getName())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip adding {} port {} to proto message",
+ port.getName(), port.getValue());
+ }
+ } else if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
builder.addPorts(port.toProto());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip adding {} port {} to proto message for client v{}",
- port.getName(), port.getValue(), clientVersion);
+ port.getName(), port.getValue(), clientVersion);
}
}
+ if (maySkip && builder.getPortsCount() == requestedPortCount) {
+ break;
+ }
}
builder.setCurrentVersion(currentVersion);
@@ -960,6 +984,9 @@ public class DatanodeDetails extends NodeImpl implements
Name.values());
public static final Set<Name> V0_PORTS = ImmutableSet.copyOf(
EnumSet.of(STANDALONE, RATIS, REST));
+
+ public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
+ EnumSet.of(STANDALONE, RATIS, RATIS_DATASTREAM));
}
private final Name name;
@@ -1109,7 +1136,7 @@ public class DatanodeDetails extends NodeImpl implements
public HddsProtos.NetworkNode toProtobuf(
int clientVersion) {
return HddsProtos.NetworkNode.newBuilder()
- .setDatanodeDetails(toProtoBuilder(clientVersion).build())
+ .setDatanodeDetails(toProtoBuilder(clientVersion, Collections.emptySet()).build())
.build();
}
}
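The filtering loop above short-circuits in two ways: ports not named in a non-empty filter set are skipped, and iteration breaks early once builder.getPortsCount() reaches the requested count. A minimal usage sketch, not part of this commit, assuming MockDatanodeDetails lives in the same package as the new test below:

    import java.util.Collections;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
    import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

    public class PortFilterSketch {
      public static void main(String[] args) {
        DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
        // Keep only the I/O ports: STANDALONE, RATIS, RATIS_DATASTREAM.
        HddsProtos.DatanodeDetailsProto slim =
            dn.toProto(dn.getCurrentVersion(), Name.IO_PORTS);
        // An empty filter set preserves the old behavior: every port is
        // included, subject to the existing client-version check.
        HddsProtos.DatanodeDetailsProto full =
            dn.toProto(dn.getCurrentVersion(), Collections.emptySet());
        System.out.println(slim.getPortsCount() + " of "
            + full.getPortsCount() + " ports serialized");
      }
    }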
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
index ac72dc9422..f4c9a5dbda 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.container.common.helpers;
import java.util.Comparator;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,7 +62,7 @@ public class ContainerWithPipeline implements Comparator<ContainerWithPipeline>,
HddsProtos.ContainerWithPipeline.Builder builder =
HddsProtos.ContainerWithPipeline.newBuilder();
builder.setContainerInfo(getContainerInfo().getProtobuf())
- .setPipeline(getPipeline().getProtobufMessage(clientVersion));
+ .setPipeline(getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS));
return builder.build();
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 6c5b4aff57..6e72537367 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -362,12 +362,17 @@ public final class Pipeline {
public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
throws UnknownPipelineStateException {
+ return getProtobufMessage(clientVersion, Collections.emptySet());
+ }
+
+ public HddsProtos.Pipeline getProtobufMessage(int clientVersion, Set<DatanodeDetails.Port.Name> filterPorts)
+ throws UnknownPipelineStateException {
List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
List<Integer> memberReplicaIndexes = new ArrayList<>();
for (DatanodeDetails dn : nodeStatus.keySet()) {
- members.add(dn.toProto(clientVersion));
+ members.add(dn.toProto(clientVersion, filterPorts));
memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
}
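The two-argument overload threads the filter through to every member datanode, while the existing single-argument getProtobufMessage now delegates with Collections.emptySet(). A sketch of the new call shape (hypothetical helper, not from this commit; Pipeline and UnknownPipelineStateException are from org.apache.hadoop.hdds.scm.pipeline):

    // Each member DatanodeDetailsProto in the result carries only I/O ports.
    static HddsProtos.Pipeline toSlimProto(Pipeline pipeline, int clientVersion)
        throws UnknownPipelineStateException {
      return pipeline.getProtobufMessage(clientVersion,
          DatanodeDetails.Port.Name.IO_PORTS);
    }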
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
index aeb1e207e7..78465fd281 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hdds.protocol;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.junit.jupiter.api.Test;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
@@ -48,21 +52,36 @@ public class TestDatanodeDetails {
subject.toProto(VERSION_HANDLES_UNKNOWN_DN_PORTS.toProtoValue());
assertPorts(protoV1, ALL_PORTS);
}
+ @Test
+ void testRequiredPortsProto() {
+ DatanodeDetails subject = MockDatanodeDetails.randomDatanodeDetails();
+ Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+ .collect(Collectors.toSet());
+ HddsProtos.DatanodeDetailsProto proto =
+ subject.toProto(subject.getCurrentVersion(), requiredPorts);
+ assertPorts(proto, ImmutableSet.copyOf(requiredPorts));
+
+ HddsProtos.DatanodeDetailsProto ioPortProto =
+ subject.toProto(subject.getCurrentVersion(), Name.IO_PORTS);
+ assertPorts(ioPortProto, ImmutableSet.copyOf(Name.IO_PORTS));
+ }
@Test
public void testNewBuilderCurrentVersion() {
// test that if the current version is not set (Ozone 1.4.0 and earlier),
// it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+ Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, Port.Name.RATIS)
+ .collect(Collectors.toSet());
HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
- dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+ dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
protoBuilder.clearCurrentVersion();
DatanodeDetails dn2 =
DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(),
dn2.getCurrentVersion());
// test that if the current version is set, it is used
protoBuilder =
- dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+ dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
DatanodeDetails dn3 =
DatanodeDetails.newBuilder(protoBuilder.build()).build();
assertEquals(DatanodeVersion.CURRENT.toProtoValue(),
dn3.getCurrentVersion());
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index e77e2aebb3..c1431845ce 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
@@ -216,7 +217,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
- .setPipeline(block.getPipeline().getProtobufMessage(clientVersion)));
+ .setPipeline(block.getPipeline().getProtobufMessage(clientVersion, Name.IO_PORTS)));
}
return builder.build();
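On the wire this change only shrinks the repeated ports field of each member; readers need no change. A sketch of the consumer side, assuming the standard protobuf-generated accessors for the members and ports fields (pipelineProto here is a hypothetical HddsProtos.Pipeline a client has received):

    // Iterate the (now smaller) port list of each pipeline member.
    for (HddsProtos.DatanodeDetailsProto member : pipelineProto.getMembersList()) {
      for (HddsProtos.Port port : member.getPortsList()) {
        System.out.println(port.getName() + " -> " + port.getValue());
      }
    }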
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index bd5046bfc0..f739472989 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -409,9 +409,9 @@ public class TestOMRatisSnapshots {
// Do some transactions so that the log index increases
List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
- 80);
+ 100);
- SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
+ SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
followerOM.getConfiguration().setInt(
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
KeyManagerImpl.DISABLE_VALUE);
@@ -424,9 +424,9 @@ public class TestOMRatisSnapshots {
}, 1000, 30_000);
// Get two incremental tarballs, adding new keys/snapshot for each.
- IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
+ IncrementData firstIncrement = getNextIncrementalTarball(200, 2, leaderOM,
leaderRatisServer, faultInjector, followerOM, tempDir);
- IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
+ IncrementData secondIncrement = getNextIncrementalTarball(300, 3, leaderOM,
leaderRatisServer, faultInjector, followerOM, tempDir);
// Resume the follower thread, it would download the incremental snapshot.
@@ -501,10 +501,10 @@ public class TestOMRatisSnapshots {
assertNotNull(filesInCandidate);
assertEquals(0, filesInCandidate.length);
- checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
- checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
+ checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);
+ checkSnapshot(leaderOM, followerOM, "snap200", firstIncrement.getKeys(),
firstIncrement.getSnapshotInfo());
- checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
+ checkSnapshot(leaderOM, followerOM, "snap300", secondIncrement.getKeys(),
secondIncrement.getSnapshotInfo());
assertEquals(
followerOM.getOmSnapshotProvider().getInitCount(), 2,
@@ -618,7 +618,7 @@ public class TestOMRatisSnapshots {
// Do some transactions so that the log index increases
List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
- 80);
+ 100);
// Start the inactive OM. Checkpoint installation will happen spontaneously.
cluster.startInactiveOM(followerNodeId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]