abhishekbafna commented on code in PR #16249:
URL: https://github.com/apache/pinot/pull/16249#discussion_r2204607770


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -190,35 +195,26 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
-    LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
-        tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
+    int numRecords;
+    List<File> inputSegmentDirs = new ArrayList<>(downloadURLs.length);
+    Map<String, SegmentMetadata> segmentMetadataMap = Collections.synchronizedMap(new HashMap<>(downloadURLs.length));
+    int nThreads = Math.min(getParallelism(taskConfigs), downloadURLs.length);
+    LOGGER.info(
+        "Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}, thread pool size:{}",
+        taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL, nThreads);
     try {
-      List<File> inputSegmentDirs = new ArrayList<>();
-      int numRecords = 0;
-
-      for (int i = 0; i < downloadURLs.length; i++) {
-        String segmentName = segmentNames[i];
-        // Download and decompress the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
-            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
-        File indexDir;
-        try {
-          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
-              tempDataDir, "_" + i);
-        } catch (Exception e) {
-          LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
-          _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
-          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
-          throw e;
+      if (nThreads <= 1) {
+        for (int index = 0; index < downloadURLs.length; index++) {
+          downloadAndUntarSegment(tableNameWithType, taskType, segmentNames[index], downloadURLs[index],
+              tempDataDir, index, segmentMetadataMap, segmentNames.length);
         }
-        inputSegmentDirs.add(indexDir);
-
-        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
-        SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
-        numRecords += segmentMetadata.getTotalDocs();
+      } else {
+        parallelDownloadAndUntarSegments(nThreads, tableNameWithType, taskType, segmentNames, downloadURLs,
+            tempDataDir, segmentMetadataMap);
       }
+      numRecords = processSegmentMetadata(segmentNames, segmentMetadataMap, inputSegmentDirs);

Review Comment:
   I think it is alright.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

