Jackie-Jiang commented on code in PR #8828: URL: https://github.com/apache/pinot/pull/8828#discussion_r931645241
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2( return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr); } + @GET + @Path("segments/segmentReloadStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted reload operation", + notes = "Get status for a submitted reload operation") + public ServerReloadControllerJobStatusResponse getReloadJobStatus( + @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + if (controllerJobZKMetadata == null) { + throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId, + Status.NOT_FOUND); + } + + String tableNameWithType = + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE); + Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + + String singleSegmentName = null; + if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE) + .equals(ControllerJobType.RELOAD_SEGMENT.toString())) { + // No need to query servers where this segment is not supposed to be hosted Review Comment: For single segment, we can use `_pinotHelixResourceManager.getServers()` instead ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -597,14 +713,31 @@ public SuccessResponse reloadAllSegments( if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) { tableTypeFromRequest = TableType.OFFLINE; } - List<String> tableNamesWithType = ResourceUtils - .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER); - Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>(); + List<String> tableNamesWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, + LOGGER); + Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>(); for (String tableNameWithType : tableNamesWithType) { - int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload); - numMessagesSentPerTable.put(tableNameWithType, numMsgSent); + Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload); + Map<String, String> tableReloadMeta = new HashMap<>(); + tableReloadMeta.put("numberOfMessagesSent", String.valueOf(msgInfo.getLeft())); + tableReloadMeta.put("reloadMessageId", msgInfo.getRight()); Review Comment: ```suggestion tableReloadMeta.put("numMessagesSent", String.valueOf(msgInfo.getLeft())); tableReloadMeta.put("reloadJobId", msgInfo.getRight()); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) { return instanceSet; } + /** + * Returns the ZK metdata for the given jobId + * @param jobId the id of the job + * @return Map representing the job's ZK properties + */ + public Map<String, String> getControllerJobZKMetadata(String jobId) { Review Comment: Annotate the return as `@Nullable` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java: ########## @@ -2366,16 +2367,44 @@ public void testJDBCClient() Assert.assertTrue(resultSet.getLong(1) > 0); } - private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception { + private java.sql.Connection getJDBCConnectionFromController(int controllerPort) + throws Exception { PinotDriver pinotDriver = new PinotDriver(); Properties jdbcProps = new Properties(); return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps); } - private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception { + private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) + throws Exception { PinotDriver pinotDriver = new PinotDriver(); Properties jdbcProps = new Properties(); jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort); return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps); } + + private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) { + try { + String response = + sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload), + null); + String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + JsonNode tableLevelDetails = + JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType); + String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText(); + + if (isZKWriteSuccess.equals("SUCCESS")) { Review Comment: Let's verify that it is `SUCCESS` ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -557,6 +557,23 @@ public static class Minion { public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names"; } + public static class ControllerJob { + /** + * Controller job ZK props + */ + public static final String CONTROLLER_JOB_TYPE = "jobType"; + public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "tableName"; + public static final String CONTROLLER_JOB_ID = "jobId"; + public static final String CONTROLLER_JOB_SUBMISSION_TIME = "submissionTime"; Review Comment: (minor) Consider adding `Ms` to be more clear ```suggestion public static final String SUBMISSION_TIME_MS = "submissionTimeMs"; ``` ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -557,6 +557,23 @@ public static class Minion { public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names"; } + public static class ControllerJob { + /** + * Controller job ZK props + */ + public static final String CONTROLLER_JOB_TYPE = "jobType"; Review Comment: (minor) We may also simplify the key name because it is under `ControllerJob` class ```suggestion public static final String JOB_TYPE = "jobType"; ``` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java: ########## @@ -2366,16 +2367,44 @@ public void testJDBCClient() Assert.assertTrue(resultSet.getLong(1) > 0); } - private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception { + private java.sql.Connection getJDBCConnectionFromController(int controllerPort) + throws Exception { PinotDriver pinotDriver = new PinotDriver(); Properties jdbcProps = new Properties(); return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps); } - private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception { + private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) + throws Exception { PinotDriver pinotDriver = new PinotDriver(); Properties jdbcProps = new Properties(); jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort); return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps); } + + private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) { + try { + String response = + sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload), + null); + String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName); + JsonNode tableLevelDetails = + JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType); + String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText(); + + if (isZKWriteSuccess.equals("SUCCESS")) { + // We can validate reload status API now + String jobId = tableLevelDetails.get("reloadMessageId").asText(); + String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId)); + JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse); + + // Validate all fields are present + assertEquals(jobStatus.get("metadata").get("jobId").asText(), jobId); + assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS"); + assertEquals(jobStatus.get("metadata").get("tableName").asText(), tableNameWithType); + } + } catch (Exception e) { + Assert.fail("Reload failed :" + e.getMessage()); Review Comment: (nit) ```suggestion Assert.fail("Reload failed: " + e.getMessage()); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2( return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr); } + @GET + @Path("segments/segmentReloadStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted reload operation", + notes = "Get status for a submitted reload operation") + public ServerReloadControllerJobStatusResponse getReloadJobStatus( + @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + if (controllerJobZKMetadata == null) { + throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId, + Status.NOT_FOUND); + } + + String tableNameWithType = + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE); + Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + + String singleSegmentName = null; + if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE) + .equals(ControllerJobType.RELOAD_SEGMENT.toString())) { + // No need to query servers where this segment is not supposed to be hosted + serverToSegments = serverToSegments.entrySet().stream().filter(kv -> kv.getValue() + .contains(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME); + } + + BiMap<String, String> serverEndPoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints); + + List<String> serverUrls = new ArrayList<>(); + BiMap<String, String> endpointsToServers = serverEndPoints.inverse(); + for (String endpoint : endpointsToServers.keySet()) { + String reloadTaskStatusEndpoint = + endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp=" + + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME); + if (singleSegmentName != null) { + reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName; + } + serverUrls.add(reloadTaskStatusEndpoint); + } + + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000); + + ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse = + new ServerReloadControllerJobStatusResponse(); + serverReloadControllerJobStatusResponse.setSuccessCount(0); + serverReloadControllerJobStatusResponse.setTotalSegmentCount( + _pinotHelixResourceManager.getSegmentsCount(tableNameWithType)); Review Comment: This is incorrect because it doesn't count the replicas. We can count the total segments in `serverToSegments` for table reload, and total servers for single segment reload ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) { return instanceSet; } + /** + * Returns the ZK metdata for the given jobId + * @param jobId the id of the job + * @return Map representing the job's ZK properties + */ + public Map<String, String> getControllerJobZKMetadata(String jobId) { + String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(); + try { + ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1); + return taskResourceZnRecord.getMapFields().get(jobId); + } catch (ZkNoNodeException e) { + LOGGER.warn("Could not find job metadata for id: {}", jobId, e); + } + return null; + } + + /** + * Returns a Map of jobId to job's ZK metadata for the given table + * @param tableNameWithType the table for which jobs are to be fetched + * @return A Map of jobId to job properties + */ + public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) { + String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(); + try { + ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1); + Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields(); + return controllerJobs.entrySet().stream().filter( + job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE) + .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } catch (ZkNoNodeException e) { + LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e); + } + + return Collections.emptyMap(); + } + + /** + * Adds a new reload segment job metadata into ZK + * @param tableNameWithType Table for which job is to be added + * @param segmentName Name of the segment being reloaded + * @param jobId job's UUID + * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata + * @return boolean representing success / failure of the ZK write step + */ + public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId, + int numberOfMessagesSent) { Review Comment: (nit) we usually use `num` as a short version of `numberOf` ```suggestion int numMessagesSent) { ``` ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -557,6 +557,23 @@ public static class Minion { public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names"; } + public static class ControllerJob { + /** + * Controller job ZK props + */ + public static final String CONTROLLER_JOB_TYPE = "jobType"; + public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "tableName"; + public static final String CONTROLLER_JOB_ID = "jobId"; + public static final String CONTROLLER_JOB_SUBMISSION_TIME = "submissionTime"; + public static final String CONTROLLER_JOB_MESSAGES_COUNT = "messageCount"; Review Comment: (minor) Keep the key and value consistent ```suggestion public static final String MESSAGE_COUNT = "messageCount"; ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) { return instanceSet; } + /** + * Returns the ZK metdata for the given jobId + * @param jobId the id of the job + * @return Map representing the job's ZK properties + */ + public Map<String, String> getControllerJobZKMetadata(String jobId) { + String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(); + try { + ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1); Review Comment: We should not use option -1 here as it is not a valid option (this is not version). We usually use `AccessOption.PERSISTENT`. Same for other places, and we don't need to explicitly handle `ZkNoNodeException` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java: ########## @@ -2366,16 +2367,44 @@ public void testJDBCClient() Assert.assertTrue(resultSet.getLong(1) > 0); } - private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception { + private java.sql.Connection getJDBCConnectionFromController(int controllerPort) + throws Exception { PinotDriver pinotDriver = new PinotDriver(); Properties jdbcProps = new Properties(); return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps); } - private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception { + private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) + throws Exception { PinotDriver pinotDriver = new PinotDriver(); Properties jdbcProps = new Properties(); jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort); return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps); } + + private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) { Review Comment: Let's also test the job status after reloading is done ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) { return instanceSet; } + /** + * Returns the ZK metdata for the given jobId + * @param jobId the id of the job + * @return Map representing the job's ZK properties + */ + public Map<String, String> getControllerJobZKMetadata(String jobId) { + String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(); + try { + ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1); + return taskResourceZnRecord.getMapFields().get(jobId); + } catch (ZkNoNodeException e) { + LOGGER.warn("Could not find job metadata for id: {}", jobId, e); + } + return null; + } + + /** + * Returns a Map of jobId to job's ZK metadata for the given table + * @param tableNameWithType the table for which jobs are to be fetched + * @return A Map of jobId to job properties + */ + public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) { + String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(); + try { + ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1); + Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields(); + return controllerJobs.entrySet().stream().filter( + job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE) + .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } catch (ZkNoNodeException e) { + LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e); + } + + return Collections.emptyMap(); + } + + /** + * Adds a new reload segment job metadata into ZK + * @param tableNameWithType Table for which job is to be added + * @param segmentName Name of the segment being reloaded + * @param jobId job's UUID + * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata + * @return boolean representing success / failure of the ZK write step + */ + public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId, + int numberOfMessagesSent) { + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString()); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME, + Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT, + Integer.toString(numberOfMessagesSent)); + jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); + return addReloadJobToZK(tableNameWithType, jobId, jobMetadata); + } + + /** + * Adds a new reload segment job metadata into ZK + * @param tableNameWithType Table for which job is to be added + * @param jobId job's UUID + * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata + * @return boolean representing success / failure of the ZK write step + */ + public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) { + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, + ControllerJobType.RELOAD_ALL_SEGMENTS.toString()); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME, + Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT, + Integer.toString(numberOfMessagesSent)); + return addReloadJobToZK(tableNameWithType, jobId, jobMetadata); + } + + private boolean addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> jobMetadata) { Review Comment: ```suggestion private boolean addReloadJobToZK(String jobId, Map<String, String> jobMetadata) { ``` ########## pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java: ########## @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import java.util.List; +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.server.starter.ServerInstance; +import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue; +import org.apache.pinot.spi.utils.JsonUtils; + + +@Api(tags = "Tasks") +@Path("/") +public class ControllerJobStatusResource { + + @Inject + private ServerInstance _serverInstance; + + @GET + @Path("/controllerJob/reloadStatus/{tableNameWithType}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Task status", notes = "Return status of the given task") + public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType, + @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp, + @QueryParam("segmentName") String segmentName) + throws Exception { + TableDataManager tableDataManager = + ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); + + if (segmentName == null) { + // All segments + List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments(); + try { + long successCount = 0; + for (SegmentDataManager segmentDataManager : allSegments) { + if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) { + successCount++; + } + } + SegmentReloadStatusValue segmentReloadStatusValue = + new SegmentReloadStatusValue(allSegments.size(), successCount); + return JsonUtils.objectToString(segmentReloadStatusValue); + } finally { + for (SegmentDataManager segmentDataManager : allSegments) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } else { + SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); + if (segmentDataManager == null) { + throw new WebApplicationException("Segment: " + segmentName + " is not found", Response.Status.NOT_FOUND); Review Comment: I feel returning `0/0` might be preferred so that we can differentiate server not responding vs server not having the segment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org