This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b25218172d HDDS-10804. Include only limited set of ports in Pipeline 
proto (#6655)
b25218172d is described below

commit b25218172df8cc9fae2df771bfbe5e985fd07a58
Author: XiChen <[email protected]>
AuthorDate: Mon Nov 18 15:23:41 2024 +0800

    HDDS-10804. Include only limited set of ports in Pipeline proto (#6655)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      | 37 +++++++++++++++++++---
 .../common/helpers/ContainerWithPipeline.java      |  3 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  7 +++-
 .../hadoop/hdds/protocol/TestDatanodeDetails.java  | 23 ++++++++++++--
 ...lockLocationProtocolServerSideTranslatorPB.java |  3 +-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 16 +++++-----
 6 files changed, 71 insertions(+), 18 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index f762b4cc70..f3137749b9 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.protocol;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
@@ -487,11 +488,24 @@ public class DatanodeDetails extends NodeImpl implements
   }
 
   public HddsProtos.DatanodeDetailsProto toProto(int clientVersion) {
-    return toProtoBuilder(clientVersion).build();
+    return toProtoBuilder(clientVersion, Collections.emptySet()).build();
   }
 
+  public HddsProtos.DatanodeDetailsProto toProto(int clientVersion, 
Set<Port.Name> filterPorts) {
+    return toProtoBuilder(clientVersion, filterPorts).build();
+  }
+
+  /**
+   * Converts the current DatanodeDetails instance into a proto {@link 
HddsProtos.DatanodeDetailsProto.Builder} object.
+   *
+   * @param clientVersion - The client version.
+   * @param filterPorts   - A set of {@link Port.Name} specifying ports to 
include.
+   *                        If empty, all available ports will be included.
+   * @return A {@link HddsProtos.DatanodeDetailsProto.Builder} Object.
+   */
+
   public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(
-      int clientVersion) {
+      int clientVersion, Set<Port.Name> filterPorts) {
 
     HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
         .setMostSigBits(uuid.getMostSignificantBits())
@@ -530,15 +544,25 @@ public class DatanodeDetails extends NodeImpl implements
     final boolean handlesUnknownPorts =
         ClientVersion.fromProtoValue(clientVersion)
         .compareTo(VERSION_HANDLES_UNKNOWN_DN_PORTS) >= 0;
+    final int requestedPortCount = filterPorts.size();
+    final boolean maySkip = requestedPortCount > 0;
     for (Port port : ports) {
-      if (handlesUnknownPorts || Name.V0_PORTS.contains(port.getName())) {
+      if (maySkip && !filterPorts.contains(port.getName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip adding {} port {} to proto message",
+                  port.getName(), port.getValue());
+        }
+      } else if (handlesUnknownPorts || 
Name.V0_PORTS.contains(port.getName())) {
         builder.addPorts(port.toProto());
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skip adding {} port {} to proto message for client v{}",
-              port.getName(), port.getValue(), clientVersion);
+                  port.getName(), port.getValue(), clientVersion);
         }
       }
+      if (maySkip && builder.getPortsCount() == requestedPortCount) {
+        break;
+      }
     }
 
     builder.setCurrentVersion(currentVersion);
@@ -960,6 +984,9 @@ public class DatanodeDetails extends NodeImpl implements
           Name.values());
       public static final Set<Name> V0_PORTS = ImmutableSet.copyOf(
           EnumSet.of(STANDALONE, RATIS, REST));
+
+      public static final Set<Name> IO_PORTS = ImmutableSet.copyOf(
+          EnumSet.of(STANDALONE, RATIS, RATIS_DATASTREAM));
     }
 
     private final Name name;
