This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9265b964855 [controller] Refactor table reload status API to improve
type safety (#17051)
9265b964855 is described below
commit 9265b964855a9abc10f19de05147ef191ae147d1
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Oct 22 19:35:06 2025 -0700
[controller] Refactor table reload status API to improve type safety
(#17051)
* [controller] Refactor reload job status logic into a dedicated
PinotTableReloadStatusReporter class
* [controller] Chain setters in PinotTableReloadStatusReporter for
simplification and clean-up redundant code
* [controller] Cosmetic. Rename ServerReloadControllerJobStatusResponse to
PinotTableReloadStatusResponse
* [controller] Introduce PinotControllerJobDto to simplify and type-safe
reload job metadata handling
* [controller] Refactor reload job status logic for clarity
* [controller] Rename PinotControllerJobDto to
PinotControllerJobMetadataDto for better clarity and consistency
* [controller] Add package-info.java for DTOs and update
PinotControllerJobMetadataDto javadoc for clarity
* [controller] Fix javadoc style in package-info.java for DTOs
---
.../api/dto/PinotControllerJobMetadataDto.java | 109 ++++++++++++++++
.../PinotTableReloadStatusResponse.java} | 24 ++--
.../pinot/controller/api/dto/package-info.java | 38 ++++++
.../api/resources/PinotTableReloadResource.java | 3 +-
.../services/PinotTableReloadStatusReporter.java | 139 ++++++++++++---------
...PartialUpsertTableRebalanceIntegrationTest.java | 6 +-
.../tests/TableRebalanceIntegrationTest.java | 6 +-
7 files changed, 248 insertions(+), 77 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobMetadataDto.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobMetadataDto.java
new file mode 100644
index 00000000000..295bd6517c3
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotControllerJobMetadataDto.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+
+/**
+ * Type-safe DTO (Data Transfer Object) for controller job ZK metadata.
+ * Provides structured access to job metadata fields instead of using raw
Map<String, String>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class PinotControllerJobMetadataDto {
+ private String _jobId;
+
+ @JsonProperty("tableName")
+ private String _tableNameWithType;
+
+ private String _jobType;
+ private long _submissionTimeMs;
+ private int _messageCount;
+ private String _segmentName;
+ private String _instanceName;
+
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public PinotControllerJobMetadataDto setJobId(String jobId) {
+ _jobId = jobId;
+ return this;
+ }
+
+ public String getTableNameWithType() {
+ return _tableNameWithType;
+ }
+
+ public PinotControllerJobMetadataDto setTableNameWithType(String
tableNameWithType) {
+ _tableNameWithType = tableNameWithType;
+ return this;
+ }
+
+ public String getJobType() {
+ return _jobType;
+ }
+
+ public PinotControllerJobMetadataDto setJobType(String jobType) {
+ _jobType = jobType;
+ return this;
+ }
+
+ public long getSubmissionTimeMs() {
+ return _submissionTimeMs;
+ }
+
+ public PinotControllerJobMetadataDto setSubmissionTimeMs(long
submissionTimeMs) {
+ _submissionTimeMs = submissionTimeMs;
+ return this;
+ }
+
+ public int getMessageCount() {
+ return _messageCount;
+ }
+
+ public PinotControllerJobMetadataDto setMessageCount(int messageCount) {
+ _messageCount = messageCount;
+ return this;
+ }
+
+ @Nullable
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ public PinotControllerJobMetadataDto setSegmentName(@Nullable String
segmentName) {
+ _segmentName = segmentName;
+ return this;
+ }
+
+ @Nullable
+ public String getInstanceName() {
+ return _instanceName;
+ }
+
+ public PinotControllerJobMetadataDto setInstanceName(@Nullable String
instanceName) {
+ _instanceName = instanceName;
+ return this;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
similarity index 70%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
rename to
pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
index 9a0467043d7..8518ffdc57a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/PinotTableReloadStatusResponse.java
@@ -16,24 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.api.resources;
+package org.apache.pinot.controller.api.dto;
-import java.util.Map;
-
-public class ServerReloadControllerJobStatusResponse {
+public class PinotTableReloadStatusResponse {
private double _timeElapsedInMinutes;
private double _estimatedTimeRemainingInMinutes;
private int _totalSegmentCount;
private int _successCount;
private int _totalServersQueried;
private int _totalServerCallsFailed;
- private Map<String, String> _metadata;
+ private PinotControllerJobMetadataDto _metadata;
public int getTotalSegmentCount() {
return _totalSegmentCount;
}
- public ServerReloadControllerJobStatusResponse setTotalSegmentCount(int
totalSegmentCount) {
+ public PinotTableReloadStatusResponse setTotalSegmentCount(int
totalSegmentCount) {
_totalSegmentCount = totalSegmentCount;
return this;
}
@@ -42,7 +40,7 @@ public class ServerReloadControllerJobStatusResponse {
return _successCount;
}
- public ServerReloadControllerJobStatusResponse setSuccessCount(int
successCount) {
+ public PinotTableReloadStatusResponse setSuccessCount(int successCount) {
_successCount = successCount;
return this;
}
@@ -51,7 +49,7 @@ public class ServerReloadControllerJobStatusResponse {
return _estimatedTimeRemainingInMinutes;
}
- public ServerReloadControllerJobStatusResponse
setEstimatedTimeRemainingInMinutes(
+ public PinotTableReloadStatusResponse setEstimatedTimeRemainingInMinutes(
double estimatedTimeRemainingInMinutes) {
_estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMinutes;
return this;
@@ -61,7 +59,7 @@ public class ServerReloadControllerJobStatusResponse {
return _timeElapsedInMinutes;
}
- public ServerReloadControllerJobStatusResponse
setTimeElapsedInMinutes(double timeElapsedInMinutes) {
+ public PinotTableReloadStatusResponse setTimeElapsedInMinutes(double
timeElapsedInMinutes) {
_timeElapsedInMinutes = timeElapsedInMinutes;
return this;
}
@@ -71,7 +69,7 @@ public class ServerReloadControllerJobStatusResponse {
return _totalServersQueried;
}
- public ServerReloadControllerJobStatusResponse setTotalServersQueried(int
totalServersQueried) {
+ public PinotTableReloadStatusResponse setTotalServersQueried(int
totalServersQueried) {
_totalServersQueried = totalServersQueried;
return this;
}
@@ -80,16 +78,16 @@ public class ServerReloadControllerJobStatusResponse {
return _totalServerCallsFailed;
}
- public ServerReloadControllerJobStatusResponse setTotalServerCallsFailed(int
totalServerCallsFailed) {
+ public PinotTableReloadStatusResponse setTotalServerCallsFailed(int
totalServerCallsFailed) {
_totalServerCallsFailed = totalServerCallsFailed;
return this;
}
- public Map<String, String> getMetadata() {
+ public PinotControllerJobMetadataDto getMetadata() {
return _metadata;
}
- public ServerReloadControllerJobStatusResponse setMetadata(Map<String,
String> metadata) {
+ public PinotTableReloadStatusResponse
setMetadata(PinotControllerJobMetadataDto metadata) {
_metadata = metadata;
return this;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/package-info.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/package-info.java
new file mode 100644
index 00000000000..915c785760f
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/dto/package-info.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Data Transfer Objects (DTOs) for Pinot REST APIs.
+ *
+ * <p>
+ * DTOs are Plain Old Java Objects (POJOs) that serve as the contract between
Pinot Controller's
+ * REST APIs and external clients. These objects are serialized to/from JSON
when exchanged via
+ * HTTP endpoints.
+ * </p>
+ *
+ * <h2>Guidelines</h2>
+ * <ul>
+ * <li><b>Public API Contract:</b> Classes here are part of Pinot's public
API.
+ * Maintain backward compatibility or follow proper deprecation
practices.</li>
+ * <li><b>Simple POJOs:</b> Keep DTOs as simple data containers without
business logic.</li>
+ * <li><b>Serialization-Friendly:</b> All fields should be
JSON-serializable.</li>
+ * <li><b>Fluent Setters:</b> Use fluent-style setters that return {@code
this}.</li>
+ * <li><b>Documentation:</b> Document each DTO and its fields clearly.</li>
+ * </ul>
+ */
+package org.apache.pinot.controller.api.dto;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
index 9cb4d8c2c9c..6e6c62c00c5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
import org.apache.pinot.controller.services.PinotTableReloadService;
import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
import org.apache.pinot.core.auth.Actions;
@@ -158,7 +159,7 @@ public class PinotTableReloadResource {
@ApiResponse(code = 200, message = "Job status retrieved successfully"),
@ApiResponse(code = 404, message = "Job ID not found")
})
- public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+ public PinotTableReloadStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job ID returned from reload endpoint",
required = true) @PathParam("jobId")
String reloadJobId) throws Exception {
return _statusReporter.getReloadJobStatus(reloadJobId);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
index ed0950d63f7..d8284e39063 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
@@ -36,13 +36,13 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.controller.api.dto.PinotControllerJobMetadataDto;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +66,7 @@ public class PinotTableReloadStatusReporter {
_connectionManager = connectionManager;
}
- private static double
computeEstimatedRemainingTimeInMinutes(ServerReloadControllerJobStatusResponse
finalResponse,
+ private static double
computeEstimatedRemainingTimeInMinutes(PinotTableReloadStatusResponse
finalResponse,
double timeElapsedInMinutes) {
int remainingSegments = finalResponse.getTotalSegmentCount() -
finalResponse.getSuccessCount();
@@ -90,54 +90,61 @@ public class PinotTableReloadStatusReporter {
return totalSegments;
}
- public ServerReloadControllerJobStatusResponse getReloadJobStatus(String
reloadJobId)
- throws InvalidConfigException {
- Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
- if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
- Response.Status.NOT_FOUND);
+ private static List<String> getServerUrls(BiMap<String, String>
serverEndPoints,
+ PinotControllerJobMetadataDto reloadJob,
+ Map<String, List<String>> serverToSegments) {
+ List<String> serverUrls = new ArrayList<>();
+ for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+ final String server = entry.getKey();
+ final String endpoint = entry.getValue();
+ serverUrls.add(constructReloadTaskStatusEndpoint(reloadJob,
serverToSegments, endpoint, server));
}
+ return serverUrls;
+ }
- String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
- String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
- String instanceName =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
- Map<String, List<String>> serverToSegments =
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
- BiMap<String, String> serverEndPoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
- CompletionServiceHelper completionServiceHelper =
- new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ private static String
constructReloadTaskStatusEndpoint(PinotControllerJobMetadataDto reloadJob,
+ Map<String, List<String>> serverToSegments, String endpoint, String
server) {
+ String reloadTaskStatusEndpoint = constructReloadStatusEndpoint(reloadJob,
endpoint);
+ if (reloadJob.getSegmentName() == null) {
+ return reloadTaskStatusEndpoint;
+ }
- List<String> serverUrls = new ArrayList<>();
- for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
- String server = entry.getKey();
- String endpoint = entry.getValue();
- String reloadTaskStatusEndpoint =
- endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
- +
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
- if (segmentNames != null) {
- List<String> segmentsForServer = serverToSegments.get(server);
- StringBuilder encodedSegmentsBuilder = new StringBuilder();
- if (!segmentsForServer.isEmpty()) {
- Iterator<String> segmentIterator = segmentsForServer.iterator();
- // Append first segment without a leading separator
-
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
- // Append remaining segments, each prefixed by the separator
- while (segmentIterator.hasNext()) {
-
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
- .append(URIUtils.encode(segmentIterator.next()));
- }
- }
- reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+ List<String> segmentsForServer = serverToSegments.get(server);
+ StringBuilder encodedSegmentsBuilder = new StringBuilder();
+ if (!segmentsForServer.isEmpty()) {
+ Iterator<String> segmentIterator = segmentsForServer.iterator();
+ // Append first segment without a leading separator
+ encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+ // Append remaining segments, each prefixed by the separator
+ while (segmentIterator.hasNext()) {
+ encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+ .append(URIUtils.encode(segmentIterator.next()));
}
- serverUrls.add(reloadTaskStatusEndpoint);
}
+ reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+ return reloadTaskStatusEndpoint;
+ }
+
+ private static String
constructReloadStatusEndpoint(PinotControllerJobMetadataDto reloadJob, String
endpoint) {
+ return endpoint + "/controllerJob/reloadStatus/" +
reloadJob.getTableNameWithType() + "?reloadJobTimestamp="
+ + reloadJob.getSubmissionTimeMs();
+ }
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ public PinotTableReloadStatusResponse getReloadJobStatus(String reloadJobId)
+ throws InvalidConfigException {
+ final PinotControllerJobMetadataDto reloadJobMetadata =
getControllerJobMetadataFromZk(reloadJobId);
+ final Map<String, List<String>> serverToSegments =
getServerToSegments(reloadJobMetadata);
+
+ final BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ final List<String> serverUrls = getServerUrls(serverEndPoints,
reloadJobMetadata, serverToSegments);
+
+ final CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+ final CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
- ServerReloadControllerJobStatusResponse response = new
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+ final PinotTableReloadStatusResponse response = new
PinotTableReloadStatusResponse().setSuccessCount(0)
.setTotalSegmentCount(computeTotalSegments(serverToSegments))
.setTotalServersQueried(serverUrls.size())
.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
@@ -145,8 +152,8 @@ public class PinotTableReloadStatusReporter {
for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
String responseString = streamResponse.getValue();
try {
- ServerReloadControllerJobStatusResponse r =
- JsonUtils.stringToObject(responseString,
ServerReloadControllerJobStatusResponse.class);
+ PinotTableReloadStatusResponse r =
+ JsonUtils.stringToObject(responseString,
PinotTableReloadStatusResponse.class);
response.setSuccessCount(response.getSuccessCount() +
r.getSuccessCount());
} catch (Exception e) {
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
@@ -154,38 +161,56 @@ public class PinotTableReloadStatusReporter {
}
// Add derived fields
- final long submissionTime =
-
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
- final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double)
submissionTime);
+ final double timeElapsedInMinutes =
computeTimeElapsedInMinutes(reloadJobMetadata.getSubmissionTimeMs());
final double estimatedRemainingTimeInMinutes =
computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
- return response.setMetadata(controllerJobZKMetadata)
+ return response.setMetadata(reloadJobMetadata)
.setTimeElapsedInMinutes(timeElapsedInMinutes)
.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
}
+ private PinotControllerJobMetadataDto getControllerJobMetadataFromZk(String
reloadJobId) {
+ Map<String, String> controllerJobZKMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
+ if (controllerJobZKMetadata == null) {
+ throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
+ Response.Status.NOT_FOUND);
+ }
+ try {
+ return
JsonUtils.jsonNodeToObject(JsonUtils.objectToJsonNode(controllerJobZKMetadata),
+ PinotControllerJobMetadataDto.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to convert metadata to
PinotControllerJobDTO", e);
+ }
+ }
+
+ @VisibleForTesting
+ Map<String, List<String>> getServerToSegments(PinotControllerJobMetadataDto
job) {
+ return getServerToSegments(job.getTableNameWithType(),
job.getSegmentName(), job.getInstanceName());
+ }
+
@VisibleForTesting
- Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNames,
+ Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNamesString,
@Nullable String instanceName) {
- if (segmentNames == null) {
+ if (segmentNamesString == null) {
// instanceName can be null or not null, and this method below can
handle both cases.
return
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType,
instanceName, true);
}
// Skip servers and segments not involved in the segment reloading job.
- List<String> segmnetNameList = new ArrayList<>();
- Collections.addAll(segmnetNameList, StringUtils.split(segmentNames,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+ List<String> segmentNames = new ArrayList<>();
+ Collections.addAll(segmentNames, StringUtils.split(segmentNamesString,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
if (instanceName != null) {
- return Map.of(instanceName, segmnetNameList);
+ return Map.of(instanceName, segmentNames);
}
// If instance is null, then either one or all segments are being reloaded
via current segment reload restful APIs.
// And the if-check at the beginning of this method has handled the case
of reloading all segments. So here we
// expect only one segment name.
- checkState(segmnetNameList.size() == 1, "Only one segment is expected but
got: %s", segmnetNameList);
+ checkState(segmentNames.size() == 1, "Only one segment is expected but
got: %s", segmentNames);
Map<String, List<String>> serverToSegments = new HashMap<>();
- Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
+ Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNamesString);
for (String server : servers) {
- serverToSegments.put(server, Collections.singletonList(segmentNames));
+ serverToSegments.put(server,
Collections.singletonList(segmentNamesString));
}
return serverToSegments;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 80ab7359ad9..6a3097c4c1a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -36,9 +36,9 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
-import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
@@ -450,8 +450,8 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
try {
SimpleHttpResponse httpResponse =
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new
URL(requestUrl).toURI(), null));
- ServerReloadControllerJobStatusResponse segmentReloadStatusValue =
- JsonUtils.stringToObject(httpResponse.getResponse(),
ServerReloadControllerJobStatusResponse.class);
+ PinotTableReloadStatusResponse segmentReloadStatusValue =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
PinotTableReloadStatusResponse.class);
return segmentReloadStatusValue.getSuccessCount() ==
segmentReloadStatusValue.getTotalSegmentCount();
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 21658ec1277..42c8ef7b73b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -37,7 +37,7 @@ import org.apache.pinot.common.utils.regex.JavaUtilPattern;
import org.apache.pinot.common.utils.regex.Matcher;
import org.apache.pinot.common.utils.regex.Pattern;
import org.apache.pinot.controller.ControllerConf;
-import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
+import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
@@ -1495,8 +1495,8 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
String requestUrl =
getControllerRequestURLBuilder().forSegmentReloadStatus(reloadJobId);
SimpleHttpResponse httpResponse =
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
- ServerReloadControllerJobStatusResponse reloadResult =
- JsonUtils.stringToObject(httpResponse.getResponse(),
ServerReloadControllerJobStatusResponse.class);
+ PinotTableReloadStatusResponse reloadResult =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
PinotTableReloadStatusResponse.class);
return reloadResult.getEstimatedTimeRemainingInMinutes() == 0.0;
} catch (Exception e) {
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]