This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d3e9259420a [controller] Refactor core reload logic out from the
resource class to a dedicated service class (#17023)
d3e9259420a is described below
commit d3e9259420a7068352646dd35143fdd107f73a8a
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Fri Oct 17 09:42:11 2025 -0700
[controller] Refactor core reload logic out from the resource class to a
dedicated service class (#17023)
* [controller] Refactor core reload logic out from the resource class to a
dedicated service class
* [controller] Add ASF license headers to PinotTableReloadService and its
test class
---
.../pinot/controller/BaseControllerStarter.java | 4 +
.../api/resources/PinotTableReloadResource.java | 346 +--------------------
.../PinotTableReloadService.java} | 305 ++++++------------
.../PinotTableReloadServiceTest.java} | 20 +-
4 files changed, 121 insertions(+), 554 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index f6717d92ea8..7e0198f89bf 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
@@ -115,6 +116,7 @@ import
org.apache.pinot.controller.helix.core.retention.RetentionManager;
import
org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.services.PinotTableReloadService;
import org.apache.pinot.controller.tuner.TableConfigTunerRegistry;
import org.apache.pinot.controller.util.BrokerServiceHelper;
import org.apache.pinot.controller.util.TableSizeReader;
@@ -680,6 +682,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(_resourceUtilizationManager).to(ResourceUtilizationManager.class);
bind(controllerStartTime).named(ControllerAdminApiApplication.START_TIME);
+ bindAsContract(PinotTableReloadService.class).in(Singleton.class);
+
String loggerRootDir =
_config.getProperty(CommonConstants.Controller.CONFIG_OF_LOGGER_ROOT_DIR);
if (loggerRootDir != null) {
bind(new LocalLogFileServer(loggerRootDir)).to(LogFileServer.class);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
index a72ec188eaf..0d9fa86d8b2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
@@ -18,11 +18,6 @@
*/
package org.apache.pinot.controller.api.resources;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.BiMap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
@@ -33,15 +28,6 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
@@ -55,33 +41,12 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hc.client5.http.io.HttpClientConnectionManager;
-import org.apache.pinot.common.exception.InvalidConfigException;
-import
org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
-import
org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
-import org.apache.pinot.common.utils.DatabaseUtils;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.URIUtils;
-import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
-import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
-import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
-import org.apache.pinot.controller.util.CompletionServiceHelper;
-import org.apache.pinot.controller.util.TableMetadataReader;
+import org.apache.pinot.controller.services.PinotTableReloadService;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
-import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,19 +85,14 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
}))
@Path("/")
public class PinotTableReloadResource {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTableReloadResource.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(PinotTableReloadResource.class);
- @Inject
- ControllerConf _controllerConf;
-
- @Inject
- PinotHelixResourceManager _pinotHelixResourceManager;
+ private final PinotTableReloadService _pinotTableReloadService;
@Inject
- Executor _executor;
-
- @Inject
- HttpClientConnectionManager _connectionManager;
+ public PinotTableReloadResource(PinotTableReloadService
pinotTableReloadService) {
+ _pinotTableReloadService = pinotTableReloadService;
+ }
@POST
@Path("segments/{tableName}/{segmentName}/reload")
@@ -154,34 +114,7 @@ public class PinotTableReloadResource {
@QueryParam("forceDownload") @DefaultValue("false") boolean
forceDownload,
@ApiParam(value = "Target specific server instance")
@QueryParam("targetInstance") @Nullable
String targetInstance, @Context HttpHeaders headers) {
- tableName = DatabaseUtils.translateTableName(tableName, headers);
- long startTimeMs = System.currentTimeMillis();
- segmentName = URIUtils.decode(segmentName);
- String tableNameWithType = getExistingTable(tableName, segmentName);
- Pair<Integer, String> msgInfo =
- _pinotHelixResourceManager.reloadSegment(tableNameWithType,
segmentName, forceDownload, targetInstance);
- boolean zkJobMetaWriteSuccess = false;
- int numReloadMsgSent = msgInfo.getLeft();
- if (numReloadMsgSent > 0) {
- try {
- if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentName, targetInstance,
- msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
- zkJobMetaWriteSuccess = true;
- } else {
- LOGGER.error("Failed to add reload segment job meta into zookeeper
for table: {}, segment: {}",
- tableNameWithType, segmentName);
- }
- } catch (Exception e) {
- LOGGER.error("Failed to add reload segment job meta into zookeeper for
table: {}, segment: {}",
- tableNameWithType, segmentName, e);
- }
- return new SuccessResponse(
- String.format("Submitted reload job id: %s, sent %d reload messages.
Job meta ZK storage status: %s",
- msgInfo.getRight(), numReloadMsgSent, zkJobMetaWriteSuccess ?
"SUCCESS" : "FAILED"));
- }
- throw new ControllerApplicationException(LOGGER,
- String.format("Failed to find segment: %s in table: %s on %s",
segmentName, tableName,
- targetInstance == null ? "every instance" : targetInstance),
Response.Status.NOT_FOUND);
+ return _pinotTableReloadService.reloadSegment(tableName, segmentName,
forceDownload, targetInstance, headers);
}
@POST
@@ -207,123 +140,8 @@ public class PinotTableReloadResource {
@ApiParam(value = "JSON map of instance to segment lists (overrides
targetInstance)")
@QueryParam("instanceToSegmentsMap") @Nullable String
instanceToSegmentsMapInJson, @Context HttpHeaders headers)
throws IOException {
- tableName = DatabaseUtils.translateTableName(tableName, headers);
- TableType tableTypeFromTableName =
TableNameBuilder.getTableTypeFromTableName(tableName);
- TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
- // When rawTableName is provided but w/o table type, Pinot tries to reload
both OFFLINE
- // and REALTIME tables for the raw table. But forceDownload option only
works with
- // OFFLINE table currently, so we limit the table type to OFFLINE to let
Pinot continue
- // to reload w/o being accidentally aborted upon REALTIME table type.
- // TODO: support to force download immutable segments from RealTime table.
- if (forceDownload && (tableTypeFromTableName == null &&
tableTypeFromRequest == null)) {
- tableTypeFromRequest = TableType.OFFLINE;
- }
- List<String> tableNamesWithType =
-
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableTypeFromRequest,
- LOGGER);
- if (instanceToSegmentsMapInJson != null) {
- Map<String, List<String>> instanceToSegmentsMap =
- JsonUtils.stringToObject(instanceToSegmentsMapInJson, new
TypeReference<>() {
- });
- Map<String, Map<String, Map<String, String>>> tableInstanceMsgData =
- reloadSegments(tableNamesWithType, forceDownload,
instanceToSegmentsMap);
- if (tableInstanceMsgData.isEmpty()) {
- throw new ControllerApplicationException(LOGGER,
- String.format("Failed to find any segments in table: %s with
instanceToSegmentsMap: %s", tableName,
- instanceToSegmentsMap), Response.Status.NOT_FOUND);
- }
- return new
SuccessResponse(JsonUtils.objectToString(tableInstanceMsgData));
- }
- long startTimeMs = System.currentTimeMillis();
- Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
- for (String tableNameWithType : tableNamesWithType) {
- Pair<Integer, String> msgInfo =
- _pinotHelixResourceManager.reloadAllSegments(tableNameWithType,
forceDownload, targetInstance);
- int numReloadMsgSent = msgInfo.getLeft();
- if (numReloadMsgSent <= 0) {
- continue;
- }
- Map<String, String> tableReloadMeta = new HashMap<>();
- tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
- tableReloadMeta.put("reloadJobId", msgInfo.getRight());
- perTableMsgData.put(tableNameWithType, tableReloadMeta);
- // Store in ZK
- try {
- if
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType,
targetInstance, msgInfo.getRight(),
- startTimeMs, numReloadMsgSent)) {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
- } else {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add reload all segments job meta into
zookeeper for table: {}", tableNameWithType);
- }
- } catch (Exception e) {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add reload all segments job meta into
zookeeper for table: {}", tableNameWithType, e);
- }
- }
- if (perTableMsgData.isEmpty()) {
- throw new ControllerApplicationException(LOGGER,
- String.format("Failed to find any segments in table: %s on %s",
tableName,
- targetInstance == null ? "every instance" : targetInstance),
Response.Status.NOT_FOUND);
- }
- return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
- }
-
- /**
- * Helper method to find the existing table based on the given table name
(with or without type suffix) and segment
- * name.
- */
- private String getExistingTable(String tableName, String segmentName) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType == null) {
- // Derive table type from segment name if the given table name doesn't
have type suffix
- tableType = LLCSegmentName.isLLCSegment(segmentName) ? TableType.REALTIME
- :
(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName) ?
TableType.REALTIME
- : TableType.OFFLINE);
- }
- return
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
- }
-
- private Map<String, Map<String, Map<String, String>>>
reloadSegments(List<String> tableNamesWithType,
- boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
- long startTimeMs = System.currentTimeMillis();
- Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new
LinkedHashMap<>();
- for (String tableNameWithType : tableNamesWithType) {
- Map<String, Pair<Integer, String>> instanceMsgInfoMap =
- _pinotHelixResourceManager.reloadSegments(tableNameWithType,
forceDownload, instanceToSegmentsMap);
- Map<String, Map<String, String>> instanceMsgData =
- tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new
HashMap<>());
- for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo :
instanceMsgInfoMap.entrySet()) {
- String instance = instanceMsgInfo.getKey();
- Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
- int numReloadMsgSent = msgInfo.getLeft();
- if (numReloadMsgSent <= 0) {
- continue;
- }
- Map<String, String> tableReloadMeta = new HashMap<>();
- tableReloadMeta.put("numMessagesSent",
String.valueOf(numReloadMsgSent));
- tableReloadMeta.put("reloadJobId", msgInfo.getRight());
- instanceMsgData.put(instance, tableReloadMeta);
- // Store in ZK
- try {
- String segmentNames =
- StringUtils.join(instanceToSegmentsMap.get(instance),
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
- if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentNames, instance,
- msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
- } else {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add batch reload job meta into zookeeper
for table: {} targeted instance: {}",
- tableNameWithType, instance);
- }
- } catch (Exception e) {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add batch reload job meta into zookeeper for
table: {} targeted instance: {}",
- tableNameWithType, instance, e);
- }
- }
- }
- return tableInstanceMsgData;
+ return _pinotTableReloadService.reloadAllSegments(tableName, tableTypeStr,
forceDownload, targetInstance,
+ instanceToSegmentsMapInJson, headers);
}
@GET
@@ -339,96 +157,7 @@ public class PinotTableReloadResource {
public ServerReloadControllerJobStatusResponse getReloadJobStatus(
@ApiParam(value = "Reload job ID returned from reload endpoint",
required = true) @PathParam("jobId")
String reloadJobId) throws Exception {
- Map<String, String> controllerJobZKMetadata =
- _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
- if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + reloadJobId,
- Response.Status.NOT_FOUND);
- }
-
- String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
- String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
- String instanceName =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
- Map<String, List<String>> serverToSegments =
getServerToSegments(tableNameWithType, segmentNames, instanceName);
-
- BiMap<String, String> serverEndPoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
- CompletionServiceHelper completionServiceHelper =
- new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
-
- List<String> serverUrls = new ArrayList<>();
- for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
- String server = entry.getKey();
- String endpoint = entry.getValue();
- String reloadTaskStatusEndpoint =
- endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
- +
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
- if (segmentNames != null) {
- List<String> segmentsForServer = serverToSegments.get(server);
- StringBuilder encodedSegmentsBuilder = new StringBuilder();
- if (!segmentsForServer.isEmpty()) {
- Iterator<String> segmentIterator = segmentsForServer.iterator();
- // Append first segment without a leading separator
-
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
- // Append remaining segments, each prefixed by the separator
- while (segmentIterator.hasNext()) {
-
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
- .append(URIUtils.encode(segmentIterator.next()));
- }
- }
- reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
- }
- serverUrls.add(reloadTaskStatusEndpoint);
- }
-
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiGetRequest(serverUrls, null, true,
10000);
-
- ServerReloadControllerJobStatusResponse
serverReloadControllerJobStatusResponse =
- new ServerReloadControllerJobStatusResponse();
- serverReloadControllerJobStatusResponse.setSuccessCount(0);
-
- int totalSegments = 0;
- for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
- totalSegments += entry.getValue().size();
- }
-
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-
serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-
serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
- for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
- String responseString = streamResponse.getValue();
- try {
- ServerReloadControllerJobStatusResponse response =
- JsonUtils.stringToObject(responseString,
ServerReloadControllerJobStatusResponse.class);
- serverReloadControllerJobStatusResponse.setSuccessCount(
- serverReloadControllerJobStatusResponse.getSuccessCount() +
response.getSuccessCount());
- } catch (Exception e) {
- serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
- }
- }
-
- // Add ZK fields
-
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
- // Add derived fields
- long submissionTime =
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
- double timeElapsedInMinutes = ((double) System.currentTimeMillis() -
(double) submissionTime) / (1000.0 * 60.0);
- int remainingSegments =
serverReloadControllerJobStatusResponse.getTotalSegmentCount()
- - serverReloadControllerJobStatusResponse.getSuccessCount();
-
- double estimatedRemainingTimeInMinutes = -1;
- if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
- estimatedRemainingTimeInMinutes =
- ((double) remainingSegments / (double)
serverReloadControllerJobStatusResponse.getSuccessCount())
- * timeElapsedInMinutes;
- }
-
-
serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-
serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
- return serverReloadControllerJobStatusResponse;
+ return _pinotTableReloadService.getReloadJobStatus(reloadJobId);
}
@GET
@@ -446,59 +175,6 @@ public class PinotTableReloadResource {
@PathParam("tableNameWithType") String tableNameWithType,
@ApiParam(value = "Include detailed server responses", defaultValue =
"false") @QueryParam("verbose")
@DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
- tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
- LOGGER.info("Received a request to check reload for all servers hosting
segments for table {}", tableNameWithType);
- try {
- TableMetadataReader tableMetadataReader =
- new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
- Map<String, JsonNode> needReloadMetadata =
-
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
- _controllerConf.getServerAdminRequestTimeoutSeconds() *
1000).getServerReloadJsonResponses();
- boolean needReload =
- needReloadMetadata.values().stream().anyMatch(value ->
value.get("needReload").booleanValue());
- Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new
HashMap<>();
- TableSegmentsReloadCheckResponse tableNeedReloadResponse;
- if (verbose) {
- for (Map.Entry<String, JsonNode> entry :
needReloadMetadata.entrySet()) {
- serverResponses.put(entry.getKey(),
- new
ServerSegmentsReloadCheckResponse(entry.getValue().get("needReload").booleanValue(),
- entry.getValue().get("instanceId").asText()));
- }
- tableNeedReloadResponse = new
TableSegmentsReloadCheckResponse(needReload, serverResponses);
- } else {
- tableNeedReloadResponse = new
TableSegmentsReloadCheckResponse(needReload, serverResponses);
- }
- return JsonUtils.objectToPrettyString(tableNeedReloadResponse);
- } catch (InvalidConfigException e) {
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
- } catch (IOException ioe) {
- throw new ControllerApplicationException(LOGGER, "Error parsing Pinot
server response: " + ioe.getMessage(),
- Response.Status.INTERNAL_SERVER_ERROR, ioe);
- }
- }
-
- @VisibleForTesting
- Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNames,
- @Nullable String instanceName) {
- if (segmentNames == null) {
- // instanceName can be null or not null, and this method below can
handle both cases.
- return
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType,
instanceName, true);
- }
- // Skip servers and segments not involved in the segment reloading job.
- List<String> segmnetNameList = new ArrayList<>();
- Collections.addAll(segmnetNameList, StringUtils.split(segmentNames,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
- if (instanceName != null) {
- return Map.of(instanceName, segmnetNameList);
- }
- // If instance is null, then either one or all segments are being reloaded
via current segment reload restful APIs.
- // And the if-check at the beginning of this method has handled the case
of reloading all segments. So here we
- // expect only one segment name.
- Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is
expected but got: %s", segmnetNameList);
- Map<String, List<String>> serverToSegments = new HashMap<>();
- Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
- for (String server : servers) {
- serverToSegments.put(server, Collections.singletonList(segmentNames));
- }
- return serverToSegments;
+ return _pinotTableReloadService.getTableReloadMetadata(tableNameWithType,
verbose, headers);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
similarity index 68%
copy from
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
copy to
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
index a72ec188eaf..5037f32c959 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableReloadResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java
@@ -16,22 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.api.resources;
+package org.apache.pinot.controller.services;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiKeyAuthDefinition;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
-import io.swagger.annotations.SecurityDefinition;
-import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,17 +35,8 @@ import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.inject.Inject;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
+import javax.inject.Singleton;
import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -67,16 +49,15 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.api.access.AccessType;
-import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.api.resources.ResourceUtils;
+import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.controller.util.TableMetadataReader;
-import org.apache.pinot.core.auth.Actions;
-import org.apache.pinot.core.auth.Authorize;
-import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -85,75 +66,27 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
-import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+@Singleton
+public class PinotTableReloadService {
-/**
- * REST API resource for reloading table segments.
- * <ul>
- * <li>
- * POST requests:
- * <ul>
- * <li>"/segments/{tableName}/{segmentName}/reload": reload a specific
segment</li>
- * <li>"/segments/{tableName}/reload": reload all segments in a
table</li>
- * </ul>
- * </li>
- * <li>
- * GET requests:
- * <ul>
- * <li>"/segments/segmentReloadStatus/{jobId}": get status for a
submitted reload job</li>
- * <li>"/segments/{tableNameWithType}/needReload": check if table
segments need reloading</li>
- * </ul>
- * </li>
- * </ul>
- */
-@Api(tags = Constants.SEGMENT_TAG, authorizations = {
- @Authorization(value = SWAGGER_AUTHORIZATION_KEY), @Authorization(value =
DATABASE)
-})
-@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
- @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key =
- SWAGGER_AUTHORIZATION_KEY, description = "The format of the key is
```\"Basic <token>\" or \"Bearer "
- + "<token>\"```"), @ApiKeyAuthDefinition(name = DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
- key = DATABASE, description =
- "Database context passed through http header. If no context is provided
'default' database "
- + "context will be considered.")
-}))
-@Path("/")
-public class PinotTableReloadResource {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTableReloadResource.class);
-
- @Inject
- ControllerConf _controllerConf;
+ private static final Logger LOG =
LoggerFactory.getLogger(PinotTableReloadService.class);
- @Inject
- PinotHelixResourceManager _pinotHelixResourceManager;
-
- @Inject
- Executor _executor;
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final ControllerConf _controllerConf;
+ private final Executor _executor;
+ private final HttpClientConnectionManager _connectionManager;
@Inject
- HttpClientConnectionManager _connectionManager;
+ public PinotTableReloadService(PinotHelixResourceManager
pinotHelixResourceManager, ControllerConf controllerConf,
+ Executor executor, HttpClientConnectionManager connectionManager) {
+ _pinotHelixResourceManager = pinotHelixResourceManager;
+ _controllerConf = controllerConf;
+ _executor = executor;
+ _connectionManager = connectionManager;
+ }
- @POST
- @Path("segments/{tableName}/{segmentName}/reload")
- @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.RELOAD_SEGMENT)
- @Authenticate(AccessType.UPDATE)
- @Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Reload a specific segment",
- notes = "Triggers segment reload on servers. Returns job ID and message
count.")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "Reload job submitted successfully"),
- @ApiResponse(code = 404, message = "Segment or table not found")
- })
- public SuccessResponse reloadSegment(
- @ApiParam(value = "Table name with or without type suffix", required =
true, example = "myTable_OFFLINE")
- @PathParam("tableName") String tableName,
- @ApiParam(value = "Segment name", required = true, example = "myTable_0")
- @PathParam("segmentName") @Encoded String segmentName,
- @ApiParam(value = "Force server to re-download segment from deep store",
defaultValue = "false")
- @QueryParam("forceDownload") @DefaultValue("false") boolean
forceDownload,
- @ApiParam(value = "Target specific server instance")
@QueryParam("targetInstance") @Nullable
- String targetInstance, @Context HttpHeaders headers) {
+ public SuccessResponse reloadSegment(String tableName, String segmentName,
boolean forceDownload,
+ String targetInstance, HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
long startTimeMs = System.currentTimeMillis();
segmentName = URIUtils.decode(segmentName);
@@ -168,44 +101,24 @@ public class PinotTableReloadResource {
msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
zkJobMetaWriteSuccess = true;
} else {
- LOGGER.error("Failed to add reload segment job meta into zookeeper
for table: {}, segment: {}",
+ LOG.error("Failed to add reload segment job meta into zookeeper for
table: {}, segment: {}",
tableNameWithType, segmentName);
}
} catch (Exception e) {
- LOGGER.error("Failed to add reload segment job meta into zookeeper for
table: {}, segment: {}",
+ LOG.error("Failed to add reload segment job meta into zookeeper for
table: {}, segment: {}",
tableNameWithType, segmentName, e);
}
return new SuccessResponse(
String.format("Submitted reload job id: %s, sent %d reload messages.
Job meta ZK storage status: %s",
msgInfo.getRight(), numReloadMsgSent, zkJobMetaWriteSuccess ?
"SUCCESS" : "FAILED"));
}
- throw new ControllerApplicationException(LOGGER,
+ throw new ControllerApplicationException(LOG,
String.format("Failed to find segment: %s in table: %s on %s",
segmentName, tableName,
targetInstance == null ? "every instance" : targetInstance),
Response.Status.NOT_FOUND);
}
- @POST
- @Path("segments/{tableName}/reload")
- @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.RELOAD_SEGMENT)
- @Authenticate(AccessType.UPDATE)
- @Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Reload all segments in a table",
- notes = "Reloads all segments for the specified table. Supports
filtering by type, instance, or custom mapping.")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "Reload jobs submitted successfully"),
- @ApiResponse(code = 404, message = "No segments found")
- })
- public SuccessResponse reloadAllSegments(
- @ApiParam(value = "Table name with or without type suffix", required =
true, example = "myTable")
- @PathParam("tableName") String tableName,
- @ApiParam(value = "Table type filter", allowableValues =
"OFFLINE,REALTIME") @QueryParam("type")
- String tableTypeStr,
- @ApiParam(value = "Force server to re-download segments from deep
store", defaultValue = "false")
- @QueryParam("forceDownload") @DefaultValue("false") boolean
forceDownload,
- @ApiParam(value = "Target specific server instance")
@QueryParam("targetInstance") @Nullable
- String targetInstance,
- @ApiParam(value = "JSON map of instance to segment lists (overrides
targetInstance)")
- @QueryParam("instanceToSegmentsMap") @Nullable String
instanceToSegmentsMapInJson, @Context HttpHeaders headers)
+ public SuccessResponse reloadAllSegments(String tableName, String
tableTypeStr, boolean forceDownload,
+ String targetInstance, String instanceToSegmentsMapInJson, HttpHeaders
headers)
throws IOException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableType tableTypeFromTableName =
TableNameBuilder.getTableTypeFromTableName(tableName);
@@ -219,8 +132,7 @@ public class PinotTableReloadResource {
tableTypeFromRequest = TableType.OFFLINE;
}
List<String> tableNamesWithType =
-
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableTypeFromRequest,
- LOGGER);
+
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableTypeFromRequest, LOG);
if (instanceToSegmentsMapInJson != null) {
Map<String, List<String>> instanceToSegmentsMap =
JsonUtils.stringToObject(instanceToSegmentsMapInJson, new
TypeReference<>() {
@@ -228,7 +140,7 @@ public class PinotTableReloadResource {
Map<String, Map<String, Map<String, String>>> tableInstanceMsgData =
reloadSegments(tableNamesWithType, forceDownload,
instanceToSegmentsMap);
if (tableInstanceMsgData.isEmpty()) {
- throw new ControllerApplicationException(LOGGER,
+ throw new ControllerApplicationException(LOG,
String.format("Failed to find any segments in table: %s with
instanceToSegmentsMap: %s", tableName,
instanceToSegmentsMap), Response.Status.NOT_FOUND);
}
@@ -254,95 +166,27 @@ public class PinotTableReloadResource {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
} else {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add reload all segments job meta into
zookeeper for table: {}", tableNameWithType);
+ LOG.error("Failed to add reload all segments job meta into zookeeper
for table: {}", tableNameWithType);
}
} catch (Exception e) {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add reload all segments job meta into
zookeeper for table: {}", tableNameWithType, e);
+ LOG.error("Failed to add reload all segments job meta into zookeeper
for table: {}", tableNameWithType, e);
}
}
if (perTableMsgData.isEmpty()) {
- throw new ControllerApplicationException(LOGGER,
+ throw new ControllerApplicationException(LOG,
String.format("Failed to find any segments in table: %s on %s",
tableName,
targetInstance == null ? "every instance" : targetInstance),
Response.Status.NOT_FOUND);
}
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}
- /**
- * Helper method to find the existing table based on the given table name
(with or without type suffix) and segment
- * name.
- */
- private String getExistingTable(String tableName, String segmentName) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType == null) {
- // Derive table type from segment name if the given table name doesn't
have type suffix
- tableType = LLCSegmentName.isLLCSegment(segmentName) ? TableType.REALTIME
- :
(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName) ?
TableType.REALTIME
- : TableType.OFFLINE);
- }
- return
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
- }
-
- private Map<String, Map<String, Map<String, String>>>
reloadSegments(List<String> tableNamesWithType,
- boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
- long startTimeMs = System.currentTimeMillis();
- Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new
LinkedHashMap<>();
- for (String tableNameWithType : tableNamesWithType) {
- Map<String, Pair<Integer, String>> instanceMsgInfoMap =
- _pinotHelixResourceManager.reloadSegments(tableNameWithType,
forceDownload, instanceToSegmentsMap);
- Map<String, Map<String, String>> instanceMsgData =
- tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new
HashMap<>());
- for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo :
instanceMsgInfoMap.entrySet()) {
- String instance = instanceMsgInfo.getKey();
- Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
- int numReloadMsgSent = msgInfo.getLeft();
- if (numReloadMsgSent <= 0) {
- continue;
- }
- Map<String, String> tableReloadMeta = new HashMap<>();
- tableReloadMeta.put("numMessagesSent",
String.valueOf(numReloadMsgSent));
- tableReloadMeta.put("reloadJobId", msgInfo.getRight());
- instanceMsgData.put(instance, tableReloadMeta);
- // Store in ZK
- try {
- String segmentNames =
- StringUtils.join(instanceToSegmentsMap.get(instance),
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
- if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentNames, instance,
- msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
- } else {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add batch reload job meta into zookeeper
for table: {} targeted instance: {}",
- tableNameWithType, instance);
- }
- } catch (Exception e) {
- tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
- LOGGER.error("Failed to add batch reload job meta into zookeeper for
table: {} targeted instance: {}",
- tableNameWithType, instance, e);
- }
- }
- }
- return tableInstanceMsgData;
- }
-
- @GET
- @Path("segments/segmentReloadStatus/{jobId}")
- @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_SEGMENT_RELOAD_STATUS)
- @Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Get reload job status",
- notes = "Returns progress and metadata for a reload job including
completion stats and time estimates.")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "Job status retrieved successfully"),
- @ApiResponse(code = 404, message = "Job ID not found")
- })
- public ServerReloadControllerJobStatusResponse getReloadJobStatus(
- @ApiParam(value = "Reload job ID returned from reload endpoint",
required = true) @PathParam("jobId")
- String reloadJobId) throws Exception {
+ public ServerReloadControllerJobStatusResponse getReloadJobStatus(String
reloadJobId)
+ throws InvalidConfigException {
Map<String, String> controllerJobZKMetadata =
_pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId,
ControllerJobTypes.RELOAD_SEGMENT);
if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + reloadJobId,
+ throw new ControllerApplicationException(LOG, "Failed to find controller
job id: " + reloadJobId,
Response.Status.NOT_FOUND);
}
@@ -431,23 +275,9 @@ public class PinotTableReloadResource {
return serverReloadControllerJobStatusResponse;
}
- @GET
- @Path("segments/{tableNameWithType}/needReload")
- @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType",
action = Actions.Table.GET_METADATA)
- @Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Check if table needs reload",
- notes = "Queries all servers hosting the table to determine if segments
need reloading.")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "Reload check completed
successfully"),
- @ApiResponse(code = 400, message = "Invalid table configuration")
- })
- public String getTableReloadMetadata(
- @ApiParam(value = "Table name with type suffix", required = true,
example = "myTable_REALTIME")
- @PathParam("tableNameWithType") String tableNameWithType,
- @ApiParam(value = "Include detailed server responses", defaultValue =
"false") @QueryParam("verbose")
- @DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
+ public String getTableReloadMetadata(String tableNameWithType, boolean
verbose, HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
- LOGGER.info("Received a request to check reload for all servers hosting
segments for table {}", tableNameWithType);
+ LOG.info("Received a request to check reload for all servers hosting
segments for table {}", tableNameWithType);
try {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
@@ -470,9 +300,9 @@ public class PinotTableReloadResource {
}
return JsonUtils.objectToPrettyString(tableNeedReloadResponse);
} catch (InvalidConfigException e) {
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
+ throw new ControllerApplicationException(LOG, e.getMessage(),
Response.Status.BAD_REQUEST);
} catch (IOException ioe) {
- throw new ControllerApplicationException(LOGGER, "Error parsing Pinot
server response: " + ioe.getMessage(),
+ throw new ControllerApplicationException(LOG, "Error parsing Pinot
server response: " + ioe.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, ioe);
}
}
@@ -501,4 +331,61 @@ public class PinotTableReloadResource {
}
return serverToSegments;
}
+
+ /**
+ * Helper method to find the existing table based on the given table name
(with or without type suffix) and segment
+ * name.
+ */
+ private String getExistingTable(String tableName, String segmentName) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
+ if (tableType == null) {
+ // Derive table type from segment name if the given table name doesn't
have type suffix
+ tableType = LLCSegmentName.isLLCSegment(segmentName) ? TableType.REALTIME
+ :
(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(segmentName) ?
TableType.REALTIME
+ : TableType.OFFLINE);
+ }
+ return
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOG).get(0);
+ }
+
+ private Map<String, Map<String, Map<String, String>>>
reloadSegments(List<String> tableNamesWithType,
+ boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
+ long startTimeMs = System.currentTimeMillis();
+ Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new
LinkedHashMap<>();
+ for (String tableNameWithType : tableNamesWithType) {
+ Map<String, Pair<Integer, String>> instanceMsgInfoMap =
+ _pinotHelixResourceManager.reloadSegments(tableNameWithType,
forceDownload, instanceToSegmentsMap);
+ Map<String, Map<String, String>> instanceMsgData =
+ tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new
HashMap<>());
+ for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo :
instanceMsgInfoMap.entrySet()) {
+ String instance = instanceMsgInfo.getKey();
+ Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
+ int numReloadMsgSent = msgInfo.getLeft();
+ if (numReloadMsgSent <= 0) {
+ continue;
+ }
+ Map<String, String> tableReloadMeta = new HashMap<>();
+ tableReloadMeta.put("numMessagesSent",
String.valueOf(numReloadMsgSent));
+ tableReloadMeta.put("reloadJobId", msgInfo.getRight());
+ instanceMsgData.put(instance, tableReloadMeta);
+ // Store in ZK
+ try {
+ String segmentNames =
+ StringUtils.join(instanceToSegmentsMap.get(instance),
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+ if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentNames, instance,
+ msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
+ tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
+ } else {
+ tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+ LOG.error("Failed to add batch reload job meta into zookeeper for
table: {} targeted instance: {}",
+ tableNameWithType, instance);
+ }
+ } catch (Exception e) {
+ tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+ LOG.error("Failed to add batch reload job meta into zookeeper for
table: {} targeted instance: {}",
+ tableNameWithType, instance, e);
+ }
+ }
+ }
+ return tableInstanceMsgData;
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableReloadResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
similarity index 80%
rename from
pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableReloadResourceTest.java
rename to
pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
index ed8fabb366f..a18f8c0ccf5 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableReloadResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/services/PinotTableReloadServiceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.api.resources;
+package org.apache.pinot.controller.services;
import java.util.ArrayList;
import java.util.HashMap;
@@ -35,12 +35,12 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class PinotTableReloadResourceTest {
+public class PinotTableReloadServiceTest {
@Mock
PinotHelixResourceManager _pinotHelixResourceManager;
@InjectMocks
- PinotTableReloadResource _resource;
+ PinotTableReloadService _service;
@BeforeMethod
public void setup() {
@@ -60,26 +60,26 @@ public class PinotTableReloadResourceTest {
when(_pinotHelixResourceManager.getServers(tableName,
"seg01")).thenReturn(Set.of("svr01", "svr03"));
// Get all servers and all their segments.
- Map<String, List<String>> serverToSegmentsMap =
_resource.getServerToSegments(tableName, null, null);
+ Map<String, List<String>> serverToSegmentsMap =
_service.getServerToSegments(tableName, null, null);
assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
// Get all segments on svr02.
- serverToSegmentsMap = _resource.getServerToSegments(tableName, null,
"svr02");
+ serverToSegmentsMap = _service.getServerToSegments(tableName, null,
"svr02");
assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02",
"seg03")));
// Get all servers with seg01.
- serverToSegmentsMap = _resource.getServerToSegments(tableName, "seg01",
null);
+ serverToSegmentsMap = _service.getServerToSegments(tableName, "seg01",
null);
assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"),
"svr03", List.of("seg01")));
// Simply map the provided server to the provided segments.
- serverToSegmentsMap = _resource.getServerToSegments(tableName, "seg01",
"svr01");
+ serverToSegmentsMap = _service.getServerToSegments(tableName, "seg01",
"svr01");
assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
- serverToSegmentsMap = _resource.getServerToSegments(tableName,
"anySegment", "anyServer");
+ serverToSegmentsMap = _service.getServerToSegments(tableName,
"anySegment", "anyServer");
assertEquals(serverToSegmentsMap, Map.of("anyServer",
List.of("anySegment")));
- serverToSegmentsMap = _resource.getServerToSegments(tableName,
"seg01|seg02", "svr02");
+ serverToSegmentsMap = _service.getServerToSegments(tableName,
"seg01|seg02", "svr02");
assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01",
"seg02")));
try {
- _resource.getServerToSegments(tableName, "seg01,seg02", null);
+ _service.getServerToSegments(tableName, "seg01,seg02", null);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Only one segment is expected but
got: [seg01, seg02]"));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]