This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9265b964855 [controller] Refactor table reload status API to improve 
type safety (#17051)
9265b964855 is described below

commit 9265b964855a9abc10f19de05147ef191ae147d1
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Oct 22 19:35:06 2025 -0700

    [controller] Refactor table reload status API to improve type safety 
(#17051)
    
    * [controller] Refactor reload job status logic into a dedicated 
PinotTableReloadStatusReporter class
    
    * [controller] Chain setters in PinotTableReloadStatusReporter for 
simplification and clean up redundant code
    
    * [controller] Cosmetic. Rename ServerReloadControllerJobStatusResponse to 
PinotTableReloadStatusResponse
    
    * [controller] Introduce PinotControllerJobDto to simplify reload job 
metadata handling and make it type-safe
    
    * [controller] Refactor reload job status logic for clarity
    
    * [controller] Rename PinotControllerJobDto to 
PinotControllerJobMetadataDto for better clarity and consistency
    
    * [controller] Add package-info.java for DTOs and update 
PinotControllerJobMetadataDto javadoc for clarity
    
    * [controller] Fix javadoc style in package-info.java for DTOs
---
 .../api/dto/PinotControllerJobMetadataDto.java     | 109 ++++++++++++++++
 .../PinotTableReloadStatusResponse.java}           |  24 ++--
 .../pinot/controller/api/dto/package-info.java     |  38 ++++++
 .../api/resources/PinotTableReloadResource.java    |   3 +-
 .../services/PinotTableReloadStatusReporter.java   | 139 ++++++++++++---------
 ...PartialUpsertTableRebalanceIntegrationTest.java |   6 +-
 .../tests/TableRebalanceIntegrationTest.java       |   6 +-
 7 files changed, 248 insertions(+), 77 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobMetadataDto.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobMetadataDto.java
new file mode 100644
index 00000000000..295bd6517c3
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobMetadataDto.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+
+/**
+ * Type-safe DTO (Data Transfer Object) for controller job ZK metadata.
+ * Provides structured access to job metadata fields instead of using raw 
Map<String, String>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PinotControllerJobMetadataDto {
+  private String _jobId;
+
+  @JsonProperty("tableName")
+  private String _tableNameWithType;
+
+  private String _jobType;
+  private long _submissionTimeMs;
+  private int _messageCount;
+  private String _segmentName;
+  private String _instanceName;
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public PinotControllerJobMetadataDto setJobId(String jobId) {
+    _jobId = jobId;
+    return this;
+  }
+
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  public PinotControllerJobMetadataDto setTableNameWithType(String 
tableNameWithType) {
+    _tableNameWithType = tableNameWithType;
+    return this;
+  }
+
+  public String getJobType() {
+    return _jobType;
+  }
+
+  public PinotControllerJobMetadataDto setJobType(String jobType) {
+    _jobType = jobType;
+    return this;
+  }
+
+  public long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  public PinotControllerJobMetadataDto setSubmissionTimeMs(long 
submissionTimeMs) {
+    _submissionTimeMs = submissionTimeMs;
+    return this;
+  }
+
+  public int getMessageCount() {
+    return _messageCount;
+  }
+
+  public PinotControllerJobMetadataDto setMessageCount(int messageCount) {
+    _messageCount = messageCount;
+    return this;
+  }
+
+  @Nullable
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  public PinotControllerJobMetadataDto setSegmentName(@Nullable String 
segmentName) {
+    _segmentName = segmentName;
+    return this;
+  }
+
+  @Nullable
+  public String getInstanceName() {
+    return _instanceName;
+  }
+
+  public PinotControllerJobMetadataDto setInstanceName(@Nullable String 
instanceName) {
+    _instanceName = instanceName;
+    return this;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
similarity index 70%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
index 9a0467043d7..8518ffdc57a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
@@ -16,24 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.controller.api.resources;
+package org.apache.pinot.controller.api.dto;
 
-import java.util.Map;
-
-public class ServerReloadControllerJobStatusResponse {
+public class PinotTableReloadStatusResponse {
   private double _timeElapsedInMinutes;
   private double _estimatedTimeRemainingInMinutes;
   private int _totalSegmentCount;
   private int _successCount;
   private int _totalServersQueried;
   private int _totalServerCallsFailed;
-  private Map<String, String> _metadata;
+  private PinotControllerJobMetadataDto _metadata;
 
   public int getTotalSegmentCount() {
     return _totalSegmentCount;
   }
 
-  public ServerReloadControllerJobStatusResponse setTotalSegmentCount(int 
totalSegmentCount) {
+  public PinotTableReloadStatusResponse setTotalSegmentCount(int 
totalSegmentCount) {
     _totalSegmentCount = totalSegmentCount;
     return this;
   }
@@ -42,7 +40,7 @@ public class ServerReloadControllerJobStatusResponse {
     return _successCount;
   }
 
-  public ServerReloadControllerJobStatusResponse setSuccessCount(int 
successCount) {
+  public PinotTableReloadStatusResponse setSuccessCount(int successCount) {
     _successCount = successCount;
     return this;
   }
@@ -51,7 +49,7 @@ public class ServerReloadControllerJobStatusResponse {
     return _estimatedTimeRemainingInMinutes;
   }
 
-  public ServerReloadControllerJobStatusResponse 
setEstimatedTimeRemainingInMinutes(
+  public PinotTableReloadStatusResponse setEstimatedTimeRemainingInMinutes(
       double estimatedTimeRemainingInMinutes) {
     _estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMinutes;
     return this;
@@ -61,7 +59,7 @@ public class ServerReloadControllerJobStatusResponse {
     return _timeElapsedInMinutes;
   }
 
-  public ServerReloadControllerJobStatusResponse 
setTimeElapsedInMinutes(double timeElapsedInMinutes) {
+  public PinotTableReloadStatusResponse setTimeElapsedInMinutes(double 
timeElapsedInMinutes) {
     _timeElapsedInMinutes = timeElapsedInMinutes;
     return this;
   }
@@ -71,7 +69,7 @@ public class ServerReloadControllerJobStatusResponse {
     return _totalServersQueried;
   }
 
-  public ServerReloadControllerJobStatusResponse setTotalServersQueried(int 
totalServersQueried) {
+  public PinotTableReloadStatusResponse setTotalServersQueried(int 
totalServersQueried) {
     _totalServersQueried = totalServersQueried;
     return this;
   }
@@ -80,16 +78,16 @@ public class ServerReloadControllerJobStatusResponse {
     return _totalServerCallsFailed;
   }
 
-  public ServerReloadControllerJobStatusResponse setTotalServerCallsFailed(int 
totalServerCallsFailed) {
+  public PinotTableReloadStatusResponse setTotalServerCallsFailed(int 
totalServerCallsFailed) {
     _totalServerCallsFailed = totalServerCallsFailed;
     return this;
   }
 
-  public Map<String, String> getMetadata() {
+  public PinotControllerJobMetadataDto getMetadata() {
     return _metadata;
   }
 
-  public ServerReloadControllerJobStatusResponse setMetadata(Map<String, 
String> metadata) {
+  public PinotTableReloadStatusResponse 
setMetadata(PinotControllerJobMetadataDto metadata) {
     _metadata = metadata;
     return this;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/package-info.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/package-info.java
new file mode 100644
index 00000000000..915c785760f
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/package-info.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Data Transfer Objects (DTOs) for Pinot REST APIs.
+ *
+ * <p>
+ * DTOs are Plain Old Java Objects (POJOs) that serve as the contract between 
Pinot Controller's
+ * REST APIs and external clients. These objects are serialized to/from JSON 
when exchanged via
+ * HTTP endpoints.
+ * </p>
+ *
+ * <h2>Guidelines</h2>
+ * <ul>
+ *   <li><b>Public API Contract:</b> Classes here are part of Pinot's public 
API.
+ *       Maintain backward compatibility or follow proper deprecation 
practices.</li>
+ *   <li><b>Simple POJOs:</b> Keep DTOs as simple data containers without 
business logic.</li>
+ *   <li><b>Serialization-Friendly:</b> All fields should be 
JSON-serializable.</li>
+ *   <li><b>Fluent Setters:</b> Use fluent-style setters that return {@code 
this}.</li>
+ *   <li><b>Documentation:</b> Document each DTO and its fields clearly.</li>
+ * </ul>
+ */
+package org.apache.pinot.controller.api.dto;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
index 9cb4d8c2c9c..6e6c62c00c5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
 import org.apache.pinot.controller.services.PinotTableReloadService;
 import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
 import org.apache.pinot.core.auth.Actions;
@@ -158,7 +159,7 @@ public class PinotTableReloadResource {
       @ApiResponse(code = 200, message = "Job status retrieved successfully"),
       @ApiResponse(code = 404, message = "Job ID not found")
   })
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+  public PinotTableReloadStatusResponse getReloadJobStatus(
       @ApiParam(value = "Reload job ID returned from reload endpoint", 
required = true) @PathParam("jobId")
       String reloadJobId) throws Exception {
     return _statusReporter.getReloadJobStatus(reloadJobId);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
index ed0950d63f7..d8284e39063 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
@@ -36,13 +36,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.controller.api.dto.PinotControllerJobMetadataDto;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,7 +66,7 @@ public class PinotTableReloadStatusReporter {
     _connectionManager = connectionManager;
   }
 
-  private static double 
computeEstimatedRemainingTimeInMinutes(ServerReloadControllerJobStatusResponse 
finalResponse,
+  private static double 
computeEstimatedRemainingTimeInMinutes(PinotTableReloadStatusResponse 
finalResponse,
       double timeElapsedInMinutes) {
     int remainingSegments = finalResponse.getTotalSegmentCount() - 
finalResponse.getSuccessCount();
 
@@ -90,54 +90,61 @@ public class PinotTableReloadStatusReporter {
     return totalSegments;
   }
 
-  public ServerReloadControllerJobStatusResponse getReloadJobStatus(String 
reloadJobId)
-      throws InvalidConfigException {
-    Map<String, String> controllerJobZKMetadata =
-        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
-    if (controllerJobZKMetadata == null) {
-      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
-          Response.Status.NOT_FOUND);
+  private static List<String> getServerUrls(BiMap<String, String> 
serverEndPoints,
+      PinotControllerJobMetadataDto reloadJob,
+      Map<String, List<String>> serverToSegments) {
+    List<String> serverUrls = new ArrayList<>();
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      final String server = entry.getKey();
+      final String endpoint = entry.getValue();
+      serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob, 
serverToSegments, endpoint, server));
     }
+    return serverUrls;
+  }
 
-    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    String segmentNames = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    String instanceName = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
-    Map<String, List<String>> serverToSegments = 
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
-    BiMap<String, String> serverEndPoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
-    CompletionServiceHelper completionServiceHelper =
-        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+  private static String 
constructReloadTaskStatusEndpoint(PinotControllerJobMetadataDto reloadJob,
+      Map<String, List<String>> serverToSegments, String endpoint, String 
server) {
+    String reloadTaskStatusEndpoint = constructReloadStatusEndpoint(reloadJob, 
endpoint);
+    if (reloadJob.getSegmentName() == null) {
+      return reloadTaskStatusEndpoint;
+    }
 
-    List<String> serverUrls = new ArrayList<>();
-    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
-      String server = entry.getKey();
-      String endpoint = entry.getValue();
-      String reloadTaskStatusEndpoint =
-          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + 
"?reloadJobTimestamp="
-              + 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (segmentNames != null) {
-        List<String> segmentsForServer = serverToSegments.get(server);
-        StringBuilder encodedSegmentsBuilder = new StringBuilder();
-        if (!segmentsForServer.isEmpty()) {
-          Iterator<String> segmentIterator = segmentsForServer.iterator();
-          // Append first segment without a leading separator
-          
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
-          // Append remaining segments, each prefixed by the separator
-          while (segmentIterator.hasNext()) {
-            
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
-                .append(URIUtils.encode(segmentIterator.next()));
-          }
-        }
-        reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    List<String> segmentsForServer = serverToSegments.get(server);
+    StringBuilder encodedSegmentsBuilder = new StringBuilder();
+    if (!segmentsForServer.isEmpty()) {
+      Iterator<String> segmentIterator = segmentsForServer.iterator();
+      // Append first segment without a leading separator
+      encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+      // Append remaining segments, each prefixed by the separator
+      while (segmentIterator.hasNext()) {
+        encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+            .append(URIUtils.encode(segmentIterator.next()));
       }
-      serverUrls.add(reloadTaskStatusEndpoint);
     }
+    reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+    return reloadTaskStatusEndpoint;
+  }
+
+  private static String 
constructReloadStatusEndpoint(PinotControllerJobMetadataDto reloadJob, String 
endpoint) {
+    return endpoint + "/controllerJob/reloadStatus/" + 
reloadJob.getTableNameWithType() + "?reloadJobTimestamp="
+        + reloadJob.getSubmissionTimeMs();
+  }
 
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+  public PinotTableReloadStatusResponse getReloadJobStatus(String reloadJobId)
+      throws InvalidConfigException {
+    final PinotControllerJobMetadataDto reloadJobMetadata = 
getControllerJobMetadataFromZk(reloadJobId);
+    final Map<String, List<String>> serverToSegments = 
getServerToSegments(reloadJobMetadata);
+
+    final BiMap<String, String> serverEndPoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    final List<String> serverUrls = getServerUrls(serverEndPoints, 
reloadJobMetadata, serverToSegments);
+
+    final CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+    final CompletionServiceHelper.CompletionServiceResponse serviceResponse =
         completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
10000);
 
-    ServerReloadControllerJobStatusResponse response = new 
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+    final PinotTableReloadStatusResponse response = new 
PinotTableReloadStatusResponse().setSuccessCount(0)
         .setTotalSegmentCount(computeTotalSegments(serverToSegments))
         .setTotalServersQueried(serverUrls.size())
         .setTotalServerCallsFailed(serviceResponse._failedResponseCount);
@@ -145,8 +152,8 @@ public class PinotTableReloadStatusReporter {
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       String responseString = streamResponse.getValue();
       try {
-        ServerReloadControllerJobStatusResponse r =
-            JsonUtils.stringToObject(responseString, 
ServerReloadControllerJobStatusResponse.class);
+        PinotTableReloadStatusResponse r =
+            JsonUtils.stringToObject(responseString, 
PinotTableReloadStatusResponse.class);
         response.setSuccessCount(response.getSuccessCount() + 
r.getSuccessCount());
       } catch (Exception e) {
         
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
@@ -154,38 +161,56 @@ public class PinotTableReloadStatusReporter {
     }
 
     // Add derived fields
-    final long submissionTime =
-        
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double) 
submissionTime);
+    final double timeElapsedInMinutes = 
computeTimeElapsedInMinutes(reloadJobMetadata.getSubmissionTimeMs());
     final double estimatedRemainingTimeInMinutes =
         computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
 
-    return response.setMetadata(controllerJobZKMetadata)
+    return response.setMetadata(reloadJobMetadata)
         .setTimeElapsedInMinutes(timeElapsedInMinutes)
         .setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
   }
 
+  private PinotControllerJobMetadataDto getControllerJobMetadataFromZk(String 
reloadJobId) {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobTypes.RELOAD_SEGMENT);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOG, "Failed to find controller 
job id: " + reloadJobId,
+          Response.Status.NOT_FOUND);
+    }
+    try {
+      return 
JsonUtils.jsonNodeToObject(JsonUtils.objectToJsonNode(controllerJobZKMetadata),
+          PinotControllerJobMetadataDto.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Failed to convert metadata to 
PinotControllerJobDTO", e);
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, List<String>> getServerToSegments(PinotControllerJobMetadataDto 
job) {
+    return getServerToSegments(job.getTableNameWithType(), 
job.getSegmentName(), job.getInstanceName());
+  }
+
   @VisibleForTesting
-  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNames,
+  Map<String, List<String>> getServerToSegments(String tableNameWithType, 
@Nullable String segmentNamesString,
       @Nullable String instanceName) {
-    if (segmentNames == null) {
+    if (segmentNamesString == null) {
       // instanceName can be null or not null, and this method below can 
handle both cases.
       return 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, 
instanceName, true);
     }
     // Skip servers and segments not involved in the segment reloading job.
-    List<String> segmnetNameList = new ArrayList<>();
-    Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+    List<String> segmentNames = new ArrayList<>();
+    Collections.addAll(segmentNames, StringUtils.split(segmentNamesString, 
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
     if (instanceName != null) {
-      return Map.of(instanceName, segmnetNameList);
+      return Map.of(instanceName, segmentNames);
     }
     // If instance is null, then either one or all segments are being reloaded 
via current segment reload restful APIs.
     // And the if-check at the beginning of this method has handled the case 
of reloading all segments. So here we
     // expect only one segment name.
-    checkState(segmnetNameList.size() == 1, "Only one segment is expected but 
got: %s", segmnetNameList);
+    checkState(segmentNames.size() == 1, "Only one segment is expected but 
got: %s", segmentNames);
     Map<String, List<String>> serverToSegments = new HashMap<>();
-    Set<String> servers = 
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
+    Set<String> servers = 
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNamesString);
     for (String server : servers) {
-      serverToSegments.put(server, Collections.singletonList(segmentNames));
+      serverToSegments.put(server, 
Collections.singletonList(segmentNamesString));
     }
     return serverToSegments;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 80ab7359ad9..6a3097c4c1a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -36,9 +36,9 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import 
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
-import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
@@ -450,8 +450,8 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
         try {
           SimpleHttpResponse httpResponse =
               
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new 
URL(requestUrl).toURI(), null));
-          ServerReloadControllerJobStatusResponse segmentReloadStatusValue =
-              JsonUtils.stringToObject(httpResponse.getResponse(), 
ServerReloadControllerJobStatusResponse.class);
+          PinotTableReloadStatusResponse segmentReloadStatusValue =
+              JsonUtils.stringToObject(httpResponse.getResponse(), 
PinotTableReloadStatusResponse.class);
           return segmentReloadStatusValue.getSuccessCount() == 
segmentReloadStatusValue.getTotalSegmentCount();
         } catch (HttpErrorStatusException | URISyntaxException e) {
           throw new IOException(e);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 21658ec1277..42c8ef7b73b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.utils.regex.JavaUtilPattern;
 import org.apache.pinot.common.utils.regex.Matcher;
 import org.apache.pinot.common.utils.regex.Pattern;
 import org.apache.pinot.controller.ControllerConf;
-import 
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
@@ -1495,8 +1495,8 @@ public class TableRebalanceIntegrationTest extends 
BaseHybridClusterIntegrationT
         String requestUrl = 
getControllerRequestURLBuilder().forSegmentReloadStatus(reloadJobId);
         SimpleHttpResponse httpResponse =
             
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new 
URL(requestUrl).toURI(), null));
-        ServerReloadControllerJobStatusResponse reloadResult =
-            JsonUtils.stringToObject(httpResponse.getResponse(), 
ServerReloadControllerJobStatusResponse.class);
+        PinotTableReloadStatusResponse reloadResult =
+            JsonUtils.stringToObject(httpResponse.getResponse(), 
PinotTableReloadStatusResponse.class);
         return reloadResult.getEstimatedTimeRemainingInMinutes() == 0.0;
       } catch (Exception e) {
         return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to