abhishekbafna commented on code in PR #15030:
URL: https://github.com/apache/pinot/pull/15030#discussion_r1952081902


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -458,10 +438,71 @@ public static void 
sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, Pin
         });
       }
     } finally {
-      for (Map.Entry<String, File> metadataFileEntry: 
segmentMetadataFileMap.entrySet()) {
+      for (Map.Entry<String, File> metadataFileEntry : 
segmentMetadataFileMap.entrySet()) {
         FileUtils.deleteQuietly(metadataFileEntry.getValue());
       }
-      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+      if (allSegmentsMetadataTarFile != null) {
+        FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static void generateSegmentMetadataFiles(SegmentGenerationJobSpec 
spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, 
File> segmentMetadataFileMap,
+      ConcurrentLinkedQueue<String> segmentURIs) {
+    int nThreads = 
spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      futures.add(
+          executor.submit(() -> {
+            String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+            String fileName = new File(tarFilePath).getName();
+            // segments stored in Pinot deep store do not have .tar.gz 
extension
+            String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+                ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+            SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+            File segmentMetadataFile;
+            // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the
+            // remote directory. This is to avoid generating a new segment 
metadata tar gz file every time we push a
+            // segment, which requires downloading the entire segment tar gz 
file.
+
+            URI metadataTarGzFilePath = 
generateSegmentMetadataURI(tarFilePath, segmentName);
+            LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
+            if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
+              segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+                  SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + 
UUID.randomUUID()
+                      + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+              if (segmentMetadataFile.exists()) {
+                FileUtils.forceDelete(segmentMetadataFile);
+              }
+              fileSystem.copyToLocalFile(metadataTarGzFilePath, 
segmentMetadataFile);
+            } else {
+              segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+            }
+            segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+            segmentURIs.add(segmentName);
+            segmentURIs.add(segmentUriPath);
+            return null;
+          }));
+    }
+    executor.shutdown();

Review Comment:
   Yes, that is correct. I have moved the `executor.shutdown()` call to after 
the for loop that iterates over the futures. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to