klsince commented on code in PR #15142: URL: https://github.com/apache/pinot/pull/15142#discussion_r1985496441
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java: ########## @@ -136,11 +166,101 @@ private void manageRetentionForOfflineTable(String offlineTableName, RetentionSt } } + /** + * Identifies segments in deepstore that are ready for deletion based on the retention strategy. + * + * This method finds segments that are beyond the retention period and are ready to be purged. + * It only considers segments that do not have entries in ZooKeeper metadata. + * The lastModified time of the file in deepstore is used to determine whether the segment + * should be retained or purged. + * + * @param tableNameWithType Name of the offline table + * @param retentionStrategy Strategy to determine if a segment should be purged + * @param segmentsToExclude List of segment names that should be excluded from deletion + * @return List of segment names that should be deleted from deepstore + * @throws IOException If there's an error accessing the filesystem + */ + private List<String> getSegmentsToDeleteFromDeepstore(String tableNameWithType, RetentionStrategy retentionStrategy, + List<String> segmentsToExclude) + throws IOException { + + List<String> segmentsToDelete = new ArrayList<>(); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI tableDataUri = URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName); + PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme()); + + long startTimeMs = System.currentTimeMillis(); + + List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false); + long listEndTimeMs = System.currentTimeMillis(); + LOGGER.info("Found: {} segments in deepstore for table: {}. Time taken to list segments: {} ms", Review Comment: nit: `Found ... 
in {} ms` to be short ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java: ########## @@ -136,11 +166,101 @@ private void manageRetentionForOfflineTable(String offlineTableName, RetentionSt } } + /** + * Identifies segments in deepstore that are ready for deletion based on the retention strategy. + * + * This method finds segments that are beyond the retention period and are ready to be purged. + * It only considers segments that do not have entries in ZooKeeper metadata. + * The lastModified time of the file in deepstore is used to determine whether the segment + * should be retained or purged. + * + * @param tableNameWithType Name of the offline table + * @param retentionStrategy Strategy to determine if a segment should be purged + * @param segmentsToExclude List of segment names that should be excluded from deletion + * @return List of segment names that should be deleted from deepstore + * @throws IOException If there's an error accessing the filesystem + */ + private List<String> getSegmentsToDeleteFromDeepstore(String tableNameWithType, RetentionStrategy retentionStrategy, + List<String> segmentsToExclude) + throws IOException { + + List<String> segmentsToDelete = new ArrayList<>(); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI tableDataUri = URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName); + PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme()); + + long startTimeMs = System.currentTimeMillis(); + + List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false); + long listEndTimeMs = System.currentTimeMillis(); + LOGGER.info("Found: {} segments in deepstore for table: {}. 
Time taken to list segments: {} ms", + deepstoreFiles.size(), tableNameWithType, listEndTimeMs - startTimeMs); + + for (FileMetadata fileMetadata : deepstoreFiles) { + if (fileMetadata.isDirectory()) { + continue; + } + + String segmentName = extractSegmentName(fileMetadata.getFilePath()); + if (Strings.isEmpty(segmentName) || segmentsToExclude.contains(segmentName)) { + continue; + } + + // determine whether the segment should be purged or not based on the last modified time of the file + long lastModifiedTime = fileMetadata.getLastModifiedTime(); + + if (retentionStrategy.isPurgeable(tableNameWithType, segmentName, lastModifiedTime)) { + segmentsToDelete.add(segmentName); + } + } + long endTimeMs = System.currentTimeMillis(); + LOGGER.info( Review Comment: perhaps combine the two INFO logs into one, e.g. "Found {} segments ... that have no corresponding ZK metadata, in {} ms" ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java: ########## @@ -136,11 +166,101 @@ private void manageRetentionForOfflineTable(String offlineTableName, RetentionSt } } + /** + * Identifies segments in deepstore that are ready for deletion based on the retention strategy. + * + * This method finds segments that are beyond the retention period and are ready to be purged. + * It only considers segments that do not have entries in ZooKeeper metadata. + * The lastModified time of the file in deepstore is used to determine whether the segment + * should be retained or purged.
+ * + * @param tableNameWithType Name of the offline table + * @param retentionStrategy Strategy to determine if a segment should be purged + * @param segmentsToExclude List of segment names that should be excluded from deletion + * @return List of segment names that should be deleted from deepstore + * @throws IOException If there's an error accessing the filesystem + */ + private List<String> getSegmentsToDeleteFromDeepstore(String tableNameWithType, RetentionStrategy retentionStrategy, + List<String> segmentsToExclude) + throws IOException { + + List<String> segmentsToDelete = new ArrayList<>(); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI tableDataUri = URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName); + PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme()); + + long startTimeMs = System.currentTimeMillis(); + + List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false); + long listEndTimeMs = System.currentTimeMillis(); + LOGGER.info("Found: {} segments in deepstore for table: {}. 
Time taken to list segments: {} ms", + deepstoreFiles.size(), tableNameWithType, listEndTimeMs - startTimeMs); + + for (FileMetadata fileMetadata : deepstoreFiles) { + if (fileMetadata.isDirectory()) { + continue; + } + + String segmentName = extractSegmentName(fileMetadata.getFilePath()); + if (Strings.isEmpty(segmentName) || segmentsToExclude.contains(segmentName)) { + continue; + } + + // determine whether the segment should be purged or not based on the last modified time of the file + long lastModifiedTime = fileMetadata.getLastModifiedTime(); + + if (retentionStrategy.isPurgeable(tableNameWithType, segmentName, lastModifiedTime)) { + segmentsToDelete.add(segmentName); + } + } + long endTimeMs = System.currentTimeMillis(); + LOGGER.info( + "Took: {} ms to filter segments to delete from deepstore for table: {} that don't have a corresponding entry " + + "in property store", + endTimeMs - startTimeMs, tableNameWithType); + LOGGER.info( + "Deleting: {} segments from deep store for table: {} as they have no corresponding entry in the property " + + "store.", segmentsToDelete.size(), tableNameWithType); + + return segmentsToDelete; + } + + @Nullable + private String extractSegmentName(@Nullable String filePath) { + if (Strings.isEmpty(filePath)) { + return null; + } + String segmentName = filePath.substring(filePath.lastIndexOf("/") + 1); + if (segmentName.endsWith(TarCompressionUtils.TAR_GZ_FILE_EXTENSION)) { + segmentName = segmentName.substring(0, segmentName.length() - TarCompressionUtils.TAR_GZ_FILE_EXTENSION.length()); + } + return segmentName; + } + private void manageRetentionForRealtimeTable(String realtimeTableName, RetentionStrategy retentionStrategy) { + List<SegmentZKMetadata> segmentZKMetadataList = _pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName); + List<String> segmentsPresentInZK = + segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList()); + List<String> segmentsToDelete = new 
ArrayList<>(); + + // fetch those segments that are beyond the retention period and don't have an entry in ZK i.e. + // SegmentZkMetadata is missing for those segments + try { + LOGGER.info("Fetch segments present in deep store that are beyond retention period for table: {}", + realtimeTableName); + segmentsToDelete = getSegmentsToDeleteFromDeepstore(realtimeTableName, retentionStrategy, segmentsPresentInZK); + _controllerMetrics.setValueOfTableGauge(realtimeTableName, ControllerGauge.UNTRACKED_SEGMENTS_COUNT, + segmentsToDelete.size()); + } catch (IOException e) { + LOGGER.warn("Unable to fetch segments from deep store that are beyond retention period for table: {}", + realtimeTableName); + } Review Comment: could you extract L241-L258 into a helper method and use it in both manageRetentionForOfflineTable and manageRetentionForRealtimeTable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org