rajagopr commented on code in PR #15030: URL: https://github.com/apache/pinot/pull/15030#discussion_r1951975063
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java: ########## @@ -458,10 +438,71 @@ public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, Pin }); } } finally { - for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) { + for (Map.Entry<String, File> metadataFileEntry : segmentMetadataFileMap.entrySet()) { FileUtils.deleteQuietly(metadataFileEntry.getValue()); } - FileUtils.forceDelete(allSegmentsMetadataTarFile); + if (allSegmentsMetadataTarFile != null) { + FileUtils.deleteQuietly(allSegmentsMetadataTarFile); + } + } + } + + @VisibleForTesting + public static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec, PinotFS fileSystem, + Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, File> segmentMetadataFileMap, + ConcurrentLinkedQueue<String> segmentURIs) { + int nThreads = spec.getPushJobSpec().getSegmentMetadataGenerationParallelism(); Review Comment: We should set a default value here for cases where this new parameter is not set, which is the case for the existing minion tasks. If the value ends up as 0 or negative, `Executors.newFixedThreadPool` throws an `IllegalArgumentException`, so the push would fail outright.
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java: ########## @@ -458,10 +438,71 @@ public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, Pin }); } } finally { - for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) { + for (Map.Entry<String, File> metadataFileEntry : segmentMetadataFileMap.entrySet()) { FileUtils.deleteQuietly(metadataFileEntry.getValue()); } - FileUtils.forceDelete(allSegmentsMetadataTarFile); + if (allSegmentsMetadataTarFile != null) { + FileUtils.deleteQuietly(allSegmentsMetadataTarFile); + } + } + } + + @VisibleForTesting + public static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec, PinotFS fileSystem, + Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, File> segmentMetadataFileMap, + ConcurrentLinkedQueue<String> segmentURIs) { + int nThreads = spec.getPushJobSpec().getSegmentMetadataGenerationParallelism(); + ExecutorService executor = Executors.newFixedThreadPool(nThreads); + List<Future<Void>> futures = new ArrayList<>(); + + for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { + futures.add( + executor.submit(() -> { + String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath); + String fileName = new File(tarFilePath).getName(); + // segments stored in Pinot deep store do not have .tar.gz extension + String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT) + ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName; + SegmentNameUtils.validatePartialOrFullSegmentName(segmentName); + File segmentMetadataFile; + // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the + // remote directory. This is to avoid generating a new segment metadata tar gz file every time we push a + // segment, which requires downloading the entire segment tar gz file. 
+ + URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName); + LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath); + if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) { + segmentMetadataFile = new File(FileUtils.getTempDirectory(), + SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID() + + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + if (segmentMetadataFile.exists()) { + FileUtils.forceDelete(segmentMetadataFile); + } + fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile); + } else { + segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + } + segmentMetadataFileMap.put(segmentName, segmentMetadataFile); + segmentURIs.add(segmentName); + segmentURIs.add(segmentUriPath); + return null; + })); + } + executor.shutdown(); Review Comment: The Javadoc for `ExecutorService.shutdown()` says: ``` This method does not wait for previously submitted tasks to complete execution. ``` This won't work for us, right? `shutdown()` lets the already-submitted tasks keep running, but it returns immediately. We still need to block until they have finished (e.g. by calling `get()` on each future, or with `awaitTermination`) before the generated metadata files and segment URIs are used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org