This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9c2948b91df [controller] Refactor. Moved reload job status logic into
a dedicated PinotTableReloadStatusReporter class (#17036)
9c2948b91df is described below
commit 9c2948b91dfec02cb31fb8f52debdcceca588b8d
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Fri Oct 17 12:37:38 2025 -0700
[controller] Refactor. Moved reload job status logic into a dedicated
PinotTableReloadStatusReporter class (#17036)
* [controller] Refactor reload job status logic into a dedicated
PinotTableReloadStatusReporter class
* [controller] Refactor ServerReloadControllerJobStatusResponse setters to
enable method chaining and simplify reload job status logic
* [controller] Chain setters in PinotTableReloadStatusReporter for
simplification and clean-up redundant code
* [controller] Bind PinotTableReloadStatusReporter as a Singleton in
BaseControllerStarter
---
.../pinot/controller/BaseControllerStarter.java | 2 +
.../api/resources/PinotTableReloadResource.java | 20 ++-
.../ServerReloadControllerJobStatusResponse.java | 25 ++-
.../services/PinotTableReloadService.java | 131 +-------------
.../services/PinotTableReloadStatusReporter.java | 192 +++++++++++++++++++++
...ava => PinotTableReloadStatusReporterTest.java} | 18 +-
6 files changed, 233 insertions(+), 155 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 7e0198f89bf..af59efac1a0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -117,6 +117,7 @@ import
org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceM
import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.controller.services.PinotTableReloadService;
+import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
import org.apache.pinot.controller.tuner.TableConfigTunerRegistry;
import org.apache.pinot.controller.util.BrokerServiceHelper;
import org.apache.pinot.controller.util.TableSizeReader;
@@ -683,6 +684,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
bindAsContract(PinotTableReloadService.class).in(Singleton.class);
+
bindAsContract(PinotTableReloadStatusReporter.class).in(Singleton.class);
String loggerRootDir =
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
if (loggerRootDir != null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
index 0d9fa86d8b2..9cb4d8c2c9c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
@@ -44,6 +44,7 @@ import javax.ws.rs.core.MediaType;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.services.PinotTableReloadService;
+import org.apache.pinot.controller.services.PinotTableReloadStatusReporter;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
@@ -87,11 +88,14 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
public class PinotTableReloadResource {
private static final Logger LOG =
LoggerFactory.getLogger(PinotTableReloadResource.class);
- private final PinotTableReloadService _pinotTableReloadService;
+ private final PinotTableReloadService _service;
+ private final PinotTableReloadStatusReporter _statusReporter;
@Inject
- public PinotTableReloadResource(PinotTableReloadService
pinotTableReloadService) {
- _pinotTableReloadService = pinotTableReloadService;
+ public PinotTableReloadResource(PinotTableReloadService service,
+ PinotTableReloadStatusReporter statusReporter) {
+ _service = service;
+ _statusReporter = statusReporter;
}
@POST
@@ -114,7 +118,7 @@ public class PinotTableReloadResource {
@QueryParam("forceDownload") @DefaultValue("false") boolean
forceDownload,
@ApiParam(value = "Target specific server instance")
@QueryParam("targetInstance") @Nullable
String targetInstance, @Context HttpHeaders headers) {
- return _pinotTableReloadService.reloadSegment(tableName, segmentName,
forceDownload, targetInstance, headers);
+ return _service.reloadSegment(tableName, segmentName, forceDownload,
targetInstance, headers);
}
@POST
@@ -140,7 +144,7 @@ public class PinotTableReloadResource {
@ApiParam(value = "JSON map of instance to segment lists (overrides
targetInstance)")
@QueryParam("instanceToSegmentsMap") @Nullable String
instanceToSegmentsMapInJson, @Context HttpHeaders headers)
throws IOException {
- return _pinotTableReloadService.reloadAllSegments(tableName, tableTypeStr,
forceDownload, targetInstance,
+ return _service.reloadAllSegments(tableName, tableTypeStr, forceDownload,
targetInstance,
instanceToSegmentsMapInJson, headers);
}
@@ -157,7 +161,7 @@ public class PinotTableReloadResource {
public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job ID returned from reload endpoint",
required = true) @PathParam("jobId")
String reloadJobId) throws Exception {
- return _pinotTableReloadService.getReloadJobStatus(reloadJobId);
+ return _statusReporter.getReloadJobStatus(reloadJobId);
}
@GET
@@ -170,11 +174,11 @@ public class PinotTableReloadResource {
@ApiResponse(code = 200, message = "Reload check completed
successfully"),
@ApiResponse(code = 400, message = "Invalid table configuration")
})
- public String getTableReloadMetadata(
+ public String needReload(
@ApiParam(value = "Table name with type suffix", required = true,
example = "myTable_REALTIME")
@PathParam("tableNameWithType") String tableNameWithType,
@ApiParam(value = "Include detailed server responses", defaultValue =
"false") @QueryParam("verbose")
@DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
- return _pinotTableReloadService.getTableReloadMetadata(tableNameWithType,
verbose, headers);
+ return _service.needReload(tableNameWithType, verbose, headers);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
index 2b2c259857a..9a0467043d7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java
@@ -33,55 +33,64 @@ public class ServerReloadControllerJobStatusResponse {
return _totalSegmentCount;
}
- public void setTotalSegmentCount(int totalSegmentCount) {
+ public ServerReloadControllerJobStatusResponse setTotalSegmentCount(int
totalSegmentCount) {
_totalSegmentCount = totalSegmentCount;
+ return this;
}
public int getSuccessCount() {
return _successCount;
}
- public void setSuccessCount(int successCount) {
+ public ServerReloadControllerJobStatusResponse setSuccessCount(int
successCount) {
_successCount = successCount;
+ return this;
}
public double getEstimatedTimeRemainingInMinutes() {
return _estimatedTimeRemainingInMinutes;
}
- public void setEstimatedTimeRemainingInMinutes(double
estimatedTimeRemainingInMillis) {
- _estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMillis;
+ public ServerReloadControllerJobStatusResponse
setEstimatedTimeRemainingInMinutes(
+ double estimatedTimeRemainingInMinutes) {
+ _estimatedTimeRemainingInMinutes = estimatedTimeRemainingInMinutes;
+ return this;
}
public double getTimeElapsedInMinutes() {
return _timeElapsedInMinutes;
}
- public void setTimeElapsedInMinutes(double timeElapsedInMinutes) {
+ public ServerReloadControllerJobStatusResponse
setTimeElapsedInMinutes(double timeElapsedInMinutes) {
_timeElapsedInMinutes = timeElapsedInMinutes;
+ return this;
}
+
public int getTotalServersQueried() {
return _totalServersQueried;
}
- public void setTotalServersQueried(int totalServersQueried) {
+ public ServerReloadControllerJobStatusResponse setTotalServersQueried(int
totalServersQueried) {
_totalServersQueried = totalServersQueried;
+ return this;
}
public int getTotalServerCallsFailed() {
return _totalServerCallsFailed;
}
- public void setTotalServerCallsFailed(int totalServerCallsFailed) {
+ public ServerReloadControllerJobStatusResponse setTotalServerCallsFailed(int
totalServerCallsFailed) {
_totalServerCallsFailed = totalServerCallsFailed;
+ return this;
}
public Map<String, String> getMetadata() {
return _metadata;
}
- public void setMetadata(Map<String, String> metadata) {
+ public ServerReloadControllerJobStatusResponse setMetadata(Map<String,
String> metadata) {
_metadata = metadata;
+ return this;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
index 5037f32c959..8c257076eae 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
@@ -20,20 +20,12 @@ package org.apache.pinot.controller.services;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.BiMap;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.core.HttpHeaders;
@@ -52,15 +44,11 @@ import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.controller.api.resources.ResourceUtils;
-import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
-import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -181,101 +169,8 @@ public class PinotTableReloadService {
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}
- public ServerReloadControllerJobStatusResponse getReloadJobStatus(String
reloadJobId)
- throws InvalidConfigException {
- Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
- if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
- Response.Status.NOT_FOUND);
- }
-
- String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
- String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
- String instanceName =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
- Map<String, List<String>> serverToSegments =
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
- BiMap<String, String> serverEndPoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
- CompletionServiceHelper completionServiceHelper =
- new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
-
- List<String> serverUrls = new ArrayList<>();
- for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
- String server = entry.getKey();
- String endpoint = entry.getValue();
- String reloadTaskStatusEndpoint =
- endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
- +
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
- if (segmentNames != null) {
- List<String> segmentsForServer = serverToSegments.get(server);
- StringBuilder encodedSegmentsBuilder = new StringBuilder();
- if (!segmentsForServer.isEmpty()) {
- Iterator<String> segmentIterator = segmentsForServer.iterator();
- // Append first segment without a leading separator
-
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
- // Append remaining segments, each prefixed by the separator
- while (segmentIterator.hasNext()) {
-
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
- .append(URIUtils.encode(segmentIterator.next()));
- }
- }
- reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
- }
- serverUrls.add(reloadTaskStatusEndpoint);
- }
-
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
-
- ServerReloadControllerJobStatusResponse
serverReloadControllerJobStatusResponse =
- new ServerReloadControllerJobStatusResponse();
- serverReloadControllerJobStatusResponse.setSuccessCount(0);
-
- int totalSegments = 0;
- for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
- totalSegments += entry.getValue().size();
- }
-
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-
serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
- for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
- String responseString = streamResponse.getValue();
- try {
- ServerReloadControllerJobStatusResponse response =
- JsonUtils.stringToObject(responseString,
ServerReloadControllerJobStatusResponse.class);
- serverReloadControllerJobStatusResponse.setSuccessCount(
- serverReloadControllerJobStatusResponse.getSuccessCount() +
response.getSuccessCount());
- } catch (Exception e) {
- serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
- }
- }
-
- // Add ZK fields
-
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
- // Add derived fields
- long submissionTime =
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
- double timeElapsedInMinutes = ((double) System.currentTimeMillis() -
(double) submissionTime) / (1000.0 * 60.0);
- int remainingSegments =
serverReloadControllerJobStatusResponse.getTotalSegmentCount()
- - serverReloadControllerJobStatusResponse.getSuccessCount();
-
- double estimatedRemainingTimeInMinutes = -1;
- if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
- estimatedRemainingTimeInMinutes =
- ((double) remainingSegments / (double)
serverReloadControllerJobStatusResponse.getSuccessCount())
- * timeElapsedInMinutes;
- }
-
serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-
serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
- return serverReloadControllerJobStatusResponse;
- }
-
- public String getTableReloadMetadata(String tableNameWithType, boolean
verbose, HttpHeaders headers) {
+ public String needReload(String tableNameWithType, boolean verbose,
HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
LOG.info("Received a request to check reload for all servers hosting
segments for table {}", tableNameWithType);
try {
@@ -307,30 +202,6 @@ public class PinotTableReloadService {
}
}
- @VisibleForTesting
- Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNames,
- @Nullable String instanceName) {
- if (segmentNames == null) {
- // instanceName can be null or not null, and this method below can
handle both cases.
- return
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType,
instanceName, true);
- }
- // Skip servers and segments not involved in the segment reloading job.
- List<String> segmnetNameList = new ArrayList<>();
- Collections.addAll(segmnetNameList, StringUtils.split(segmentNames,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
- if (instanceName != null) {
- return Map.of(instanceName, segmnetNameList);
- }
- // If instance is null, then either one or all segments are being reloaded
via current segment reload restful APIs.
- // And the if-check at the beginning of this method has handled the case
of reloading all segments. So here we
- // expect only one segment name.
- Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is
expected but got: %s", segmnetNameList);
- Map<String, List<String>> serverToSegments = new HashMap<>();
- Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
- for (String server : servers) {
- serverToSegments.put(server, Collections.singletonList(segmentNames));
- }
- return serverToSegments;
- }
/**
* Helper method to find the existing table based on the given table name
(with or without type suffix) and segment
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
new file mode 100644
index 00000000000..ed0950d63f7
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporter.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.services;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.utils.URIUtils;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+@Singleton
+public class PinotTableReloadStatusReporter {
+ private static final Logger LOG =
LoggerFactory.getLogger(PinotTableReloadStatusReporter.class);
+
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final Executor _executor;
+ private final HttpClientConnectionManager _connectionManager;
+
+ @Inject
+ public PinotTableReloadStatusReporter(PinotHelixResourceManager
pinotHelixResourceManager, Executor executor,
+ HttpClientConnectionManager connectionManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _executor = executor;
+ _connectionManager = connectionManager;
+ }
+
+ private static double
computeEstimatedRemainingTimeInMinutes(ServerReloadControllerJobStatusResponse
finalResponse,
+ double timeElapsedInMinutes) {
+ int remainingSegments = finalResponse.getTotalSegmentCount() -
finalResponse.getSuccessCount();
+
+ double estimatedRemainingTimeInMinutes = -1;
+ if (finalResponse.getSuccessCount() > 0) {
+ estimatedRemainingTimeInMinutes =
+ ((double) remainingSegments / (double)
finalResponse.getSuccessCount()) * timeElapsedInMinutes;
+ }
+ return estimatedRemainingTimeInMinutes;
+ }
+
+ private static double computeTimeElapsedInMinutes(double submissionTime) {
+ return ((double) System.currentTimeMillis() - submissionTime) / (1000.0 *
60.0);
+ }
+
+ private static int computeTotalSegments(Map<String, List<String>>
serverToSegments) {
+ int totalSegments = 0;
+ for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
+ totalSegments += entry.getValue().size();
+ }
+ return totalSegments;
+ }
+
+ public ServerReloadControllerJobStatusResponse getReloadJobStatus(String
reloadJobId)
+ throws InvalidConfigException {
+ Map<String, String> controllerJobZKMetadata =
+ _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
+ if (controllerJobZKMetadata == null) {
+ throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
+ Response.Status.NOT_FOUND);
+ }
+
+ String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+ String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+ String instanceName =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
+ Map<String, List<String>> serverToSegments =
getServerToSegments(tableNameWithType, segmentNames, instanceName);
+
+ BiMap<String, String> serverEndPoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
+
+ List<String> serverUrls = new ArrayList<>();
+ for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+ String server = entry.getKey();
+ String endpoint = entry.getValue();
+ String reloadTaskStatusEndpoint =
+ endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
+ +
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
+ if (segmentNames != null) {
+ List<String> segmentsForServer = serverToSegments.get(server);
+ StringBuilder encodedSegmentsBuilder = new StringBuilder();
+ if (!segmentsForServer.isEmpty()) {
+ Iterator<String> segmentIterator = segmentsForServer.iterator();
+ // Append first segment without a leading separator
+
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+ // Append remaining segments, each prefixed by the separator
+ while (segmentIterator.hasNext()) {
+
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+ .append(URIUtils.encode(segmentIterator.next()));
+ }
+ }
+ reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
+ }
+ serverUrls.add(reloadTaskStatusEndpoint);
+ }
+
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
+
+ ServerReloadControllerJobStatusResponse response = new
ServerReloadControllerJobStatusResponse().setSuccessCount(0)
+ .setTotalSegmentCount(computeTotalSegments(serverToSegments))
+ .setTotalServersQueried(serverUrls.size())
+ .setTotalServerCallsFailed(serviceResponse._failedResponseCount);
+
+ for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
+ String responseString = streamResponse.getValue();
+ try {
+ ServerReloadControllerJobStatusResponse r =
+ JsonUtils.stringToObject(responseString,
ServerReloadControllerJobStatusResponse.class);
+ response.setSuccessCount(response.getSuccessCount() +
r.getSuccessCount());
+ } catch (Exception e) {
+
response.setTotalServerCallsFailed(response.getTotalServerCallsFailed() + 1);
+ }
+ }
+
+ // Add derived fields
+ final long submissionTime =
+
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+ final double timeElapsedInMinutes = computeTimeElapsedInMinutes((double)
submissionTime);
+ final double estimatedRemainingTimeInMinutes =
+ computeEstimatedRemainingTimeInMinutes(response, timeElapsedInMinutes);
+
+ return response.setMetadata(controllerJobZKMetadata)
+ .setTimeElapsedInMinutes(timeElapsedInMinutes)
+ .setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
+ }
+
+ @VisibleForTesting
+ Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNames,
+ @Nullable String instanceName) {
+ if (segmentNames == null) {
+ // instanceName can be null or not null, and this method below can
handle both cases.
+ return
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType,
instanceName, true);
+ }
+ // Skip servers and segments not involved in the segment reloading job.
+ List<String> segmnetNameList = new ArrayList<>();
+ Collections.addAll(segmnetNameList, StringUtils.split(segmentNames,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+ if (instanceName != null) {
+ return Map.of(instanceName, segmnetNameList);
+ }
+ // If instance is null, then either one or all segments are being reloaded
via current segment reload restful APIs.
+ // And the if-check at the beginning of this method has handled the case
of reloading all segments. So here we
+ // expect only one segment name.
+ checkState(segmnetNameList.size() == 1, "Only one segment is expected but
got: %s", segmnetNameList);
+ Map<String, List<String>> serverToSegments = new HashMap<>();
+ Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
+ for (String server : servers) {
+ serverToSegments.put(server, Collections.singletonList(segmentNames));
+ }
+ return serverToSegments;
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporterTest.java
similarity index 81%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporterTest.java
index a18f8c0ccf5..8fa3b512fa3 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadStatusReporterTest.java
@@ -35,12 +35,12 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class PinotTableReloadServiceTest {
+public class PinotTableReloadStatusReporterTest {
@Mock
PinotHelixResourceManager _pinotHelixResourceManager;
@InjectMocks
- PinotTableReloadService _service;
+ PinotTableReloadStatusReporter _instance;
@BeforeMethod
public void setup() {
@@ -60,26 +60,26 @@ public class PinotTableReloadServiceTest {
when(_pinotHelixResourceManager.getServers(tableName,
"seg01")).thenReturn(Set.of("svr01", "svr03"));
// Get all servers and all their segments.
- Map<String, List<String>> serverToSegmentsMap =
_service.getServerToSegments(tableName, null, null);
+ Map<String, List<String>> serverToSegmentsMap =
_instance.getServerToSegments(tableName, null, null);
assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
// Get all segments on svr02.
- serverToSegmentsMap = _service.getServerToSegments(tableName, null,
"svr02");
+ serverToSegmentsMap = _instance.getServerToSegments(tableName, null,
"svr02");
assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02",
"seg03")));
// Get all servers with seg01.
- serverToSegmentsMap = _service.getServerToSegments(tableName, "seg01",
null);
+ serverToSegmentsMap = _instance.getServerToSegments(tableName, "seg01",
null);
assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"),
"svr03", List.of("seg01")));
// Simply map the provided server to the provided segments.
- serverToSegmentsMap = _service.getServerToSegments(tableName, "seg01",
"svr01");
+ serverToSegmentsMap = _instance.getServerToSegments(tableName, "seg01",
"svr01");
assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
- serverToSegmentsMap = _service.getServerToSegments(tableName,
"anySegment", "anyServer");
+ serverToSegmentsMap = _instance.getServerToSegments(tableName,
"anySegment", "anyServer");
assertEquals(serverToSegmentsMap, Map.of("anyServer",
List.of("anySegment")));
- serverToSegmentsMap = _service.getServerToSegments(tableName,
"seg01|seg02", "svr02");
+ serverToSegmentsMap = _instance.getServerToSegments(tableName,
"seg01|seg02", "svr02");
assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01",
"seg02")));
try {
- _service.getServerToSegments(tableName, "seg01,seg02", null);
+ _instance.getServerToSegments(tableName, "seg01,seg02", null);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Only one segment is expected but
got: [seg01, seg02]"));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]