This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3426cb46deb [controller] Refactor segment reload APIs into dedicated resource class (#17016)
3426cb46deb is described below

commit 3426cb46deb7b32887cb0bdb048a5e8f2da18afe
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Oct 15 13:50:02 2025 -0700

    [controller] Refactor segment reload APIs into dedicated resource class (#17016)
    
    * [reload] Moving reload APIs to a dedicated PinotTableReloadResource class
    
    * [reload] Enhance and document reload APIs with detailed Swagger annotations
    
    * [reload] Remove redundant 500 error response from reload API annotations
    
    * [reload] Add ASF license headers to PinotTableReloadResource and its test file
    
    * [reload] Document and organize reload APIs; move duplicated annotations to PinotTableReloadResource
---
 .../api/resources/PinotSegmentRestletResource.java | 346 --------------
 .../api/resources/PinotTableReloadResource.java    | 504 +++++++++++++++++++++
 .../resources/PinotSegmentRestletResourceTest.java |  44 --
 .../resources/PinotTableReloadResourceTest.java    |  87 ++++
 4 files changed, 591 insertions(+), 390 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index def5f3fa843..0aca6417f04 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -19,12 +19,10 @@
 package org.apache.pinot.controller.api.resources;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.collect.BiMap;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
@@ -39,7 +37,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,8 +71,6 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import 
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
-import 
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.PauselessConsumptionUtils;
@@ -87,19 +82,15 @@ import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
-import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.controller.util.TableMetadataReader;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
-import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -128,8 +119,6 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
  *   <li>
  *     POST requests:
  *     <ul>
- *       <li>"/segments/{tableName}/{segmentName}/reload": reload a 
segment</li>
- *       <li>"/segments/{tableName}/reload": reload all segments</li>
  *       <li>"/segments/{tableNameWithType}/{segmentName}/reset": reset a 
segment</li>
  *       <li>"/segments/{tableNameWithType}/reset": reset all segments</li>
  *       <li>"/segments/{tableName}/delete": delete the segments in the 
payload</li>
@@ -155,7 +144,6 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
  *         <ul>
  *           <li>"GET /segments/{tableName}"</li>
  *           <li>"GET /segments/{tableName}/servers"</li>
- *           <li>"POST /segments/{tableName}/reload"</li>
  *         </ul>
  *       </li>
  *       <li>
@@ -174,10 +162,6 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
  *       <li>"GET /tables/{tableName}/segments/crc"</li>
  *       <li>"GET /tables/{tableName}/segments/{segmentName}"</li>
  *       <li>"GET /tables/{tableName}/segments/{segmentName}/metadata"</li>
- *       <li>"GET /tables/{tableName}/segments/{segmentName}/reload"</li>
- *       <li>"POST /tables/{tableName}/segments/{segmentName}/reload"</li>
- *       <li>"GET /tables/{tableName}/segments/reload"</li>
- *       <li>"POST /tables/{tableName}/segments/reload"</li>
  *     </ul>
  *   </li>
  * </ul>
@@ -388,48 +372,6 @@ public class PinotSegmentRestletResource {
     return segmentZKMetadata != null ? segmentZKMetadata.toMap() : null;
   }
 
-  @POST
-  @Path("segments/{tableName}/{segmentName}/reload")
-  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.RELOAD_SEGMENT)
-  @Authenticate(AccessType.UPDATE)
-  @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Reload a segment", notes = "Reload a segment")
-  public SuccessResponse reloadSegment(
-      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
-      @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
-      @ApiParam(value = "Whether to force server to download segment") 
@QueryParam("forceDownload")
-      @DefaultValue("false") boolean forceDownload,
-      @ApiParam(value = "Name of the target instance to reload") 
@QueryParam("targetInstance") @Nullable
-      String targetInstance, @Context HttpHeaders headers) {
-    tableName = DatabaseUtils.translateTableName(tableName, headers);
-    long startTimeMs = System.currentTimeMillis();
-    segmentName = URIUtils.decode(segmentName);
-    String tableNameWithType = getExistingTable(tableName, segmentName);
-    Pair<Integer, String> msgInfo =
-        _pinotHelixResourceManager.reloadSegment(tableNameWithType, 
segmentName, forceDownload, targetInstance);
-    boolean zkJobMetaWriteSuccess = false;
-    int numReloadMsgSent = msgInfo.getLeft();
-    if (numReloadMsgSent > 0) {
-      try {
-        if 
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, 
segmentName, targetInstance,
-            msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
-          zkJobMetaWriteSuccess = true;
-        } else {
-          LOGGER.error("Failed to add reload segment job meta into zookeeper 
for table: {}, segment: {}",
-              tableNameWithType, segmentName);
-        }
-      } catch (Exception e) {
-        LOGGER.error("Failed to add reload segment job meta into zookeeper for 
table: {}, segment: {}",
-            tableNameWithType, segmentName, e);
-      }
-      return new SuccessResponse(
-          String.format("Submitted reload job id: %s, sent %d reload messages. 
Job meta ZK storage status: %s",
-              msgInfo.getRight(), numReloadMsgSent, zkJobMetaWriteSuccess ? 
"SUCCESS" : "FAILED"));
-    }
-    throw new ControllerApplicationException(LOGGER,
-        String.format("Failed to find segment: %s in table: %s on %s", 
segmentName, tableName,
-            targetInstance == null ? "every instance" : targetInstance), 
Status.NOT_FOUND);
-  }
 
   /**
    * Helper method to find the existing table based on the given table name 
(with or without type suffix) and segment
@@ -524,254 +466,6 @@ public class PinotSegmentRestletResource {
     }
   }
 
-  @GET
-  @Path("segments/segmentReloadStatus/{jobId}")
-  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_SEGMENT_RELOAD_STATUS)
-  @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Get status for a submitted reload operation",
-      notes = "Get status for a submitted reload operation")
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
-      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") 
String reloadJobId)
-      throws Exception {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + reloadJobId,
-          Status.NOT_FOUND);
-    }
-
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames =
-        
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName =
-        
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
-
-    List<String> serverUrls = new ArrayList<>();
-    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
-      String server = entry.getKey();
-      String endpoint = entry.getValue();
-      String reloadTaskStatusEndpoint =
-          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
-              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (segmentNames != null) {
-        List<String> segmentsForServer = serverToSegments.get(server);
-        StringBuilder encodedSegmentsBuilder = new StringBuilder();
-        if (!segmentsForServer.isEmpty()) {
-          Iterator<String> segmentIterator = segmentsForServer.iterator();
-          // Append first segment without a leading separator
-          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
-          // Append remaining segments, each prefixed by the separator
-          while (segmentIterator.hasNext()) {
-            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
-                                  
.append(URIUtils.encode(segmentIterator.next()));
-          }
-        }
-        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
-      }
-      serverUrls.add(reloadTaskStatusEndpoint);
-    }
-
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
-        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
-
-    ServerReloadControllerJobStatusResponse 
serverReloadControllerJobStatusResponse =
-        new ServerReloadControllerJobStatusResponse();
-    serverReloadControllerJobStatusResponse.setSuccessCount(0);
-
-    int totalSegments = 0;
-    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
-      totalSegments += entry.getValue().size();
-    }
-    
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-    
serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-    
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
-    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
-      String responseString = streamResponse.getValue();
-      try {
-        ServerReloadControllerJobStatusResponse response =
-            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
-        serverReloadControllerJobStatusResponse.setSuccessCount(
-            serverReloadControllerJobStatusResponse.getSuccessCount() + 
response.getSuccessCount());
-      } catch (Exception e) {
-        serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
-      }
-    }
-
-    // Add ZK fields
-    
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
-    // Add derived fields
-    long submissionTime = 
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - 
(double) submissionTime) / (1000.0 * 60.0);
-    int remainingSegments = 
serverReloadControllerJobStatusResponse.getTotalSegmentCount()
-        - serverReloadControllerJobStatusResponse.getSuccessCount();
-
-    double estimatedRemainingTimeInMinutes = -1;
-    if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
-      estimatedRemainingTimeInMinutes =
-          ((double) remainingSegments / (double) 
serverReloadControllerJobStatusResponse.getSuccessCount())
-              * timeElapsedInMinutes;
-    }
-
-    
serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-    
serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
-    return serverReloadControllerJobStatusResponse;
-  }
-
-  @VisibleForTesting
-  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNames,
-      @Nullable String instanceName) {
-    if (segmentNames == null) {
-      // instanceName can be null or not null, and this method below can 
handle both cases.
-      return 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, 
instanceName, true);
-    }
-    // Skip servers and segments not involved in the segment reloading job.
-    List<String> segmnetNameList = new ArrayList<>();
-    Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
-    if (instanceName != null) {
-      return Map.of(instanceName, segmnetNameList);
-    }
-    // If instance is null, then either one or all segments are being reloaded 
via current segment reload restful APIs.
-    // And the if-check at the beginning of this method has handled the case 
of reloading all segments. So here we
-    // expect only one segment name.
-    Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is 
expected but got: %s", segmnetNameList);
-    Map<String, List<String>> serverToSegments = new HashMap<>();
-    Set<String> servers = 
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
-    for (String server : servers) {
-      serverToSegments.put(server, Collections.singletonList(segmentNames));
-    }
-    return serverToSegments;
-  }
-
-  @POST
-  @Path("segments/{tableName}/reload")
-  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.RELOAD_SEGMENT)
-  @Authenticate(AccessType.UPDATE)
-  @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Reload all segments", notes = "Reload all segments")
-  public SuccessResponse reloadAllSegments(
-      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
-      @ApiParam(value = "Whether to force server to download segment") 
@QueryParam("forceDownload")
-      @DefaultValue("false") boolean forceDownload,
-      @ApiParam(value = "Name of the target instance to reload") 
@QueryParam("targetInstance") @Nullable
-      String targetInstance,
-      @ApiParam(value = "Map from instances to segments to reload. This param 
takes precedence over targetInstance")
-      @QueryParam("instanceToSegmentsMap") @Nullable String 
instanceToSegmentsMapInJson, @Context HttpHeaders headers)
-      throws IOException {
-    tableName = DatabaseUtils.translateTableName(tableName, headers);
-    TableType tableTypeFromTableName = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
-    // When rawTableName is provided but w/o table type, Pinot tries to reload 
both OFFLINE
-    // and REALTIME tables for the raw table. But forceDownload option only 
works with
-    // OFFLINE table currently, so we limit the table type to OFFLINE to let 
Pinot continue
-    // to reload w/o being accidentally aborted upon REALTIME table type.
-    // TODO: support to force download immutable segments from RealTime table.
-    if (forceDownload && (tableTypeFromTableName == null && 
tableTypeFromRequest == null)) {
-      tableTypeFromRequest = TableType.OFFLINE;
-    }
-    List<String> tableNamesWithType =
-        
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, tableTypeFromRequest,
-            LOGGER);
-    if (instanceToSegmentsMapInJson != null) {
-      Map<String, List<String>> instanceToSegmentsMap =
-          JsonUtils.stringToObject(instanceToSegmentsMapInJson, new 
TypeReference<>() {
-          });
-      Map<String, Map<String, Map<String, String>>> tableInstanceMsgData =
-          reloadSegments(tableNamesWithType, forceDownload, 
instanceToSegmentsMap);
-      if (tableInstanceMsgData.isEmpty()) {
-        throw new ControllerApplicationException(LOGGER,
-            String.format("Failed to find any segments in table: %s with 
instanceToSegmentsMap: %s", tableName,
-                instanceToSegmentsMap), Status.NOT_FOUND);
-      }
-      return new 
SuccessResponse(JsonUtils.objectToString(tableInstanceMsgData));
-    }
-    long startTimeMs = System.currentTimeMillis();
-    Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
-    for (String tableNameWithType : tableNamesWithType) {
-      Pair<Integer, String> msgInfo =
-          _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, 
forceDownload, targetInstance);
-      int numReloadMsgSent = msgInfo.getLeft();
-      if (numReloadMsgSent <= 0) {
-        continue;
-      }
-      Map<String, String> tableReloadMeta = new HashMap<>();
-      tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
-      tableReloadMeta.put("reloadJobId", msgInfo.getRight());
-      perTableMsgData.put(tableNameWithType, tableReloadMeta);
-      // Store in ZK
-      try {
-        if 
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, 
targetInstance, msgInfo.getRight(),
-            startTimeMs, numReloadMsgSent)) {
-          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
-        } else {
-          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
-          LOGGER.error("Failed to add reload all segments job meta into 
zookeeper for table: {}", tableNameWithType);
-        }
-      } catch (Exception e) {
-        tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
-        LOGGER.error("Failed to add reload all segments job meta into 
zookeeper for table: {}", tableNameWithType, e);
-      }
-    }
-    if (perTableMsgData.isEmpty()) {
-      throw new ControllerApplicationException(LOGGER,
-          String.format("Failed to find any segments in table: %s on %s", 
tableName,
-              targetInstance == null ? "every instance" : targetInstance), 
Status.NOT_FOUND);
-    }
-    return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
-  }
-
-  private Map<String, Map<String, Map<String, String>>> 
reloadSegments(List<String> tableNamesWithType,
-      boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
-    long startTimeMs = System.currentTimeMillis();
-    Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new 
LinkedHashMap<>();
-    for (String tableNameWithType : tableNamesWithType) {
-      Map<String, Pair<Integer, String>> instanceMsgInfoMap =
-          _pinotHelixResourceManager.reloadSegments(tableNameWithType, 
forceDownload, instanceToSegmentsMap);
-      Map<String, Map<String, String>> instanceMsgData =
-          tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new 
HashMap<>());
-      for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo : 
instanceMsgInfoMap.entrySet()) {
-        String instance = instanceMsgInfo.getKey();
-        Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
-        int numReloadMsgSent = msgInfo.getLeft();
-        if (numReloadMsgSent <= 0) {
-          continue;
-        }
-        Map<String, String> tableReloadMeta = new HashMap<>();
-        tableReloadMeta.put("numMessagesSent", 
String.valueOf(numReloadMsgSent));
-        tableReloadMeta.put("reloadJobId", msgInfo.getRight());
-        instanceMsgData.put(instance, tableReloadMeta);
-        // Store in ZK
-        try {
-          String segmentNames =
-              StringUtils.join(instanceToSegmentsMap.get(instance), 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
-          if 
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, 
segmentNames, instance,
-              msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
-            tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
-          } else {
-            tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
-            LOGGER.error("Failed to add batch reload job meta into zookeeper 
for table: {} targeted instance: {}",
-                tableNameWithType, instance);
-          }
-        } catch (Exception e) {
-          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
-          LOGGER.error("Failed to add batch reload job meta into zookeeper for 
table: {} targeted instance: {}",
-              tableNameWithType, instance, e);
-        }
-      }
-    }
-    return tableInstanceMsgData;
-  }
-
   @DELETE
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/segments/{tableName}/{segmentName}")
@@ -951,46 +645,6 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
-  @GET
-  @Path("segments/{tableNameWithType}/needReload")
-  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", 
action = Actions.Table.GET_METADATA)
-  @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Gets the metadata of reload segments check from 
servers hosting the table", notes =
-      "Returns true if reload is needed on the table from any one of the 
servers")
-  public String getTableReloadMetadata(
-      @ApiParam(value = "Table name with type", required = true, example = 
"myTable_REALTIME")
-      @PathParam("tableNameWithType") String tableNameWithType,
-      @QueryParam("verbose") @DefaultValue("false") boolean verbose, @Context 
HttpHeaders headers) {
-    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
-    LOGGER.info("Received a request to check reload for all servers hosting 
segments for table {}", tableNameWithType);
-    try {
-      TableMetadataReader tableMetadataReader =
-          new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
-      Map<String, JsonNode> needReloadMetadata =
-          
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
-              _controllerConf.getServerAdminRequestTimeoutSeconds() * 
1000).getServerReloadJsonResponses();
-      boolean needReload =
-          needReloadMetadata.values().stream().anyMatch(value -> 
value.get("needReload").booleanValue());
-      Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new 
HashMap<>();
-      TableSegmentsReloadCheckResponse tableNeedReloadResponse;
-      if (verbose) {
-        for (Map.Entry<String, JsonNode> entry : 
needReloadMetadata.entrySet()) {
-          serverResponses.put(entry.getKey(),
-              new 
ServerSegmentsReloadCheckResponse(entry.getValue().get("needReload").booleanValue(),
-                  entry.getValue().get("instanceId").asText()));
-        }
-        tableNeedReloadResponse = new 
TableSegmentsReloadCheckResponse(needReload, serverResponses);
-      } else {
-        tableNeedReloadResponse = new 
TableSegmentsReloadCheckResponse(needReload, serverResponses);
-      }
-      return JsonUtils.objectToPrettyString(tableNeedReloadResponse);
-    } catch (InvalidConfigException e) {
-      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Status.BAD_REQUEST);
-    } catch (IOException ioe) {
-      throw new ControllerApplicationException(LOGGER, "Error parsing Pinot 
server response: " + ioe.getMessage(),
-          Status.INTERNAL_SERVER_ERROR, ioe);
-    }
-  }
 
   @GET
   @Path("segments/{tableNameWithType}/isStale")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
new file mode 100644
index 00000000000..a72ec188eaf
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import 
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
+import 
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+/**
+ * REST API resource for reloading table segments.
+ * <ul>
+ *   <li>
+ *     POST requests:
+ *     <ul>
+ *       <li>"/segments/{tableName}/{segmentName}/reload": reload a specific 
segment</li>
+ *       <li>"/segments/{tableName}/reload": reload all segments in a 
table</li>
+ *     </ul>
+ *   </li>
+ *   <li>
+ *     GET requests:
+ *     <ul>
+ *       <li>"/segments/segmentReloadStatus/{jobId}": get status for a 
submitted reload job</li>
+ *       <li>"/segments/{tableNameWithType}/needReload": check if table 
segments need reloading</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+@Api(tags = Constants.SEGMENT_TAG, authorizations = {
+    @Authorization(value = SWAGGER_AUTHORIZATION_KEY), @Authorization(value = 
DATABASE)
+})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = {
+    @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
+        SWAGGER_AUTHORIZATION_KEY, description = "The format of the key is  
```\"Basic <token>\" or \"Bearer "
+        + "<token>\"```"), @ApiKeyAuthDefinition(name = DATABASE, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+    key = DATABASE, description =
+    "Database context passed through http header. If no context is provided 
'default' database "
+        + "context will be considered.")
+}))
+@Path("/")
+public class PinotTableReloadResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTableReloadResource.class);
+
+  @Inject
+  ControllerConf _controllerConf;
+
+  @Inject
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @Inject
+  Executor _executor;
+
+  @Inject
+  HttpClientConnectionManager _connectionManager;
+
+  /**
+   * Reloads a single segment of a table on the servers hosting it.
+   *
+   * <p>The table name is translated with the request headers and the segment name is URL-decoded before the reload
+   * is requested from the resource manager. When at least one reload message was sent, the job metadata is written
+   * to ZooKeeper on a best-effort basis: a failed write is logged and reported in the response message, but does
+   * not fail the request.
+   *
+   * @param tableName table name with or without type suffix
+   * @param segmentName URL-encoded segment name
+   * @param forceDownload passed through to the resource manager's reload call
+   * @param targetInstance optional instance to restrict the reload to
+   * @param headers request headers used to translate the table name
+   * @return response carrying the job id, the number of messages sent and the ZK storage status
+   * @throws ControllerApplicationException with status 404 when no reload message was sent
+   */
+  @POST
+  @Path("segments/{tableName}/{segmentName}/reload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_SEGMENT)
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Reload a specific segment",
+      notes = "Triggers segment reload on servers. Returns job ID and message count.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Reload job submitted successfully"),
+      @ApiResponse(code = 404, message = "Segment or table not found")
+  })
+  public SuccessResponse reloadSegment(
+      @ApiParam(value = "Table name with or without type suffix", required = true, example = "myTable_OFFLINE")
+      @PathParam("tableName") String tableName,
+      @ApiParam(value = "Segment name", required = true, example = "myTable_0")
+      @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Force server to re-download segment from deep store", defaultValue = "false")
+      @QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload,
+      @ApiParam(value = "Target specific server instance") @QueryParam("targetInstance") @Nullable
+      String targetInstance, @Context HttpHeaders headers) {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    long startTimeMs = System.currentTimeMillis();
+    segmentName = URIUtils.decode(segmentName);
+    String tableNameWithType = getExistingTable(tableName, segmentName);
+    Pair<Integer, String> reloadResult =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload, targetInstance);
+    int sentCount = reloadResult.getLeft();
+
+    // Nothing was sent: the segment is not hosted on the targeted instance(s).
+    if (sentCount <= 0) {
+      String searchedOn = targetInstance == null ? "every instance" : targetInstance;
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to find segment: %s in table: %s on %s", segmentName, tableName, searchedOn),
+          Response.Status.NOT_FOUND);
+    }
+
+    // Persisting the job metadata is best effort; the outcome is only reported back to the caller.
+    String jobId = reloadResult.getRight();
+    String zkStorageStatus = "FAILED";
+    try {
+      boolean stored = _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName,
+          targetInstance, jobId, startTimeMs, sentCount);
+      if (stored) {
+        zkStorageStatus = "SUCCESS";
+      } else {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
+            tableNameWithType, segmentName);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
+          tableNameWithType, segmentName, e);
+    }
+    return new SuccessResponse(
+        String.format("Submitted reload job id: %s, sent %d reload messages. Job meta ZK storage status: %s", jobId,
+            sentCount, zkStorageStatus));
+  }
+
+  /**
+   * Reloads segments of a table, either all of them or an explicit per-instance selection.
+   *
+   * <p>When {@code instanceToSegmentsMapInJson} is provided it is parsed as a map from instance to segment names and
+   * handed to the batch reload helper; {@code targetInstance} is not consulted on that path. Otherwise all segments
+   * of every resolved table are reloaded (optionally restricted to {@code targetInstance}) and the job metadata of
+   * each table that received messages is written to ZooKeeper on a best-effort basis.
+   *
+   * @param tableName table name with or without type suffix
+   * @param tableTypeStr optional table type filter
+   * @param forceDownload passed through to the resource manager; also pins the type to OFFLINE when none is given
+   * @param targetInstance optional instance to restrict the reload to
+   * @param instanceToSegmentsMapInJson optional JSON map of instance to segment names
+   * @param headers request headers used to translate the table name
+   * @return response whose status is the JSON-serialized per-table (or per-table, per-instance) job metadata
+   * @throws IOException declared by the method; the JSON parsing and serialization calls happen outside any
+   *     try/catch
+   * @throws ControllerApplicationException with status 404 when nothing was reloaded
+   */
+  @POST
+  @Path("segments/{tableName}/reload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_SEGMENT)
+  @Authenticate(AccessType.UPDATE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Reload all segments in a table",
+      notes = "Reloads all segments for the specified table. Supports filtering by type, instance, or custom mapping.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Reload jobs submitted successfully"),
+      @ApiResponse(code = 404, message = "No segments found")
+  })
+  public SuccessResponse reloadAllSegments(
+      @ApiParam(value = "Table name with or without type suffix", required = true, example = "myTable")
+      @PathParam("tableName") String tableName,
+      @ApiParam(value = "Table type filter", allowableValues = "OFFLINE,REALTIME") @QueryParam("type")
+      String tableTypeStr,
+      @ApiParam(value = "Force server to re-download segments from deep store", defaultValue = "false")
+      @QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload,
+      @ApiParam(value = "Target specific server instance") @QueryParam("targetInstance") @Nullable
+      String targetInstance,
+      @ApiParam(value = "JSON map of instance to segment lists (overrides targetInstance)")
+      @QueryParam("instanceToSegmentsMap") @Nullable String instanceToSegmentsMapInJson, @Context HttpHeaders headers)
+      throws IOException {
+    tableName = DatabaseUtils.translateTableName(tableName, headers);
+    TableType typeFromName = TableNameBuilder.getTableTypeFromTableName(tableName);
+    TableType typeFromRequest = Constants.validateTableType(tableTypeStr);
+    // A raw table name without any type would make Pinot reload both the OFFLINE and the REALTIME table. Force
+    // download currently only works for OFFLINE tables, so restrict the reload to OFFLINE in that case rather than
+    // letting it abort on the REALTIME table.
+    // TODO: support to force download immutable segments from RealTime table.
+    boolean noTypeSpecified = typeFromName == null && typeFromRequest == null;
+    if (forceDownload && noTypeSpecified) {
+      typeFromRequest = TableType.OFFLINE;
+    }
+    List<String> resolvedTables =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, typeFromRequest, LOGGER);
+
+    // Explicit instance -> segments selection: delegate to the batch helper.
+    if (instanceToSegmentsMapInJson != null) {
+      Map<String, List<String>> instanceToSegmentsMap =
+          JsonUtils.stringToObject(instanceToSegmentsMapInJson, new TypeReference<>() {
+          });
+      Map<String, Map<String, Map<String, String>>> batchResult =
+          reloadSegments(resolvedTables, forceDownload, instanceToSegmentsMap);
+      if (!batchResult.isEmpty()) {
+        return new SuccessResponse(JsonUtils.objectToString(batchResult));
+      }
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to find any segments in table: %s with instanceToSegmentsMap: %s", tableName,
+              instanceToSegmentsMap), Response.Status.NOT_FOUND);
+    }
+
+    long startTimeMs = System.currentTimeMillis();
+    Map<String, Map<String, String>> responseByTable = new LinkedHashMap<>();
+    for (String tableNameWithType : resolvedTables) {
+      Pair<Integer, String> reloadResult =
+          _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload, targetInstance);
+      int sentCount = reloadResult.getLeft();
+      if (sentCount > 0) {
+        String jobId = reloadResult.getRight();
+        Map<String, String> jobMeta = new HashMap<>();
+        jobMeta.put("numMessagesSent", String.valueOf(sentCount));
+        jobMeta.put("reloadJobId", jobId);
+        responseByTable.put(tableNameWithType, jobMeta);
+        // Store the job in ZK; a failure is logged and surfaced in the response only.
+        String zkStorageStatus = "FAILED";
+        try {
+          boolean stored = _pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, targetInstance,
+              jobId, startTimeMs, sentCount);
+          if (stored) {
+            zkStorageStatus = "SUCCESS";
+          } else {
+            LOGGER.error("Failed to add reload all segments job meta into zookeeper for table: {}",
+                tableNameWithType);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Failed to add reload all segments job meta into zookeeper for table: {}", tableNameWithType,
+              e);
+        }
+        jobMeta.put("reloadJobMetaZKStorageStatus", zkStorageStatus);
+      }
+    }
+
+    if (!responseByTable.isEmpty()) {
+      return new SuccessResponse(JsonUtils.objectToString(responseByTable));
+    }
+    String searchedOn = targetInstance == null ? "every instance" : targetInstance;
+    throw new ControllerApplicationException(LOGGER,
+        String.format("Failed to find any segments in table: %s on %s", tableName, searchedOn),
+        Response.Status.NOT_FOUND);
+  }
+
+  /**
+   * Resolves the name (with type suffix) of the existing table that a segment belongs to.
+   *
+   * <p>If {@code tableName} carries no type suffix, the type is derived from the segment name: LLC segments and
+   * uploaded realtime segments map to REALTIME, everything else to OFFLINE.
+   *
+   * @param tableName table name with or without type suffix
+   * @param segmentName segment name used to infer the table type when the table name has none
+   * @return the first entry returned by {@code ResourceUtils.getExistingTableNamesWithType}
+   */
+  private String getExistingTable(String tableName, String segmentName) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+    if (tableType == null) {
+      boolean isRealtimeSegment = LLCSegmentName.isLLCSegment(segmentName)
+          || UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName);
+      tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE;
+    }
+    List<String> existingTables =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER);
+    return existingTables.get(0);
+  }
+
+  private Map<String, Map<String, Map<String, String>>> 
reloadSegments(List<String> tableNamesWithType,
+      boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
+    long startTimeMs = System.currentTimeMillis();
+    Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new 
LinkedHashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, Pair<Integer, String>> instanceMsgInfoMap =
+          _pinotHelixResourceManager.reloadSegments(tableNameWithType, 
forceDownload, instanceToSegmentsMap);
+      Map<String, Map<String, String>> instanceMsgData =
+          tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new 
HashMap<>());
+      for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo : 
instanceMsgInfoMap.entrySet()) {
+        String instance = instanceMsgInfo.getKey();
+        Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
+        int numReloadMsgSent = msgInfo.getLeft();
+        if (numReloadMsgSent <= 0) {
+          continue;
+        }
+        Map<String, String> tableReloadMeta = new HashMap<>();
+        tableReloadMeta.put("numMessagesSent", 
String.valueOf(numReloadMsgSent));
+        tableReloadMeta.put("reloadJobId", msgInfo.getRight());
+        instanceMsgData.put(instance, tableReloadMeta);
+        // Store in ZK
+        try {
+          String segmentNames =
+              StringUtils.join(instanceToSegmentsMap.get(instance), 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+          if 
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, 
segmentNames, instance,
+              msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
+            tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
+          } else {
+            tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+            LOGGER.error("Failed to add batch reload job meta into zookeeper 
for table: {} targeted instance: {}",
+                tableNameWithType, instance);
+          }
+        } catch (Exception e) {
+          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+          LOGGER.error("Failed to add batch reload job meta into zookeeper for 
table: {} targeted instance: {}",
+              tableNameWithType, instance, e);
+        }
+      }
+    }
+    return tableInstanceMsgData;
+  }
+
+  /**
+   * Reports the progress of a previously submitted reload job.
+   *
+   * <p>The job metadata is read from ZooKeeper, the servers (and segments) involved in the job are derived from it,
+   * and each server's reload status endpoint is queried. The per-server success counts are summed; a server response
+   * that cannot be parsed is counted as a failed server call. The elapsed time since submission and, once at least
+   * one segment succeeded, a linear estimate of the remaining time are added to the response.
+   *
+   * @param reloadJobId job id returned by one of the reload endpoints
+   * @return aggregated job status including the ZK metadata and the derived timing fields
+   * @throws ControllerApplicationException with status 404 when no metadata exists for the job id
+   * @throws Exception declared by the method; the remaining calls are made outside any try/catch
+   */
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_SEGMENT_RELOAD_STATUS)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get reload job status",
+      notes = "Returns progress and metadata for a reload job including completion stats and time estimates.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Job status retrieved successfully"),
+      @ApiResponse(code = 404, message = "Job ID not found")
+  })
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job ID returned from reload endpoint", required = true) @PathParam("jobId")
+      String reloadJobId) throws Exception {
+    Map<String, String> jobMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, ControllerJobTypes.RELOAD_SEGMENT);
+    if (jobMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Response.Status.NOT_FOUND);
+    }
+
+    String tableNameWithType = jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    String segmentNames = jobMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    String instanceName = jobMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
+    String submissionTimeMs = jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
+    Map<String, List<String>> serverToSegments = getServerToSegments(tableNameWithType, segmentNames, instanceName);
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    // One status URL per server; segment-scoped jobs also pass the URL-encoded segment names for that server.
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> serverAndEndpoint : serverEndPoints.entrySet()) {
+      StringBuilder statusUrl = new StringBuilder();
+      statusUrl.append(serverAndEndpoint.getValue()).append("/controllerJob/reloadStatus/").append(tableNameWithType)
+          .append("?reloadJobTimestamp=").append(submissionTimeMs);
+      if (segmentNames != null) {
+        statusUrl.append("&segmentName=");
+        boolean firstSegment = true;
+        for (String segment : serverToSegments.get(serverAndEndpoint.getKey())) {
+          if (!firstSegment) {
+            statusUrl.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+          }
+          statusUrl.append(URIUtils.encode(segment));
+          firstSegment = false;
+        }
+      }
+      serverUrls.add(statusUrl.toString());
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    ServerReloadControllerJobStatusResponse jobStatus = new ServerReloadControllerJobStatusResponse();
+    jobStatus.setSuccessCount(0);
+
+    int totalSegments = 0;
+    for (List<String> segmentsOnServer : serverToSegments.values()) {
+      totalSegments += segmentsOnServer.size();
+    }
+    jobStatus.setTotalSegmentCount(totalSegments);
+    jobStatus.setTotalServersQueried(serverUrls.size());
+    jobStatus.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
+
+    // Sum up the per-server success counts; an unparseable response counts as a failed server call.
+    for (String serverResponseJson : serviceResponse._httpResponses.values()) {
+      try {
+        ServerReloadControllerJobStatusResponse serverStatus =
+            JsonUtils.stringToObject(serverResponseJson, ServerReloadControllerJobStatusResponse.class);
+        jobStatus.setSuccessCount(jobStatus.getSuccessCount() + serverStatus.getSuccessCount());
+      } catch (Exception e) {
+        jobStatus.setTotalServerCallsFailed(jobStatus.getTotalServerCallsFailed() + 1);
+      }
+    }
+
+    // Add ZK fields
+    jobStatus.setMetadata(jobMetadata);
+
+    // Add derived fields
+    long submissionTime = Long.parseLong(submissionTimeMs);
+    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
+    int successCount = jobStatus.getSuccessCount();
+    int remainingSegments = jobStatus.getTotalSegmentCount() - successCount;
+    // -1 signals that no estimate is available because nothing has succeeded yet.
+    double estimatedRemainingTimeInMinutes =
+        successCount > 0 ? ((double) remainingSegments / (double) successCount) * timeElapsedInMinutes : -1;
+
+    jobStatus.setTimeElapsedInMinutes(timeElapsedInMinutes);
+    jobStatus.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
+
+    return jobStatus;
+  }
+
+  @GET
+  @Path("segments/{tableNameWithType}/needReload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", 
action = Actions.Table.GET_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Check if table needs reload",
+      notes = "Queries all servers hosting the table to determine if segments 
need reloading.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Reload check completed 
successfully"),
+      @ApiResponse(code = 400, message = "Invalid table configuration")
+  })
+  public String getTableReloadMetadata(
+      @ApiParam(value = "Table name with type suffix", required = true, 
example = "myTable_REALTIME")
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Include detailed server responses", defaultValue = 
"false") @QueryParam("verbose")
+      @DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
+    LOGGER.info("Received a request to check reload for all servers hosting 
segments for table {}", tableNameWithType);
+    try {
+      TableMetadataReader tableMetadataReader =
+          new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+      Map<String, JsonNode> needReloadMetadata =
+          
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
+              _controllerConf.getServerAdminRequestTimeoutSeconds() * 
1000).getServerReloadJsonResponses();
+      boolean needReload =
+          needReloadMetadata.values().stream().anyMatch(value -> 
value.get("needReload").booleanValue());
+      Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new 
HashMap<>();
+      TableSegmentsReloadCheckResponse tableNeedReloadResponse;
+      if (verbose) {
+        for (Map.Entry<String, JsonNode> entry : 
needReloadMetadata.entrySet()) {
+          serverResponses.put(entry.getKey(),
+              new 
ServerSegmentsReloadCheckResponse(entry.getValue().get("needReload").booleanValue(),
+                  entry.getValue().get("instanceId").asText()));
+        }
+        tableNeedReloadResponse = new 
TableSegmentsReloadCheckResponse(needReload, serverResponses);
+      } else {
+        tableNeedReloadResponse = new 
TableSegmentsReloadCheckResponse(needReload, serverResponses);
+      }
+      return JsonUtils.objectToPrettyString(tableNeedReloadResponse);
+    } catch (InvalidConfigException e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST);
+    } catch (IOException ioe) {
+      throw new ControllerApplicationException(LOGGER, "Error parsing Pinot 
server response: " + ioe.getMessage(),
+          Response.Status.INTERNAL_SERVER_ERROR, ioe);
+    }
+  }
+
+  /**
+   * Determines which servers, and which segments on them, are covered by a reload job.
+   *
+   * <ul>
+   *   <li>No segment names: delegates to the resource manager, with or without an instance filter.</li>
+   *   <li>Segment names and an instance: maps that instance to the parsed segment names.</li>
+   *   <li>Segment names without an instance: exactly one segment is expected; every server returned by the
+   *   resource manager for it is mapped to that single segment.</li>
+   * </ul>
+   *
+   * @param tableNameWithType table name with type suffix
+   * @param segmentNames segment names joined by {@code SegmentNameUtils.SEGMENT_NAME_SEPARATOR}, or {@code null}
+   * @param instanceName instance the job targeted, or {@code null}
+   * @return server name to the job's segments on that server
+   * @throws IllegalStateException if several segment names are given without an instance
+   */
+  @VisibleForTesting
+  Map<String, List<String>> getServerToSegments(String tableNameWithType, @Nullable String segmentNames,
+      @Nullable String instanceName) {
+    if (segmentNames == null) {
+      // The resource manager handles both a null and a non-null instanceName.
+      return _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, instanceName, true);
+    }
+
+    // Only the servers and segments involved in the reload job are of interest from here on.
+    String[] parsedNames = StringUtils.split(segmentNames, SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+    List<String> segmentNameList = new ArrayList<>();
+    Collections.addAll(segmentNameList, parsedNames);
+    if (instanceName != null) {
+      return Map.of(instanceName, segmentNameList);
+    }
+
+    // Without an instance the job was created by reloading either one segment or all segments. The "all segments"
+    // case returned above, so exactly one segment name is expected here.
+    Preconditions.checkState(segmentNameList.size() == 1, "Only one segment is expected but got: %s", segmentNameList);
+    Map<String, List<String>> segmentsByServer = new HashMap<>();
+    for (String hostingServer : _pinotHelixResourceManager.getServers(tableNameWithType, segmentNames)) {
+      segmentsByServer.put(hostingServer, Collections.singletonList(segmentNames));
+    }
+    return segmentsByServer;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
index 9117467bdfa..97f14aa2ee7 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -28,9 +28,7 @@ import java.util.stream.Collectors;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -38,12 +36,9 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 
 public class PinotSegmentRestletResourceTest {
-  @Mock
-  PinotHelixResourceManager _pinotHelixResourceManager;
 
   @InjectMocks
   PinotSegmentRestletResource _pinotSegmentRestletResource;
@@ -53,45 +48,6 @@ public class PinotSegmentRestletResourceTest {
     MockitoAnnotations.openMocks(this);
   }
 
-  @Test
-  public void testGetServerToSegments() {
-    String tableName = "testTable";
-    Map<String, List<String>> fullServerToSegmentsMap = new HashMap<>();
-    fullServerToSegmentsMap.put("svr01", new ArrayList<>(List.of("seg01", 
"seg02")));
-    fullServerToSegmentsMap.put("svr02", new ArrayList<>(List.of("seg02", 
"seg03")));
-    fullServerToSegmentsMap.put("svr03", new ArrayList<>(List.of("seg03", 
"seg01")));
-    when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, null, 
true)).thenReturn(fullServerToSegmentsMap);
-    when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, "svr02", 
true)).thenReturn(
-        Map.of("svr02", new ArrayList<>(List.of("seg02", "seg03"))));
-    when(_pinotHelixResourceManager.getServers(tableName, 
"seg01")).thenReturn(Set.of("svr01", "svr03"));
-
-    // Get all servers and all their segments.
-    Map<String, List<String>> serverToSegmentsMap =
-        _pinotSegmentRestletResource.getServerToSegments(tableName, null, 
null);
-    assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
-
-    // Get all segments on svr02.
-    serverToSegmentsMap = 
_pinotSegmentRestletResource.getServerToSegments(tableName, null, "svr02");
-    assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02", 
"seg03")));
-
-    // Get all servers with seg01.
-    serverToSegmentsMap = 
_pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", null);
-    assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"), 
"svr03", List.of("seg01")));
-
-    // Simply map the provided server to the provided segments.
-    serverToSegmentsMap = 
_pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", "svr01");
-    assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
-    serverToSegmentsMap = 
_pinotSegmentRestletResource.getServerToSegments(tableName, "anySegment", 
"anyServer");
-    assertEquals(serverToSegmentsMap, Map.of("anyServer", 
List.of("anySegment")));
-    serverToSegmentsMap = 
_pinotSegmentRestletResource.getServerToSegments(tableName, "seg01|seg02", 
"svr02");
-    assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01", 
"seg02")));
-    try {
-      _pinotSegmentRestletResource.getServerToSegments(tableName, 
"seg01,seg02", null);
-    } catch (Exception e) {
-      assertTrue(e.getMessage().contains("Only one segment is expected but 
got: [seg01, seg02]"));
-    }
-  }
-
   @Test
   public void testGetPartitionIdToSegmentsToDeleteMap() {
     IdealState idealState = mock(IdealState.class);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableReloadResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableReloadResourceTest.java
new file mode 100644
index 00000000000..ed8fabb366f
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableReloadResourceTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link PinotTableReloadResource}, using a mocked {@link PinotHelixResourceManager}.
+ */
+public class PinotTableReloadResourceTest {
+  @Mock
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @InjectMocks
+  PinotTableReloadResource _resource;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+  }
+
+  /**
+   * Covers every combination of segment names and instance name accepted by
+   * {@link PinotTableReloadResource#getServerToSegments}, including the rejected one.
+   */
+  @Test
+  public void testGetServerToSegments() {
+    String tableName = "testTable";
+    Map<String, List<String>> fullServerToSegmentsMap = new HashMap<>();
+    fullServerToSegmentsMap.put("svr01", new ArrayList<>(List.of("seg01", "seg02")));
+    fullServerToSegmentsMap.put("svr02", new ArrayList<>(List.of("seg02", "seg03")));
+    fullServerToSegmentsMap.put("svr03", new ArrayList<>(List.of("seg03", "seg01")));
+    when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, null, true)).thenReturn(fullServerToSegmentsMap);
+    when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, "svr02", true)).thenReturn(
+        Map.of("svr02", new ArrayList<>(List.of("seg02", "seg03"))));
+    when(_pinotHelixResourceManager.getServers(tableName, "seg01")).thenReturn(Set.of("svr01", "svr03"));
+
+    // Get all servers and all their segments.
+    Map<String, List<String>> serverToSegmentsMap = _resource.getServerToSegments(tableName, null, null);
+    assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
+
+    // Get all segments on svr02.
+    serverToSegmentsMap = _resource.getServerToSegments(tableName, null, "svr02");
+    assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02", "seg03")));
+
+    // Get all servers with seg01.
+    serverToSegmentsMap = _resource.getServerToSegments(tableName, "seg01", null);
+    assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"), "svr03", List.of("seg01")));
+
+    // Simply map the provided server to the provided segments.
+    serverToSegmentsMap = _resource.getServerToSegments(tableName, "seg01", "svr01");
+    assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
+    serverToSegmentsMap = _resource.getServerToSegments(tableName, "anySegment", "anyServer");
+    assertEquals(serverToSegmentsMap, Map.of("anyServer", List.of("anySegment")));
+    serverToSegmentsMap = _resource.getServerToSegments(tableName, "seg01|seg02", "svr02");
+    assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01", "seg02")));
+
+    // Several segments without a target instance must be rejected. The names have to be joined with the segment
+    // name separator ('|', as in the case above); a comma-joined string is treated as one single segment name and
+    // would never trigger the check.
+    try {
+      _resource.getServerToSegments(tableName, "seg01|seg02", null);
+      org.testng.Assert.fail("Expected IllegalStateException for multiple segments without a target instance");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Only one segment is expected but got: [seg01, seg02]"));
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to