Jackie-Jiang commented on code in PR #8828: URL: https://github.com/apache/pinot/pull/8828#discussion_r910907030
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java: ########## @@ -27,6 +27,15 @@ */ public abstract class SegmentDataManager { private int _referenceCount = 1; + private long _segmentLoadTimeInMillisEpoch = System.currentTimeMillis(); Review Comment: (minor) `_loadTimeMs` for simplicity, same for the getter and setter ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1977,6 +1979,80 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) { return instanceSet; } + public Map<String, String> getControllerJobZKMetadata(String tableNameWithType, String taskId) { + String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType); + if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) { + ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1); + return taskResourceZnRecord.getMapFields().get(taskId); + } else { + return null; + } + } + + public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) { + String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType); + if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) { + ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1); + return tableJobsRecord.getMapFields(); + } else { + return Collections.emptyMap(); + } + } + + public void addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId, + int numberOfMessagesSent) { + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, 
ControllerJobType.RELOAD_SEGMENT.toString()); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME, + Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT, + Integer.toString(numberOfMessagesSent)); + jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); + addReloadJobToZK(tableNameWithType, jobId, jobMetadata); + } + + public void addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) { + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, + ControllerJobType.RELOAD_ALL_SEGMENTS.toString()); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME, + Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT, + Integer.toString(numberOfMessagesSent)); + addReloadJobToZK(tableNameWithType, jobId, jobMetadata); + } + + private void addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> taskMetadata) { + String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType); + ZNRecord tableJobsZnRecord; + + if (_propertyStore.exists(jobResourcePath, AccessOption.PERSISTENT)) { Review Comment: No need to check if the record exists. We can directly `get()` and check if the return is `null`. It can also prevent race condition. 
When the record exists, we should track the record version (passing a `Stat` into the `get()`), and check the expected version when setting the record to prevent a race condition ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -425,9 +429,17 @@ public SuccessResponse reloadSegment( TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE; String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload); - if (numMessagesSent > 0) { - return new SuccessResponse("Sent " + numMessagesSent + " reload messages"); + Pair<Integer, String> msgInfo = + _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload); + if (msgInfo.getLeft() > 0) { + try { + _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(), + msgInfo.getLeft()); + } catch (Exception e) { + LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}", + tableNameWithType, segmentName, e); + } + return new SuccessResponse("Sent " + msgInfo + " reload messages"); Review Comment: ```suggestion return new SuccessResponse("Sent " + msgInfo.getLeft() + " reload messages"); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -425,9 +429,17 @@ public SuccessResponse reloadSegment( TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? 
TableType.REALTIME : TableType.OFFLINE; String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload); - if (numMessagesSent > 0) { - return new SuccessResponse("Sent " + numMessagesSent + " reload messages"); + Pair<Integer, String> msgInfo = + _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload); + if (msgInfo.getLeft() > 0) { + try { + _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(), + msgInfo.getLeft()); + } catch (Exception e) { + LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}", Review Comment: Suggest reflecting this failure back to the user instead of just logging an error and returning success; otherwise the user won't be able to read the reload status. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org