xiangfu0 commented on code in PR #10815: URL: https://github.com/apache/pinot/pull/10815#discussion_r1329564462
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1465,6 +1469,78 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe } } + /** + * Delete tmp segments for realtime table with low level consumer, split commit and async deletion is enabled. + * @param tableNameWithType + * @param segmentsZKMetadata + * @return number of deleted orphan temporary segments + * + */ + public long deleteTmpSegments(String tableNameWithType, List<SegmentZKMetadata> segmentsZKMetadata) { + Preconditions.checkState(!_isStopping, "Segment manager is stopping"); + + if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + return 0L; + } + + TableConfig tableConfig = _helixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", tableNameWithType); + return 0L; + } + + if (!isLowLevelConsumer(tableNameWithType, tableConfig) + || !getIsSplitCommitEnabled() + || !isTmpSegmentAsyncDeletionEnabled()) { + return 0L; + } + + Set<String> deepURIs = segmentsZKMetadata.stream().filter(meta -> meta.getStatus() == Status.DONE + && !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(meta.getDownloadUrl())).map( + SegmentZKMetadata::getDownloadUrl).collect( + Collectors.toSet()); + + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); + PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); + long deletedTmpSegments = 0; + try { + for (String filePath : pinotFS.listFiles(tableDirURI, false)) { + // prepend scheme + URI uri = URIUtils.getUri(filePath); + if (isTmpAndCanDelete(uri, deepURIs, pinotFS)) { + LOGGER.info("Deleting temporary segment file: {}", uri); + if (pinotFS.delete(uri, true)) { + LOGGER.info("Succeed to delete file: {}", uri); + deletedTmpSegments++; + } else { + LOGGER.warn("Failed to delete file: {}", uri); + } + } + } + } catch (Exception e) { + LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e); + } + return deletedTmpSegments; + } + + private boolean isTmpAndCanDelete(URI uri, Set<String> deepURIs, PinotFS pinotFS) throws Exception { + long lastModified = pinotFS.lastModified(uri); + if (lastModified <= 0) { + LOGGER.warn("file {} modification time {} is not positive, ineligible for delete", uri.toString(), lastModified); + return false; + } + String uriString = uri.toString(); + return SegmentCompletionUtils.isTmpFile(uriString) && !deepURIs.contains(uriString) + && getCurrentTimeMs() - lastModified > _controllerConf.getTmpSegmentRetentionInSeconds() * 1000L; + } + + private boolean isLowLevelConsumer(String tableNameWithType, TableConfig tableConfig) { Review Comment: this `isLowLevelConsumer` is deprecated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org