This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ecb1783d1b Fix needReload to fetch status from servers in ExternalView (#15637) ecb1783d1b is described below commit ecb1783d1ba334433dda46dbc3772750a1293423 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Thu Apr 24 16:11:01 2025 -0700 Fix needReload to fetch status from servers in ExternalView (#15637) --- .../core/rebalance/DefaultRebalancePreChecker.java | 26 +++----------- .../pinot/controller/util/TableMetadataReader.java | 42 ++++++++++++++-------- 2 files changed, 32 insertions(+), 36 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index 7ef7d53c00..2c8812d799 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -83,8 +83,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { Map<String, RebalancePreCheckerResult> preCheckResult = new HashMap<>(); // Check for reload status - preCheckResult.put(NEEDS_RELOAD_STATUS, - checkReloadNeededOnServers(tableNameWithType, preCheckContext.getCurrentAssignment(), tableRebalanceLogger)); + preCheckResult.put(NEEDS_RELOAD_STATUS, checkReloadNeededOnServers(tableNameWithType, tableRebalanceLogger)); // Check whether minimizeDataMovement is set in TableConfig preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT, checkIsMinimizeDataMovement(tableConfig, rebalanceConfig, tableRebalanceLogger)); @@ -114,8 +113,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { * TODO: Add an API to check for whether segments in deep store are up to date with the table configs and schema * and add a pre-check here to call that API. */ - private RebalancePreCheckerResult checkReloadNeededOnServers(String tableNameWithType, - Map<String, Map<String, String>> currentAssignment, Logger tableRebalanceLogger) { + private RebalancePreCheckerResult checkReloadNeededOnServers(String tableNameWithType, Logger tableRebalanceLogger) { tableRebalanceLogger.info("Fetching whether reload is needed"); Boolean needsReload = null; if (_executorService == null) { @@ -125,18 +123,12 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) { TableMetadataReader metadataReader = new TableMetadataReader(_executorService, connectionManager, _pinotHelixResourceManager); - // Only send needReload request to servers that are part of the current assignment. The tagged server list may - // include new servers which are part of target assignment but not current assignment. needReload throws an - // exception for servers that don't contain segments for the given table - Set<String> currentlyAssignedServers = getCurrentlyAssignedServers(currentAssignment); TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair = - metadataReader.getServerSetCheckSegmentsReloadMetadata(tableNameWithType, 30_000, currentlyAssignedServers); + metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, 30_000); Map<String, JsonNode> needsReloadMetadata = needsReloadMetadataPair.getServerReloadJsonResponses(); int failedResponses = needsReloadMetadataPair.getNumFailedResponses(); - tableRebalanceLogger.info( - "Received {} needs reload responses and {} failed responses from servers with " - + "number of servers queried: {}", needsReloadMetadata.size(), failedResponses, - currentlyAssignedServers.size()); + tableRebalanceLogger.info("Received {} needs reload responses and {} failed responses from servers assigned " + + "to table", needsReloadMetadata.size(), failedResponses); needsReload = needsReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue()); if (!needsReload && failedResponses > 0) { tableRebalanceLogger.warn( @@ -269,14 +261,6 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { return RebalancePreCheckerResult.error("Got exception when fetching instance assignment, check manually"); } - private Set<String> getCurrentlyAssignedServers(Map<String, Map<String, String>> currentAssignment) { - Set<String> servers = new HashSet<>(); - for (Map<String, String> serverStateMap : currentAssignment.values()) { - servers.addAll(serverStateMap.keySet()); - } - return servers; - } - private RebalancePreCheckerResult checkDiskUtilization(Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment, TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double threshold, boolean worstCase) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index e6318cb05b..26958034f2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -30,12 +30,14 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.stream.Collectors; import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.helix.model.ExternalView; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.restlet.resources.TableMetadataInfo; import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo; import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -60,7 +62,7 @@ public class TableMetadataReader { } /** - * Check if segments need a reload on any servers + * Check if segments need a reload on any servers. Server list is obtained from the ExternalView of the table * @return response containing a) number of failed responses, b) reload responses returned */ public TableReloadJsonResponse getServerCheckSegmentsReloadMetadata(String tableNameWithType, @@ -71,25 +73,35 @@ public class TableMetadataReader { return processSegmentMetadataReloadResponse(segmentsMetadataResponse); } + /** + * Only send needReload request to servers that are part of the ExternalView. The tagged server list should not be + * used as it may be outdated and may not handle scenarios like tiered storage and COMPLETED segments. + * needReload throws an exception for servers that don't contain segments for the given table + */ public ServerSegmentMetadataReader.TableReloadResponse getReloadCheckResponses(String tableNameWithType, int timeoutMs) throws InvalidConfigException { - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - List<String> serverInstances = _pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType); - Set<String> serverInstanceSet = new HashSet<>(serverInstances); + ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); + Set<String> serverInstanceSet = new HashSet<>(); + if (externalView != null) { + serverInstanceSet = getCurrentlyAssignedServersFromExternalView(externalView); + } return getServerSetReloadCheckResponses(tableNameWithType, timeoutMs, serverInstanceSet); } - /** - * Check if segments need a reload on any servers based on a provided server set (useful for rebalance where the - * currently assigned servers may not match the currently tagged server list) - * @return response containing a) number of failed responses, b) reload responses returned - */ - public TableReloadJsonResponse getServerSetCheckSegmentsReloadMetadata(String tableNameWithType, - int timeoutMs, Set<String> serverSet) - throws InvalidConfigException, IOException { - ServerSegmentMetadataReader.TableReloadResponse segmentsMetadataResponse = getServerSetReloadCheckResponses( - tableNameWithType, timeoutMs, serverSet); - return processSegmentMetadataReloadResponse(segmentsMetadataResponse); + private Set<String> getCurrentlyAssignedServersFromExternalView(ExternalView externalView) { + Map<String, Map<String, String>> assignment = externalView.getRecord().getMapFields(); + Set<String> servers = new HashSet<>(); + for (Map<String, String> serverStateMap : assignment.values()) { + for (Map.Entry<String, String> entry : serverStateMap.entrySet()) { + String state = entry.getValue(); + // Skip adding the server if the segment is in ERROR or OFFLINE state + if (CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state) + || CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) { + servers.add(entry.getKey()); + } + } + } + return servers; } public ServerSegmentMetadataReader.TableReloadResponse getServerSetReloadCheckResponses(String tableNameWithType, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org