krishan1390 commented on code in PR #16249:
URL: https://github.com/apache/pinot/pull/16249#discussion_r2179700785


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -195,30 +200,9 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
     try {
-      List<File> inputSegmentDirs = new ArrayList<>();
-      int numRecords = 0;
-
-      for (int i = 0; i < downloadURLs.length; i++) {
-        String segmentName = segmentNames[i];
-        // Download and decompress the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
-            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
-        File indexDir;
-        try {
-          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
-              tempDataDir, "_" + i);
-        } catch (Exception e) {
-          LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
-          _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
-          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
-          throw e;
-        }
-        inputSegmentDirs.add(indexDir);
-
-        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
-        SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
-        numRecords += segmentMetadata.getTotalDocs();
-      }
+      List<File> inputSegmentDirs = Collections.synchronizedList(new ArrayList<>());

Review Comment:
   I think there are ordering requirements on inputSegmentDirs: they should be in the same order as the corresponding downloadURLs / segmentNames.
   
   The controller orders the input segmentNames by sequence ID.
   
   The upsert executor then creates recordReaderFileConfigs in that same order and uses them in SegmentProcessorFramework.
   
   The order can matter for various edge cases. I'm not fully sure whether it's OK to break it, but it would be safer to preserve it.
   
   So if we can ensure inputSegmentDirs stays in the same order as downloadURLs / segmentNames, that will help avoid regressions.
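   A minimal sketch of one way to keep that order, assuming a generic `downloadFn` stands in for downloadSegmentToLocalAndUntar (the class and method names below are hypothetical). Adding to a synchronizedList from inside each task records completion order; keeping the futures in submission order and calling `Future.get()` on them in that same order yields results aligned with downloadURLs, regardless of which download finishes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

final class OrderedParallelDownload {
  private OrderedParallelDownload() {
  }

  /**
   * Runs downloadFn for every URL on the given pool and returns the results in the same
   * order as downloadURLs, regardless of which download finishes first.
   */
  static <T> List<T> downloadInOrder(String[] downloadURLs, Function<String, T> downloadFn, ExecutorService executor)
      throws InterruptedException, ExecutionException {
    // Futures are stored in submission order, i.e. index order.
    List<Future<T>> futures = new ArrayList<>(downloadURLs.length);
    for (String downloadUrl : downloadURLs) {
      futures.add(executor.submit(() -> downloadFn.apply(downloadUrl)));
    }
    List<T> results = new ArrayList<>(downloadURLs.length);
    for (Future<T> future : futures) {
      // get() waits for this specific task, so results.get(i) always belongs to downloadURLs[i].
      // A failed download is rethrown here wrapped in an ExecutionException.
      results.add(future.get());
    }
    return results;
  }
}
```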



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());

Review Comment:
   I think we should take the thread count from _minionConf here instead of Runtime.getRuntime().availableProcessors().
   
   Many tasks can run in parallel on the same minion, depending on numConcurrentTasksPerInstance, so if each task uses N threads (up to one per available processor), the instance can end up running numConcurrentTasksPerInstance × N download threads, which can lead to starvation. Also, each task type defines its own numConcurrentTasksPerInstance.
   
   So I think it's safer to use a minion config.
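   A rough sketch of the lookup, assuming a plain `Map<String, String>` stands in for _minionConf (its accessor API is not assumed here) and using hypothetical property names. A per-task-type key falls back to an instance-wide key; when neither is set the result is 1 (serial), and the value is clamped to the number of segments. With this, the worst case for a task type is roughly its numConcurrentTasksPerInstance × the configured value rather than × the core count.

```java
import java.util.Map;

final class DownloadParallelism {
  // Hypothetical instance-wide key; not an existing Pinot property.
  static final String KEY = "segment.download.parallelism";

  private DownloadParallelism() {
  }

  /**
   * Looks up "<taskType>.segment.download.parallelism" first, then the instance-wide key.
   * A missing config means 1, i.e. serial download (today's behaviour).
   * The result is clamped to [1, numSegments] so no idle threads are requested.
   */
  static int resolve(Map<String, String> minionConf, String taskType, int numSegments) {
    String raw = minionConf.get(taskType + "." + KEY);
    if (raw == null) {
      raw = minionConf.get(KEY);
    }
    int configured = raw == null ? 1 : Integer.parseInt(raw.trim());
    return Math.max(1, Math.min(configured, numSegments));
  }
}
```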



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());

Review Comment:
   Actually, for OSS we should avoid changing the current behaviour by default. So by default we shouldn't create a thread pool, but should process the segments serially (same as today).
   
   If the config is present, we use a thread pool with the configured number of threads.
   
   Also, I think it's better to use a common thread pool that is part of the BaseMultipleSegmentsConversionExecutor class, so that every call to executeTask() doesn't re-create the thread pool. We create this thread pool only if the config is set.
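   A sketch of the "serial by default, shared pool only if configured" shape, with hypothetical names (`SegmentDownloader`, `downloadAll`) and an `IntFunction` standing in for the per-segment download. The pool is created once in the constructor and reused across calls; whether the real field should be per-instance or static depends on how the executor objects are created, which this sketch does not assume. Results come back in segment index order, and if one download fails the remaining ones are cancelled (running ones are interrupted).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

final class SegmentDownloader {
  // Created once and shared by all calls; null when parallel download is not configured.
  private final ExecutorService _downloadExecutor;

  SegmentDownloader(int configuredParallelism) {
    // Only an explicit value above 1 enables the pool, so the default stays serial (today's behaviour).
    _downloadExecutor = configuredParallelism > 1 ? Executors.newFixedThreadPool(configuredParallelism) : null;
  }

  boolean isParallel() {
    return _downloadExecutor != null;
  }

  /** Downloads segments 0..numSegments-1 and returns the results in index order. */
  <T> List<T> downloadAll(int numSegments, IntFunction<T> downloadOne)
      throws InterruptedException, ExecutionException {
    List<T> results = new ArrayList<>(numSegments);
    if (_downloadExecutor == null) {
      // Serial path: unchecked exceptions from downloadOne propagate directly.
      for (int i = 0; i < numSegments; i++) {
        results.add(downloadOne.apply(i));
      }
      return results;
    }
    List<Future<T>> futures = new ArrayList<>(numSegments);
    for (int i = 0; i < numSegments; i++) {
      final int index = i;
      futures.add(_downloadExecutor.submit(() -> downloadOne.apply(index)));
    }
    try {
      for (Future<T> future : futures) {
        results.add(future.get());
      }
    } catch (InterruptedException | ExecutionException e) {
      // One failure fails the task: cancel queued downloads and interrupt running ones.
      for (Future<T> future : futures) {
        future.cancel(true);
      }
      throw e;
    }
    return results;
  }

  void shutdown() {
    if (_downloadExecutor != null) {
      _downloadExecutor.shutdown();
    }
  }
}
```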



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    try {
+      for (int i = 0; i < downloadURLs.length; i++) {
+        final int index = i;
+        futures.add(executorService.submit(() -> {
+          String segmentName = segmentNames[index];
+          String downloadUrl = downloadURLs[index];
+
+          _eventObserver.notifyProgress(_pinotTaskConfig,
+              String.format("Downloading and decompressing segment from: %s (%d out of %d)", downloadUrl, index + 1,
+                  downloadURLs.length));
+
+          try {
+            File indexDir = downloadSegmentToLocalAndUntar(
+                tableNameWithType, segmentName, downloadUrl, taskType, tempDataDir, "_" + index);
+            inputSegmentDirs.add(indexDir);
+            reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+            SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+            numRecords.addAndGet(segmentMetadata.getTotalDocs());
+          } catch (Exception e) {

Review Comment:
   The catch is only needed for downloadSegmentToLocalAndUntar, right?
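   If so, a narrower shape, mirroring the original serial loop, guards only the download call, so failures while reading metadata or reporting metrics are not metered as download failures. A sketch with hypothetical functional interfaces standing in for downloadSegmentToLocalAndUntar and SegmentMetadataImpl, and an `AtomicInteger` standing in for the SEGMENT_DOWNLOAD_FAIL_COUNT meter:

```java
import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

final class NarrowCatch {
  /** Hypothetical stand-in for downloadSegmentToLocalAndUntar(...). */
  @FunctionalInterface
  interface Download {
    File apply(String downloadUrl) throws Exception;
  }

  /** Hypothetical stand-in for new SegmentMetadataImpl(indexDir).getTotalDocs(). */
  @FunctionalInterface
  interface ReadTotalDocs {
    int apply(File indexDir) throws Exception;
  }

  private NarrowCatch() {
  }

  static int processSegment(String downloadUrl, Download download, ReadTotalDocs readTotalDocs,
      AtomicInteger downloadFailures)
      throws Exception {
    File indexDir;
    try {
      // Only the download/untar step is guarded.
      indexDir = download.apply(downloadUrl);
    } catch (Exception e) {
      // This is where the PR meters SEGMENT_DOWNLOAD_FAIL_COUNT and calls notifyTaskError.
      downloadFailures.incrementAndGet();
      throw e;
    }
    // Failures from here on propagate unchanged and are not counted as download failures.
    return readTotalDocs.apply(indexDir);
  }
}
```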



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,

Review Comment:
   I think it's better for this method to return List<SegmentMetadataImpl>, as that's more intuitive for a method called "parallelDownloadAndUntarSegments", and the caller can iterate over it to compute numRecords.
   
   This will also help keep inputSegmentDirs in the same order as segmentNames: we can use SegmentMetadataImpl.getName() and sort by the index of that segment name in segmentNames.
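   A sketch of the sorting step, assuming a hypothetical `DownloadedSegment` class that carries only the fields used here in place of SegmentMetadataImpl. The position map is built once from segmentNames, and a segment name that is not in segmentNames fails loudly instead of being placed arbitrarily; the caller then derives inputSegmentDirs and numRecords from the sorted list.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SegmentOrdering {
  /** Hypothetical stand-in for SegmentMetadataImpl, keeping only what this sketch needs. */
  static final class DownloadedSegment {
    final String _name;
    final File _indexDir;
    final int _totalDocs;

    DownloadedSegment(String name, File indexDir, int totalDocs) {
      _name = name;
      _indexDir = indexDir;
      _totalDocs = totalDocs;
    }
  }

  private SegmentOrdering() {
  }

  /** Returns a copy of downloaded, sorted by each segment's position in segmentNames. */
  static List<DownloadedSegment> sortBySegmentNames(List<DownloadedSegment> downloaded, String[] segmentNames) {
    Map<String, Integer> position = new HashMap<>();
    for (int i = 0; i < segmentNames.length; i++) {
      position.put(segmentNames[i], i);
    }
    List<DownloadedSegment> sorted = new ArrayList<>(downloaded);
    sorted.sort(Comparator.comparingInt(segment -> {
      Integer pos = position.get(segment._name);
      if (pos == null) {
        throw new IllegalStateException("Downloaded unexpected segment: " + segment._name);
      }
      return pos;
    }));
    return sorted;
  }

  /** inputSegmentDirs, in segmentNames order. */
  static List<File> indexDirs(List<DownloadedSegment> sortedSegments) {
    List<File> dirs = new ArrayList<>(sortedSegments.size());
    for (DownloadedSegment segment : sortedSegments) {
      dirs.add(segment._indexDir);
    }
    return dirs;
  }

  /** numRecords computed by the caller instead of a shared counter. */
  static int totalRecords(List<DownloadedSegment> segments) {
    int numRecords = 0;
    for (DownloadedSegment segment : segments) {
      numRecords += segment._totalDocs;
    }
    return numRecords;
  }
}
```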



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

Review Comment:
   After adding the configs, can we add or modify a test case that downloads multiple segments in parallel?
   
   It's possible that several of the helper methods, such as downloadSegmentToLocalAndUntar / reportSegmentDownloadMetrics, aren't thread-safe, and a test case can help catch such issues.
   
   Even if they're thread-safe now, a test case that downloads multiple segments in parallel can help catch other issues that exist now or may appear in the future.
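   One possible shape for such a test, as a sketch: a hypothetical harness that releases all download tasks at once through a `CountDownLatch` to maximize overlap, with a `SegmentFetcher` interface standing in for downloadSegmentToLocalAndUntar. A real test would plug in the actual executor and segment fixtures; here the fake fetcher just creates one directory per segment using the same `"_" + index` suffix as the PR, and the assertions check that every directory is distinct, exists, and is returned in input order.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ParallelDownloadHarness {
  /** Hypothetical stand-in for downloadSegmentToLocalAndUntar(..., segmentName, ..., tempDataDir, suffix). */
  @FunctionalInterface
  interface SegmentFetcher {
    Path fetch(String segmentName, Path tempDataDir, String suffix) throws Exception;
  }

  private ParallelDownloadHarness() {
  }

  /**
   * Calls fetcher for every segment on nThreads threads. Tasks wait on a latch and are released
   * together, which maximizes overlap so races in the fetcher are more likely to show up.
   * Returns the fetched directories in the same order as segmentNames.
   */
  static List<Path> fetchConcurrently(String[] segmentNames, Path tempDataDir, SegmentFetcher fetcher,
      int nThreads)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(nThreads);
    CountDownLatch start = new CountDownLatch(1);
    try {
      List<Future<Path>> futures = new ArrayList<>(segmentNames.length);
      for (int i = 0; i < segmentNames.length; i++) {
        final int index = i;
        futures.add(pool.submit(() -> {
          start.await();
          return fetcher.fetch(segmentNames[index], tempDataDir, "_" + index);
        }));
      }
      start.countDown();
      List<Path> dirs = new ArrayList<>(segmentNames.length);
      for (Future<Path> future : futures) {
        // A timeout turns a deadlock in the code under test into a failure instead of a hang.
        dirs.add(future.get(30, TimeUnit.SECONDS));
      }
      return dirs;
    } finally {
      pool.shutdownNow();
    }
  }
}
```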



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    try {

Review Comment:
   nit: can we remove this try block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

