krishan1390 commented on code in PR #16249:
URL: https://github.com/apache/pinot/pull/16249#discussion_r2204332057


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -190,35 +195,26 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
-    LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
-        tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
+    int numRecords;
+    List<File> inputSegmentDirs = new ArrayList<>(downloadURLs.length);
+    Map<String, SegmentMetadata> segmentMetadataMap = Collections.synchronizedMap(new HashMap<>(downloadURLs.length));
+    int nThreads = Math.min(getParallelism(taskConfigs), downloadURLs.length);
+    LOGGER.info(
+        "Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}, thread pool size:{}",
+        taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL, nThreads);
     try {
-      List<File> inputSegmentDirs = new ArrayList<>();
-      int numRecords = 0;
-
-      for (int i = 0; i < downloadURLs.length; i++) {
-        String segmentName = segmentNames[i];
-        // Download and decompress the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
-            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
-        File indexDir;
-        try {
-          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
-              tempDataDir, "_" + i);
-        } catch (Exception e) {
-          LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
-          _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
-          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
-          throw e;
+      if (nThreads <= 1) {
+        for (int index = 0; index < downloadURLs.length; index++) {
+          downloadAndUntarSegment(tableNameWithType, taskType, segmentNames[index], downloadURLs[index],
+              tempDataDir, index, segmentMetadataMap, segmentNames.length);
         }
-        inputSegmentDirs.add(indexDir);
-
-        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
-        SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
-        numRecords += segmentMetadata.getTotalDocs();
+      } else {
+        parallelDownloadAndUntarSegments(nThreads, tableNameWithType, taskType, segmentNames, downloadURLs,
+            tempDataDir, segmentMetadataMap);
       }
+      numRecords = processSegmentMetadata(segmentNames, segmentMetadataMap, inputSegmentDirs);

Review Comment:
   nit: processSegmentMetadata is doing 2 things: calculating numRecords and mutating its inputSegmentDirs argument.
   
   I think it'll be cleaner to create 2 methods instead: getInputSegmentDirs(segmentMetadataMap, segmentNames) and getNumRecords(segmentMetadataMap).
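   A minimal sketch of that split, assuming segmentMetadataMap is keyed by segment name (the diff only shows its type, Map<String, SegmentMetadata>). SegmentInfo below is a hypothetical stand-in for Pinot's SegmentMetadata so the snippet compiles on its own; the real helpers would take the actual type. Iterating segmentNames instead of the map keeps inputSegmentDirs in the original download order, even when the parallel path finishes segments out of order.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SegmentMetadataHelpers {
  private SegmentMetadataHelpers() {
  }

  /** Hypothetical stand-in for Pinot's SegmentMetadata, reduced to the two values used here. */
  static final class SegmentInfo {
    private final File _indexDir;
    private final int _totalDocs;

    SegmentInfo(File indexDir, int totalDocs) {
      _indexDir = indexDir;
      _totalDocs = totalDocs;
    }

    File getIndexDir() {
      return _indexDir;
    }

    int getTotalDocs() {
      return _totalDocs;
    }
  }

  /**
   * Returns the untarred segment dirs in the order of segmentNames, so the result does not depend on
   * which download thread finished first. Assumes the map is keyed by segment name.
   */
  static List<File> getInputSegmentDirs(Map<String, SegmentInfo> segmentMetadataMap, String[] segmentNames) {
    List<File> inputSegmentDirs = new ArrayList<>(segmentNames.length);
    for (String segmentName : segmentNames) {
      SegmentInfo segmentMetadata = segmentMetadataMap.get(segmentName);
      if (segmentMetadata == null) {
        throw new IllegalStateException("No metadata found for segment: " + segmentName);
      }
      inputSegmentDirs.add(segmentMetadata.getIndexDir());
    }
    return inputSegmentDirs;
  }

  /** Sums the total docs across all downloaded segments. */
  static int getNumRecords(Map<String, SegmentInfo> segmentMetadataMap) {
    int numRecords = 0;
    // Collections.synchronizedMap requires manual synchronization on the map while iterating its views.
    synchronized (segmentMetadataMap) {
      for (SegmentInfo segmentMetadata : segmentMetadataMap.values()) {
        numRecords += segmentMetadata.getTotalDocs();
      }
    }
    return numRecords;
  }

  public static void main(String[] args) {
    Map<String, SegmentInfo> segmentMetadataMap = Collections.synchronizedMap(new HashMap<>());
    // Simulate parallel downloads that completed out of order.
    segmentMetadataMap.put("seg_1", new SegmentInfo(new File("seg_1"), 200));
    segmentMetadataMap.put("seg_0", new SegmentInfo(new File("seg_0"), 100));
    String[] segmentNames = {"seg_0", "seg_1"};
    System.out.println(getInputSegmentDirs(segmentMetadataMap, segmentNames)); // [seg_0, seg_1]
    System.out.println(getNumRecords(segmentMetadataMap)); // 300
  }
}
```

   Both helpers are read-only, so the call site becomes two plain assignments and nothing is passed in just to be filled.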



##########
pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java:
##########
@@ -31,6 +31,13 @@ public class MinionConf extends PinotConfiguration {
   public static final String MINION_TASK_PROGRESS_MANAGER_CLASS = "pinot.minion.taskProgressManager.class";
   public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 * 60 * 1000; // 10 mins
 
+  /**
+   * The number of threads to use for downloading segments from the deepstore.
+   * This is a global setting that applies to all tasks of BaseMultipleSegmentsConversionExecutor class.

Review Comment:
   nit: let's document that this config is applied per task, so the total download parallelism on a minion can reach this value * the number of tasks running in parallel.
   
   That will be useful to clarify in the documentation because this config is set at the minion level and can easily be misinterpreted as a minion-wide limit.
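   To make the point concrete, a small hypothetical calculation, assuming each running task sizes its own pool independently as in the diff (Math.min(getParallelism(taskConfigs), downloadURLs.length)). DownloadParallelismExample and its methods are illustrative names only, not Pinot APIs.

```java
final class DownloadParallelismExample {
  private DownloadParallelismExample() {
  }

  // Pool size chosen by one task, mirroring the diff: Math.min(getParallelism(taskConfigs), downloadURLs.length).
  // A value <= 1 takes the sequential path, which still downloads on the task's own thread, hence Math.max(1, ...).
  static int perTaskDownloadThreads(int configuredParallelism, int numSegments) {
    return Math.max(1, Math.min(configuredParallelism, numSegments));
  }

  // Download threads one minion may run at once, assuming every running task sizes its own pool independently.
  static int minionDownloadThreads(int configuredParallelism, int[] numSegmentsPerRunningTask) {
    int total = 0;
    for (int numSegments : numSegmentsPerRunningTask) {
      total += perTaskDownloadThreads(configuredParallelism, numSegments);
    }
    return total;
  }

  public static void main(String[] args) {
    int configuredParallelism = 4;
    // Three tasks running concurrently on one minion, with 10, 8 and 2 input segments.
    int[] numSegmentsPerRunningTask = {10, 8, 2};
    System.out.println(minionDownloadThreads(configuredParallelism, numSegmentsPerRunningTask)); // 10 (4 + 4 + 2)
    // Upper bound: configuredParallelism * number of running tasks.
    System.out.println(configuredParallelism * numSegmentsPerRunningTask.length); // 12
  }
}
```

   So with a configured value of 4 and three concurrent tasks, a minion can end up with up to 12 download threads, not 4.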



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org