This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ecb1783d1b Fix needReload to fetch status from servers in ExternalView (#15637)
ecb1783d1b is described below

commit ecb1783d1ba334433dda46dbc3772750a1293423
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Thu Apr 24 16:11:01 2025 -0700

    Fix needReload to fetch status from servers in ExternalView (#15637)
---
 .../core/rebalance/DefaultRebalancePreChecker.java | 26 +++-----------
 .../pinot/controller/util/TableMetadataReader.java | 42 ++++++++++++++--------
 2 files changed, 32 insertions(+), 36 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index 7ef7d53c00..2c8812d799 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -83,8 +83,7 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
 
     Map<String, RebalancePreCheckerResult> preCheckResult = new HashMap<>();
     // Check for reload status
-    preCheckResult.put(NEEDS_RELOAD_STATUS,
-        checkReloadNeededOnServers(tableNameWithType, 
preCheckContext.getCurrentAssignment(), tableRebalanceLogger));
+    preCheckResult.put(NEEDS_RELOAD_STATUS, 
checkReloadNeededOnServers(tableNameWithType, tableRebalanceLogger));
     // Check whether minimizeDataMovement is set in TableConfig
     preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT,
         checkIsMinimizeDataMovement(tableConfig, rebalanceConfig, 
tableRebalanceLogger));
@@ -114,8 +113,7 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
    * TODO: Add an API to check for whether segments in deep store are up to 
date with the table configs and schema
    *       and add a pre-check here to call that API.
    */
-  private RebalancePreCheckerResult checkReloadNeededOnServers(String 
tableNameWithType,
-      Map<String, Map<String, String>> currentAssignment, Logger 
tableRebalanceLogger) {
+  private RebalancePreCheckerResult checkReloadNeededOnServers(String 
tableNameWithType, Logger tableRebalanceLogger) {
     tableRebalanceLogger.info("Fetching whether reload is needed");
     Boolean needsReload = null;
     if (_executorService == null) {
@@ -125,18 +123,12 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
     try (PoolingHttpClientConnectionManager connectionManager = new 
PoolingHttpClientConnectionManager()) {
       TableMetadataReader metadataReader = new 
TableMetadataReader(_executorService, connectionManager,
           _pinotHelixResourceManager);
-      // Only send needReload request to servers that are part of the current 
assignment. The tagged server list may
-      // include new servers which are part of target assignment but not 
current assignment. needReload throws an
-      // exception for servers that don't contain segments for the given table
-      Set<String> currentlyAssignedServers = 
getCurrentlyAssignedServers(currentAssignment);
       TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair =
-          
metadataReader.getServerSetCheckSegmentsReloadMetadata(tableNameWithType, 
30_000, currentlyAssignedServers);
+          
metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, 30_000);
       Map<String, JsonNode> needsReloadMetadata = 
needsReloadMetadataPair.getServerReloadJsonResponses();
       int failedResponses = needsReloadMetadataPair.getNumFailedResponses();
-      tableRebalanceLogger.info(
-          "Received {} needs reload responses and {} failed responses from 
servers with "
-              + "number of servers queried: {}", needsReloadMetadata.size(), 
failedResponses,
-          currentlyAssignedServers.size());
+      tableRebalanceLogger.info("Received {} needs reload responses and {} 
failed responses from servers assigned "
+              + "to table", needsReloadMetadata.size(), failedResponses);
       needsReload = needsReloadMetadata.values().stream().anyMatch(value -> 
value.get("needReload").booleanValue());
       if (!needsReload && failedResponses > 0) {
         tableRebalanceLogger.warn(
@@ -269,14 +261,6 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
     return RebalancePreCheckerResult.error("Got exception when fetching 
instance assignment, check manually");
   }
 
-  private Set<String> getCurrentlyAssignedServers(Map<String, Map<String, 
String>> currentAssignment) {
-    Set<String> servers = new HashSet<>();
-    for (Map<String, String> serverStateMap : currentAssignment.values()) {
-      servers.addAll(serverStateMap.keySet());
-    }
-    return servers;
-  }
-
   private RebalancePreCheckerResult checkDiskUtilization(Map<String, 
Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment,
       TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double 
threshold, boolean worstCase) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index e6318cb05b..26958034f2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -30,12 +30,14 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.ExternalView;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
 import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
@@ -60,7 +62,7 @@ public class TableMetadataReader {
   }
 
   /**
-   * Check if segments need a reload on any servers
+   * Check if segments need a reload on any servers. Server list is obtained 
from the ExternalView of the table
    * @return response containing a) number of failed responses, b) reload 
responses returned
    */
   public TableReloadJsonResponse getServerCheckSegmentsReloadMetadata(String 
tableNameWithType,
@@ -71,25 +73,35 @@ public class TableMetadataReader {
     return processSegmentMetadataReloadResponse(segmentsMetadataResponse);
   }
 
+  /**
+   * Only send needReload request to servers that are part of the 
ExternalView. The tagged server list should not be
+   * used as it may be outdated and may not handle scenarios like tiered 
storage and COMPLETED segments.
+   * needReload throws an exception for servers that don't contain segments 
for the given table
+   */
   public ServerSegmentMetadataReader.TableReloadResponse 
getReloadCheckResponses(String tableNameWithType,
       int timeoutMs) throws InvalidConfigException {
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    List<String> serverInstances = 
_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, 
tableType);
-    Set<String> serverInstanceSet = new HashSet<>(serverInstances);
+    ExternalView externalView = 
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
+    Set<String> serverInstanceSet = new HashSet<>();
+    if (externalView != null) {
+      serverInstanceSet = 
getCurrentlyAssignedServersFromExternalView(externalView);
+    }
     return getServerSetReloadCheckResponses(tableNameWithType, timeoutMs, 
serverInstanceSet);
   }
 
-  /**
-   * Check if segments need a reload on any servers based on a provided server 
set (useful for rebalance where the
-   * currently assigned servers may not match the currently tagged server list)
-   * @return response containing a) number of failed responses, b) reload 
responses returned
-   */
-  public TableReloadJsonResponse 
getServerSetCheckSegmentsReloadMetadata(String tableNameWithType,
-      int timeoutMs, Set<String> serverSet)
-      throws InvalidConfigException, IOException {
-    ServerSegmentMetadataReader.TableReloadResponse segmentsMetadataResponse = 
getServerSetReloadCheckResponses(
-        tableNameWithType, timeoutMs, serverSet);
-    return processSegmentMetadataReloadResponse(segmentsMetadataResponse);
+  private Set<String> getCurrentlyAssignedServersFromExternalView(ExternalView 
externalView) {
+    Map<String, Map<String, String>> assignment = 
externalView.getRecord().getMapFields();
+    Set<String> servers = new HashSet<>();
+    for (Map<String, String> serverStateMap : assignment.values()) {
+      for (Map.Entry<String, String> entry : serverStateMap.entrySet()) {
+        String state = entry.getValue();
+        // Skip adding the server if the segment is in ERROR or OFFLINE state
+        if 
(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
+            || 
CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
+          servers.add(entry.getKey());
+        }
+      }
+    }
+    return servers;
   }
 
   public ServerSegmentMetadataReader.TableReloadResponse 
getServerSetReloadCheckResponses(String tableNameWithType,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to