rajagopr commented on code in PR #15030:
URL: https://github.com/apache/pinot/pull/15030#discussion_r1951975063


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -458,10 +438,71 @@ public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, Pin
         });
       }
     } finally {
-      for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) {
+      for (Map.Entry<String, File> metadataFileEntry : segmentMetadataFileMap.entrySet()) {
         FileUtils.deleteQuietly(metadataFileEntry.getValue());
       }
-      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+      if (allSegmentsMetadataTarFile != null) {
+        FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, File> segmentMetadataFileMap,
+      ConcurrentLinkedQueue<String> segmentURIs) {
+    int nThreads = spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();

Review Comment:
   We should set a default value here for cases where this new parameter is 
not set. That is the case for the current minion tasks.
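
   For illustration, a minimal sketch of such a fallback. The class name, the 
constant `DEFAULT_PARALLELISM`, and its value of 1 are hypothetical and not 
taken from this PR. The sketch also assumes that an unset parameter surfaces 
as 0 (the Java default for an `int` field). `Executors.newFixedThreadPool` 
throws `IllegalArgumentException` when `nThreads <= 0`, so a non-positive 
value has to be replaced before the pool is created.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelismDefault {
  // Hypothetical default used when the job spec does not set the parameter.
  static final int DEFAULT_PARALLELISM = 1;

  // Returns the configured value if it is positive, otherwise the default.
  static int resolveParallelism(int configured) {
    return configured > 0 ? configured : DEFAULT_PARALLELISM;
  }

  public static void main(String[] args) {
    // Assumption: 0 stands in for "parameter not set".
    int nThreads = resolveParallelism(0);
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    executor.shutdown();
    System.out.println("pool size: " + nThreads);
  }
}
```

   Resolving the value in one place, ideally in the getter itself, would keep 
existing callers such as the minion tasks working without spec changes.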



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java:
##########
@@ -458,10 +438,71 @@ public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, Pin
         });
       }
     } finally {
-      for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) {
+      for (Map.Entry<String, File> metadataFileEntry : segmentMetadataFileMap.entrySet()) {
         FileUtils.deleteQuietly(metadataFileEntry.getValue());
       }
-      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+      if (allSegmentsMetadataTarFile != null) {
+        FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, File> segmentMetadataFileMap,
+      ConcurrentLinkedQueue<String> segmentURIs) {
+    int nThreads = spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      futures.add(
+          executor.submit(() -> {
+            String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+            String fileName = new File(tarFilePath).getName();
+            // segments stored in Pinot deep store do not have .tar.gz extension
+            String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+                ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+            SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+            File segmentMetadataFile;
+            // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the
+            // remote directory. This is to avoid generating a new segment metadata tar gz file every time we push a
+            // segment, which requires downloading the entire segment tar gz file.
+
+            URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName);
+            LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath);
+            if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) {
+              segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+                  SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID()
+                      + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+              if (segmentMetadataFile.exists()) {
+                FileUtils.forceDelete(segmentMetadataFile);
+              }
+              fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+            } else {
+              segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+            }
+            segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+            segmentURIs.add(segmentName);
+            segmentURIs.add(segmentUriPath);
+            return null;
+          }));
+    }
+    executor.shutdown();

Review Comment:
   The docs for the `shutdown` method say the following:
   ```
   This method does not wait for previously submitted tasks to complete execution.
   ```
   
   This won't work for us, right?
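
   For reference, `shutdown()` starts an orderly shutdown: previously 
submitted tasks are still executed, but the call returns without waiting for 
them. The caller therefore has to block separately, either with 
`awaitTermination` or by calling `get()` on each `Future`. Below is a minimal 
standalone sketch of the second option. The class and method names are 
hypothetical, and this is not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ShutdownThenWait {
  // Runs one task per input on a fixed pool and blocks until all have finished.
  static List<String> processAll(List<String> inputs, int nThreads) {
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
    List<Future<Void>> futures = new ArrayList<>();
    try {
      for (String input : inputs) {
        futures.add(executor.submit(() -> {
          results.add(input.toUpperCase());
          return null;
        }));
      }
      // Stops new submissions only; already submitted tasks keep running.
      executor.shutdown();
      // get() blocks until the task completes and surfaces any task failure.
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for tasks", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Task failed", e.getCause());
    } finally {
      // No effect if all tasks finished; otherwise cancels the remaining work.
      executor.shutdownNow();
    }
    return new ArrayList<>(results);
  }

  public static void main(String[] args) {
    List<String> out = processAll(List.of("a", "b", "c"), 2);
    System.out.println(out.size() + " tasks completed");
  }
}
```

   If the code after `shutdown()` already waits on the futures like this, the 
call is fine. Otherwise the segment metadata map may be read before it is 
fully populated.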



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

