This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7247da84bc When reading server to segments map, exclude OFFLINE segments (#11818)
7247da84bc is described below

commit 7247da84bcb51c49c8e45354d3e9c2ec05e3e8df
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Oct 16 19:19:18 2023 -0700

    When reading server to segments map, exclude OFFLINE segments (#11818)
---
 .../api/resources/PinotSegmentRestletResource.java |  8 ++--
 .../helix/core/PinotHelixResourceManager.java      | 52 +++++++---------------
 .../util/ConsumingSegmentInfoReader.java           |  8 ++--
 .../ConsumingSegmentInfoReaderStatelessTest.java   |  6 +--
 4 files changed, 28 insertions(+), 46 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 33d4b65b5d..7a5961c680 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -34,12 +34,12 @@ import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -661,13 +661,13 @@ public class PinotSegmentRestletResource {
         
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
     if (singleSegmentName != null) {
       // No need to query servers where this segment is not supposed to be 
hosted
-      serverToSegments = new HashMap<>();
-      List<String> segmentList = Arrays.asList(singleSegmentName);
+      serverToSegments = new TreeMap<>();
+      List<String> segmentList = Collections.singletonList(singleSegmentName);
       _pinotHelixResourceManager.getServers(tableNameWithType, 
singleSegmentName).forEach(server -> {
         serverToSegments.put(server, segmentList);
       });
     } else {
-      serverToSegments = 
_pinotHelixResourceManager.getServerToOnlineSegmentsMap(tableNameWithType);
+      serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
     }
 
     BiMap<String, String> serverEndPoints =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index c3fde8a85d..bfb4918bf7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2651,29 +2651,8 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Returns a map from server instance to list of online segments it serves 
for the given table.
-   */
-  public Map<String, List<String>> getServerToOnlineSegmentsMap(String 
tableNameWithType) {
-    Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
-    IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
-    if (idealState == null) {
-      throw new IllegalStateException("Ideal state does not exist for table: " 
+ tableNameWithType);
-    }
-    for (Map.Entry<String, Map<String, String>> entry : 
idealState.getRecord().getMapFields().entrySet()) {
-      String segmentName = entry.getKey();
-      for (Map.Entry<String, String> e : entry.getValue().entrySet()) {
-        String server = e.getKey();
-        String status = e.getValue();
-        if (status.equals(SegmentStateModel.CONSUMING) || 
status.equals(SegmentStateModel.ONLINE)) {
-          serverToSegmentsMap.computeIfAbsent(server, key -> new 
ArrayList<>()).add(segmentName);
-        }
-      }
-    }
-    return serverToSegmentsMap;
-  }
-
-  /**
-   * Returns a map from server instance to list of segments it serves for the 
given table.
+   * Returns a map from server instance to list of segments it serves for the 
given table. Ignore OFFLINE segments from
+   * the ideal state because they are not supposed to be served.
    */
   public Map<String, List<String>> getServerToSegmentsMap(String 
tableNameWithType) {
     Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
@@ -2683,15 +2662,18 @@ public class PinotHelixResourceManager {
     }
     for (Map.Entry<String, Map<String, String>> entry : 
idealState.getRecord().getMapFields().entrySet()) {
       String segmentName = entry.getKey();
-      for (String server : entry.getValue().keySet()) {
-        serverToSegmentsMap.computeIfAbsent(server, key -> new 
ArrayList<>()).add(segmentName);
+      for (Map.Entry<String, String> instanceStateEntry : 
entry.getValue().entrySet()) {
+        if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) {
+          serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key 
-> new ArrayList<>()).add(segmentName);
+        }
       }
     }
     return serverToSegmentsMap;
   }
 
   /**
-   * Returns a set of server instances for a given table and segment
+   * Returns a set of server instances for a given table and segment. Ignore 
OFFLINE segments from the ideal state
+   * because they are not supposed to be served.
    */
   public Set<String> getServers(String tableNameWithType, String segmentName) {
     IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
@@ -2701,7 +2683,13 @@ public class PinotHelixResourceManager {
     Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
     Preconditions.checkState(instanceStateMap != null, "Segment: {} does not 
exist in the ideal state of table: {}",
         segmentName, tableNameWithType);
-    return instanceStateMap.keySet();
+    Set<String> servers = new TreeSet<>();
+    for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+      if (!entry.getValue().equals(SegmentStateModel.OFFLINE)) {
+        servers.add(entry.getKey());
+      }
+    }
+    return servers;
   }
 
   /**
@@ -2722,15 +2710,9 @@ public class PinotHelixResourceManager {
     return consumingSegments;
   }
 
-  /**
-   * Utility function to return set of servers corresponding to a given 
segment.
-   */
+  @Deprecated
   public Set<String> getServersForSegment(String tableNameWithType, String 
segmentName) {
-    IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
-    if (idealState == null) {
-      throw new IllegalStateException("Ideal state does not exist for table: " 
+ tableNameWithType);
-    }
-    return new HashSet<>(idealState.getInstanceStateMap(segmentName).keySet());
+    return getServers(tableNameWithType, segmentName);
   }
 
   public synchronized Map<String, String> getSegmentsCrcForTable(String 
tableNameWithType) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index c8ee931580..26dab01955 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -157,13 +157,13 @@ public class ConsumingSegmentInfoReader {
         }
 
         // Check if any responses are missing
-        Set<String> serversForSegment = 
_pinotHelixResourceManager.getServersForSegment(tableNameWithType, segmentName);
-        if (serversForSegment.size() != consumingSegmentInfoList.size()) {
+        Set<String> servers = 
_pinotHelixResourceManager.getServers(tableNameWithType, segmentName);
+        if (servers.size() != consumingSegmentInfoList.size()) {
           Set<String> serversResponded =
               consumingSegmentInfoList.stream().map(c -> 
c._serverName).collect(Collectors.toSet());
-          serversForSegment.removeAll(serversResponded);
+          servers.removeAll(serversResponded);
           String errorMessage =
-              "Not all servers responded for segment: " + segmentName + " 
Missing servers : " + serversForSegment;
+              "Not all servers responded for segment: " + segmentName + " 
Missing servers : " + servers;
           return 
TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY,
 errorMessage);
         }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
index 3e8d935813..9c29142b46 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
@@ -28,10 +28,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
@@ -197,11 +197,11 @@ public class ConsumingSegmentInfoReaderStatelessTest {
   private void mockSetup(final String[] servers, final Set<String> 
consumingSegments)
       throws InvalidConfigException {
     
when(_helix.getServerToSegmentsMap(anyString())).thenAnswer(invocationOnMock -> 
subsetOfServerSegments(servers));
+    when(_helix.getServers(anyString(), anyString())).thenAnswer(
+        invocationOnMock -> new TreeSet<>(Arrays.asList(servers)));
     
when(_helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())).thenAnswer(
         invocationOnMock -> serverEndpoints(servers));
     when(_helix.getConsumingSegments(anyString())).thenAnswer(invocationOnMock 
-> consumingSegments);
-    when(_helix.getServersForSegment(anyString(), anyString())).thenAnswer(
-        invocationOnMock -> new HashSet<>(Arrays.asList(servers)));
   }
 
   private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(final 
String[] servers,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to