This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c2948b91df [controller] Refactor. Moved reload job status logic into 
a dedicated PinotTableReloadStatusReporter class (#17036)
9c2948b91df is described below

commit 9c2948b91dfec02cb31fb8f52debdcceca588b8d
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Fri Oct 17 12:37:38 2025 -0700

    [controller] Refactor. Moved reload job status logic into a dedicated 
PinotTableReloadStatusReporter class (#17036)
    
    * [controller] Refactor reload job status logic into a dedicated 
PinotTableReloadStatusReporter class
    
    * [controller] Refactor ServerReloadControllerJobStatusResponse setters to 
enable method chaining and simplify reload job status logic
    
    * [controller] Chain setters in PinotTableReloadStatusReporter for 
simplification and clean-up redundant code
    
    * [controller] Bind PinotTableReloadStatusReporter as a Singleton in 
BaseControllerStarter
---
 .../pinot/controller/BaseControllerStarter.java    |   2 +
 .../api/resources/PinotTableReloadResource.java    |  20 ++-
 .../ServerReloadControllerJobStatusResponse.java   |  25 ++-
 .../services/PinotTableReloadService.java          | 131 +-------------
 .../services/PinotTableReloadStatusReporter.java   | 192 +++++++++++++++++++++
 ...ava => PinotTableReloadStatusReporterTest.java} |  18 +-
 6 files changed, 233 insertions(+), 155 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 7e0198f89bf..af59efac1a0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -117,6 +117,7 @@ import 
org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceM
 import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.controller.services.PinotTableReloadService;
+import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
 import org.apache.pinot.controller.tuner.TableConfigTunerRegistry;
 import org.apache.pinot.controller.util.BrokerServiceHelper;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -683,6 +684,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
 
         bindAsContract(PinotTableReloadService.class).in(Singleton.class);
+        
bindAsContract(PinotTableReloadStatusReporter.class).in(Singleton.class);
 
         String loggerRootDir = 
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
         if (loggerRootDir != null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
index 0d9fa86d8b2..9cb4d8c2c9c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
@@ -44,6 +44,7 @@ import javax.ws.rs.core.MediaType;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
 import org.apache.pinot.controller.services.PinotTableReloadService;
+import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
@@ -87,11 +88,14 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 public class PinotTableReloadResource {
   private static final Logger LOG = 
LoggerFactory.getLogger(PinotTableReloadResource.class);
 
-  private final PinotTableReloadService _pinotTableReloadService;
+  private final PinotTableReloadService _service;
+  private final PinotTableReloadStatusReporter _statusReporter;
 
   @Inject
-  public PinotTableReloadResource(PinotTableReloadService 
pinotTableReloadService) {
-    _pinotTableReloadService = pinotTableReloadService;
+  public PinotTableReloadResource(PinotTableReloadService service,
+      PinotTableReloadStatusReporter statusReporter) {
+    _service = service;
+    _statusReporter = statusReporter;
   }
 
   @POST
@@ -114,7 +118,7 @@ public class PinotTableReloadResource {
       @QueryParam("forceDownload") @DefaultValue("false") boolean 
forceDownload,
       @ApiParam(value = "Target specific server instance") 
@QueryParam("targetInstance") @Nullable
       String targetInstance, @Context HttpHeaders headers) {
-    return _pinotTableReloadService.reloadSegment(tableName, segmentName, 
forceDownload, targetInstance, headers);
+    return _service.reloadSegment(tableName, segmentName, forceDownload, 
targetInstance, headers);
   }
 
   @POST
@@ -140,7 +144,7 @@ public class PinotTableReloadResource {
       @ApiParam(value = "JSON map of instance to segment lists (overrides 
targetInstance)")
       @QueryParam("instanceToSegmentsMap") @Nullable String 
instanceToSegmentsMapInJson, @Context HttpHeaders headers)
       throws IOException {
-    return _pinotTableReloadService.reloadAllSegments(tableName, tableTypeStr, 
forceDownload, targetInstance,
+    return _service.reloadAllSegments(tableName, tableTypeStr, forceDownload, 
targetInstance,
         instanceToSegmentsMapInJson, headers);
   }
 
@@ -157,7 +161,7 @@ public class PinotTableReloadResource {
   public ServerReloadControllerJobStatusResponse getReloadJobStatus(
       @ApiParam(value = "Reload job ID returned from reload endpoint", 
required = true) @PathParam("jobId")
       String reloadJobId) throws Exception {
-    return _pinotTableReloadService.getReloadJobStatus(reloadJobId);
+    return _statusReporter.getReloadJobStatus(reloadJobId);
   }
 
   @GET
@@ -170,11 +174,11 @@ public class PinotTableReloadResource {
       @ApiResponse(code = 200, message = "Reload check completed 
successfully"),
       @ApiResponse(code = 400, message = "Invalid table configuration")
   })
-  public String getTableReloadMetadata(
+  public String needReload(
       @ApiParam(value = "Table name with type suffix", required = true, 
example = "myTable_REALTIME")
       @PathParam("tableNameWithType") String tableNameWithType,
       @ApiParam(value = "Include detailed server responses", defaultValue = 
"false") @QueryParam("verbose")
       @DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
-    return _pinotTableReloadService.getTableReloadMetadata(tableNameWithType, 
verbose, headers);
+    return _service.needReload(tableNameWithType, verbose, headers);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
index 2b2c259857a..9a0467043d7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
@@ -33,55 +33,64 @@ public class ServerReloadControllerJobStatusResponse {
     return _totalSegmentCount;
   }
 
-  public void setTotalSegmentCount(int totalSegmentCount) {
+  public ServerReloadControllerJobStatusResponse setTotalSegmentCount(int 
totalSegmentCount) {
     _totalSegmentCount = totalSegmentCount;
+    return this;
   }
 
   public int getSuccessCount() {
     return _successCount;
   }
 
-  public void setSuccessCount(int successCount) {
+  public ServerReloadControllerJobStatusResponse setSuccessCount(int 
successCount) {
     _successCount = successCount;
+    return this;
   }
 
   public double getEstimatedTimeRemainingInMinutes() {
     return _estimatedTimeRemainingInMinutes;
   }
 
-  public void setEstimatedTimeRemainingInMinutes(double 
estimatedTimeRemainingInMillis) {
-    _estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMillis;
+  public ServerReloadControllerJobStatusResponse 
setEstimatedTimeRemainingInMinutes(
+      double estimatedTimeRemainingInMinutes) {
+    _estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMinutes;
+    return this;
   }
 
   public double getTimeElapsedInMinutes() {
     return _timeElapsedInMinutes;
   }
 
-  public void setTimeElapsedInMinutes(double timeElapsedInMinutes) {
+  public ServerReloadControllerJobStatusResponse 
setTimeElapsedInMinutes(double timeElapsedInMinutes) {
     _timeElapsedInMinutes = timeElapsedInMinutes;
+    return this;
   }
 
+
   public int getTotalServersQueried() {
     return _totalServersQueried;
   }
 
-  public void setTotalServersQueried(int totalServersQueried) {
+  public ServerReloadControllerJobStatusResponse setTotalServersQueried(int 
totalServersQueried) {
     _totalServersQueried = totalServersQueried;
+    return this;
   }
 
   public int getTotalServerCallsFailed() {
     return _totalServerCallsFailed;
   }
 
-  public void setTotalServerCallsFailed(int totalServerCallsFailed) {
+  public ServerReloadControllerJobStatusResponse setTotalServerCallsFailed(int 
totalServerCallsFailed) {
     _totalServerCallsFailed = totalServerCallsFailed;
+    return this;
   }
 
   public Map<String, String> getMetadata() {
     return _metadata;
   }
 
-  public void setMetadata(Map<String, String> metadata) {
+  public ServerReloadControllerJobStatusResponse setMetadata(Map<String, 
String> metadata) {
     _metadata = metadata;
+    return this;
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
index 5037f32c959..8c257076eae 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
@@ -20,20 +20,12 @@ package org.apache.pinot.controller.services;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.BiMap;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import javax.ws.rs.core.HttpHeaders;
@@ -52,15 +44,11 @@ import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.api.resources.Constants;
 import org.apache.pinot.controller.api.resources.ResourceUtils;
-import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
 import org.apache.pinot.controller.api.resources.SuccessResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
-import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.controller.util.TableMetadataReader;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -181,101 +169,8 @@ public class PinotTableReloadService {
     return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
-    }
-
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
-
-    List<String> serverUrls = new ArrayList<>();
-    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
-      String server = entry.getKey();
-      String endpoint = entry.getValue();
-      String reloadTaskStatusEndpoint =
-          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
-              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (segmentNames != null) {
-        List<String> segmentsForServer = serverToSegments.get(server);
-        StringBuilder encodedSegmentsBuilder = new StringBuilder();
-        if (!segmentsForServer.isEmpty()) {
-          Iterator<String> segmentIterator = segmentsForServer.iterator();
-          // Append first segment without a leading separator
-          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
-          // Append remaining segments, each prefixed by the separator
-          while (segmentIterator.hasNext()) {
-            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
-                .append(URIUtils.encode(segmentIterator.next()));
-          }
-        }
-        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
-      }
-      serverUrls.add(reloadTaskStatusEndpoint);
-    }
-
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
-        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
-
-    ServerReloadControllerJobStatusResponse 
serverReloadControllerJobStatusResponse =
-        new ServerReloadControllerJobStatusResponse();
-    serverReloadControllerJobStatusResponse.setSuccessCount(0);
-
-    int totalSegments = 0;
-    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
-      totalSegments += entry.getValue().size();
-    }
-    
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-    
serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-    
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
-    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
-      String responseString = streamResponse.getValue();
-      try {
-        ServerReloadControllerJobStatusResponse response =
-            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
-        serverReloadControllerJobStatusResponse.setSuccessCount(
-            serverReloadControllerJobStatusResponse.getSuccessCount() + 
response.getSuccessCount());
-      } catch (Exception e) {
-        serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
-      }
-    }
-
-    // Add ZK fields
-    
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
-    // Add derived fields
-    long submissionTime = 
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - 
(double) submissionTime) / (1000.0 * 60.0);
-    int remainingSegments = 
serverReloadControllerJobStatusResponse.getTotalSegmentCount()
-        - serverReloadControllerJobStatusResponse.getSuccessCount();
-
-    double estimatedRemainingTimeInMinutes = -1;
-    if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
-      estimatedRemainingTimeInMinutes =
-          ((double) remainingSegments / (double) 
serverReloadControllerJobStatusResponse.getSuccessCount())
-              * timeElapsedInMinutes;
-    }
 
-    
serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-    
serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
-    return serverReloadControllerJobStatusResponse;
-  }
-
-  public String getTableReloadMetadata(String tableNameWithType, boolean 
verbose, HttpHeaders headers) {
+  public String needReload(String tableNameWithType, boolean verbose, 
HttpHeaders headers) {
     tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
     LOG.info("Received a request to check reload for all servers hosting 
segments for table {}", tableNameWithType);
     try {
@@ -307,30 +202,6 @@ public class PinotTableReloadService {
     }
   }
 
-  @VisibleForTesting
-  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNames,
-      @Nullable String instanceName) {
-    if (segmentNames == null) {
-      // instanceName can be null or not null, and this method below can 
handle both cases.
-      return 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, 
instanceName, true);
-    }
-    // Skip servers and segments not involved in the segment reloading job.
-    List<String> segmnetNameList = new ArrayList<>();
-    Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
-    if (instanceName != null) {
-      return Map.of(instanceName, segmnetNameList);
-    }
-    // If instance is null, then either one or all segments are being reloaded 
via current segment reload restful APIs.
-    // And the if-check at the beginning of this method has handled the case 
of reloading all segments. So here we
-    // expect only one segment name.
-    Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is 
expected but got: %s", segmnetNameList);
-    Map<String, List<String>> serverToSegments = new HashMap<>();
-    Set<String> servers = 
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
-    for (String server : servers) {
-      serverToSegments.put(server, Collections.singletonList(segmentNames));
-    }
-    return serverToSegments;
-  }
 
   /**
    * Helper method to find the existing table based on the given table name 
(with or without type suffix) and segment
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
new file mode 100644
index 00000000000..ed0950d63f7
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.services;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.utils.URIUtils;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+@Singleton
+public class PinotTableReloadStatusReporter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PinotTableReloadStatusReporter.class);
+
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final Executor _executor;
+  private final HttpClientConnectionManager _connectionManager;
+
+  @Inject
+  public PinotTableReloadStatusReporter(PinotHelixResourceManager 
pinotHelixResourceManager, Executor executor,
+      HttpClientConnectionManager connectionManager) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _executor = executor;
+    _connectionManager = connectionManager;
+  }
+
+  private static double 
computeEstimatedRemainingTimeInMinutes(ServerReloadControllerJobStatusResponse 
finalResponse,
+      double timeElapsedInMinutes) {
+    int remainingSegments = finalResponse.getTotalSegmentCount() - 
finalResponse.getSuccessCount();
+
+    double estimatedRemainingTimeInMinutes = -1;
+    if (finalResponse.getSuccessCount() > 0) {
+      estimatedRemainingTimeInMinutes =
+          ((double) remainingSegments / (double) 
finalResponse.getSuccessCount()) * timeElapsedInMinutes;
+    }
+    return estimatedRemainingTimeInMinutes;
+  }
+
+  private static double computeTimeElapsedInMinutes(double submissionTime) {
+    return ((double) System.currentTimeMillis() - submissionTime) / (1000.0 * 
60.0);
+  }
+
+  private static int computeTotalSegments(Map<String, List<String>> 
serverToSegments) {
+    int totalSegments = 0;
+    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
+      totalSegments += entry.getValue().size();
+    }
+    return totalSegments;
+  }
+
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
+      throws InvalidConfigException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
+          Response.Status.NOT_FOUND);
+    }
+
+    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
+    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
+
+    BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      String server = entry.getKey();
+      String endpoint = entry.getValue();
+      String reloadTaskStatusEndpoint =
+          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
+              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
+      if (segmentNames != null) {
+        List<String> segmentsForServer = serverToSegments.get(server);
+        StringBuilder encodedSegmentsBuilder = new StringBuilder();
+        if (!segmentsForServer.isEmpty()) {
+          Iterator<String> segmentIterator = segmentsForServer.iterator();
+          // Append first segment without a leading separator
+          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+          // Append remaining segments, each prefixed by the separator
+          while (segmentIterator.hasNext()) {
+            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+                .append(URIUtils.encode(segmentIterator.next()));
+          }
+        }
+        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+      }
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
+
+    ServerReloadControllerJobStatusResponse response = new 
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+        .setTotalSegmentCount(computeTotalSegments(serverToSegments))
+        .setTotalServersQueried(serverUrls.size())
+        .setTotalServerCallsFailed(serviceResponse._failedResponseCount);
+
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      String responseString = streamResponse.getValue();
+      try {
+        ServerReloadControllerJobStatusResponse r =
+            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
+        response.setSuccessCount(response.getSuccessCount() + 
r.getSuccessCount());
+      } catch (Exception e) {
+        
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
+      }
+    }
+
+    // Add derived fields
+    final long submissionTime =
+        
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+    final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double) 
submissionTime);
+    final double estimatedRemainingTimeInMinutes =
+        computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
+
+    return response.setMetadata(controllerJobZKMetadata)
+        .setTimeElapsedInMinutes(timeElapsedInMinutes)
+        .setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
+  }
+
+  @VisibleForTesting
+  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNames,
+      @Nullable String instanceName) {
+    if (segmentNames == null) {
+      // instanceName can be null or not null, and this method below can 
handle both cases.
+      return 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, 
instanceName, true);
+    }
+    // Skip servers and segments not involved in the segment reloading job.
+    List<String> segmnetNameList = new ArrayList<>();
+    Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+    if (instanceName != null) {
+      return Map.of(instanceName, segmnetNameList);
+    }
+    // If instance is null, then either one or all segments are being reloaded 
via current segment reload restful APIs.
+    // And the if-check at the beginning of this method has handled the case 
of reloading all segments. So here we
+    // expect only one segment name.
+    checkState(segmnetNameList.size() == 1, "Only one segment is expected but 
got: %s", segmnetNameList);
+    Map<String, List<String>> serverToSegments = new HashMap<>();
+    Set<String> servers = 
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
+    for (String server : servers) {
+      serverToSegments.put(server, Collections.singletonList(segmentNames));
+    }
+    return serverToSegments;
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporterTest.java
similarity index 81%
rename from 
pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
rename to 
pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporterTest.java
index a18f8c0ccf5..8fa3b512fa3 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporterTest.java
@@ -35,12 +35,12 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 
-public class PinotTableReloadServiceTest {
+public class PinotTableReloadStatusReporterTest {
   @Mock
   PinotHelixResourceManager _pinotHelixResourceManager;
 
   @InjectMocks
-  PinotTableReloadService _service;
+  PinotTableReloadStatusReporter _instance;
 
   @BeforeMethod
   public void setup() {
@@ -60,26 +60,26 @@ public class PinotTableReloadServiceTest {
     when(_pinotHelixResourceManager.getServers(tableName, 
"seg01")).thenReturn(Set.of("svr01", "svr03"));
 
     // Get all servers and all their segments.
-    Map<String, List<String>> serverToSegmentsMap = 
_service.getServerToSegments(tableName, null, null);
+    Map<String, List<String>> serverToSegmentsMap = 
_instance.getServerToSegments(tableName, null, null);
     assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
 
     // Get all segments on svr02.
-    serverToSegmentsMap = _service.getServerToSegments(tableName, null, 
"svr02");
+    serverToSegmentsMap = _instance.getServerToSegments(tableName, null, 
"svr02");
     assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02", 
"seg03")));
 
     // Get all servers with seg01.
-    serverToSegmentsMap = _service.getServerToSegments(tableName, "seg01", 
null);
+    serverToSegmentsMap = _instance.getServerToSegments(tableName, "seg01", 
null);
     assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"), 
"svr03", List.of("seg01")));
 
     // Simply map the provided server to the provided segments.
-    serverToSegmentsMap = _service.getServerToSegments(tableName, "seg01", 
"svr01");
+    serverToSegmentsMap = _instance.getServerToSegments(tableName, "seg01", 
"svr01");
     assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
-    serverToSegmentsMap = _service.getServerToSegments(tableName, 
"anySegment", "anyServer");
+    serverToSegmentsMap = _instance.getServerToSegments(tableName, 
"anySegment", "anyServer");
     assertEquals(serverToSegmentsMap, Map.of("anyServer", 
List.of("anySegment")));
-    serverToSegmentsMap = _service.getServerToSegments(tableName, 
"seg01|seg02", "svr02");
+    serverToSegmentsMap = _instance.getServerToSegments(tableName, 
"seg01|seg02", "svr02");
     assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01", 
"seg02")));
     try {
-      _service.getServerToSegments(tableName, "seg01,seg02", null);
+      _instance.getServerToSegments(tableName, "seg01,seg02", null);
     } catch (Exception e) {
       assertTrue(e.getMessage().contains("Only one segment is expected but 
got: [seg01, seg02]"));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to