@@ -1109,7 +1136,7 @@ public class DatanodeDetails extends NodeImpl implements
   public HddsProtos.NetworkNode toProtobuf(
       int clientVersion) {
     return HddsProtos.NetworkNode.newBuilder()
-        .setDatanodeDetails(toProtoBuilder(clientVersion).build())
+        .setDatanodeDetails(toProtoBuilder(clientVersion, 
Collections.emptySet()).build())
         .build();
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
index ac72dc9422..f4c9a5dbda 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerWithPipeline.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.container.common.helpers;
 import java.util.Comparator;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -61,7 +62,7 @@ public class ContainerWithPipeline implements 
Comparator<ContainerWithPipeline>,
     HddsProtos.ContainerWithPipeline.Builder builder =
         HddsProtos.ContainerWithPipeline.newBuilder();
     builder.setContainerInfo(getContainerInfo().getProtobuf())
-        .setPipeline(getPipeline().getProtobufMessage(clientVersion));
+        .setPipeline(getPipeline().getProtobufMessage(clientVersion, 
Name.IO_PORTS));
 
     return builder.build();
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 6c5b4aff57..6e72537367 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -362,12 +362,17 @@ public final class Pipeline {
 
   public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
       throws UnknownPipelineStateException {
+    return getProtobufMessage(clientVersion, Collections.emptySet());
+  }
+
+  public HddsProtos.Pipeline getProtobufMessage(int clientVersion, 
Set<DatanodeDetails.Port.Name> filterPorts)
+      throws UnknownPipelineStateException {
 
     List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
     List<Integer> memberReplicaIndexes = new ArrayList<>();
 
     for (DatanodeDetails dn : nodeStatus.keySet()) {
-      members.add(dn.toProto(clientVersion));
+      members.add(dn.toProto(clientVersion, filterPorts));
       memberReplicaIndexes.add(replicaIndexes.getOrDefault(dn, 0));
     }
 
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
index aeb1e207e7..78465fd281 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/protocol/TestDatanodeDetails.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdds.protocol;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.DatanodeVersion;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.junit.jupiter.api.Test;
 
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
 import static 
org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
@@ -48,21 +52,36 @@ public class TestDatanodeDetails {
         subject.toProto(VERSION_HANDLES_UNKNOWN_DN_PORTS.toProtoValue());
     assertPorts(protoV1, ALL_PORTS);
   }
+  @Test
+  void testRequiredPortsProto() {
+    DatanodeDetails subject = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, 
Port.Name.RATIS)
+        .collect(Collectors.toSet());
+    HddsProtos.DatanodeDetailsProto proto =
+        subject.toProto(subject.getCurrentVersion(), requiredPorts);
+    assertPorts(proto, ImmutableSet.copyOf(requiredPorts));
+
+    HddsProtos.DatanodeDetailsProto ioPortProto =
+        subject.toProto(subject.getCurrentVersion(), Name.IO_PORTS);
+    assertPorts(ioPortProto, ImmutableSet.copyOf(Name.IO_PORTS));
+  }
 
   @Test
   public void testNewBuilderCurrentVersion() {
     // test that if the current version is not set (Ozone 1.4.0 and earlier),
     // it falls back to SEPARATE_RATIS_PORTS_AVAILABLE
     DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+    Set<Port.Name> requiredPorts = Stream.of(Port.Name.STANDALONE, 
Port.Name.RATIS)
+        .collect(Collectors.toSet());
     HddsProtos.DatanodeDetailsProto.Builder protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
     protoBuilder.clearCurrentVersion();
     DatanodeDetails dn2 = 
DatanodeDetails.newBuilder(protoBuilder.build()).build();
     
assertEquals(DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE.toProtoValue(), 
dn2.getCurrentVersion());
 
     // test that if the current version is set, it is used
     protoBuilder =
-        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue());
+        dn.toProtoBuilder(DEFAULT_VERSION.toProtoValue(), requiredPorts);
     DatanodeDetails dn3 = 
DatanodeDetails.newBuilder(protoBuilder.build()).build();
     assertEquals(DatanodeVersion.CURRENT.toProtoValue(), 
dn3.getCurrentVersion());
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index e77e2aebb3..c1431845ce 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
@@ -216,7 +217,7 @@ public final class 
ScmBlockLocationProtocolServerSideTranslatorPB
     for (AllocatedBlock block : allocatedBlocks) {
       builder.addBlocks(AllocateBlockResponse.newBuilder()
           .setContainerBlockID(block.getBlockID().getProtobuf())
-          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion)));
+          .setPipeline(block.getPipeline().getProtobufMessage(clientVersion, 
Name.IO_PORTS)));
     }
 
     return builder.build();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index bd5046bfc0..f739472989 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -409,9 +409,9 @@ public class TestOMRatisSnapshots {
 
     // Do some transactions so that the log index increases
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);
 
-    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap80");
+    SnapshotInfo snapshotInfo2 = createOzoneSnapshot(leaderOM, "snap100");
     followerOM.getConfiguration().setInt(
         OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
         KeyManagerImpl.DISABLE_VALUE);
@@ -424,9 +424,9 @@ public class TestOMRatisSnapshots {
     }, 1000, 30_000);
 
     // Get two incremental tarballs, adding new keys/snapshot for each.
-    IncrementData firstIncrement = getNextIncrementalTarball(160, 2, leaderOM,
+    IncrementData firstIncrement = getNextIncrementalTarball(200, 2, leaderOM,
         leaderRatisServer, faultInjector, followerOM, tempDir);
-    IncrementData secondIncrement = getNextIncrementalTarball(240, 3, leaderOM,
+    IncrementData secondIncrement = getNextIncrementalTarball(300, 3, leaderOM,
         leaderRatisServer, faultInjector, followerOM, tempDir);
 
     // Resume the follower thread, it would download the incremental snapshot.
@@ -501,10 +501,10 @@ public class TestOMRatisSnapshots {
     assertNotNull(filesInCandidate);
     assertEquals(0, filesInCandidate.length);
 
-    checkSnapshot(leaderOM, followerOM, "snap80", firstKeys, snapshotInfo2);
-    checkSnapshot(leaderOM, followerOM, "snap160", firstIncrement.getKeys(),
+    checkSnapshot(leaderOM, followerOM, "snap100", firstKeys, snapshotInfo2);
+    checkSnapshot(leaderOM, followerOM, "snap200", firstIncrement.getKeys(),
         firstIncrement.getSnapshotInfo());
-    checkSnapshot(leaderOM, followerOM, "snap240", secondIncrement.getKeys(),
+    checkSnapshot(leaderOM, followerOM, "snap300", secondIncrement.getKeys(),
         secondIncrement.getSnapshotInfo());
     assertEquals(
         followerOM.getOmSnapshotProvider().getInitCount(), 2,
@@ -618,7 +618,7 @@ public class TestOMRatisSnapshots {
 
     // Do some transactions so that the log index increases
     List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
-        80);
+        100);
 
     // Start the inactive OM. Checkpoint installation will happen 
spontaneously.
     cluster.startInactiveOM(followerNodeId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to