krishan1390 commented on code in PR #16249:
URL: https://github.com/apache/pinot/pull/16249#discussion_r2179700785
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -195,30 +200,9 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
     try {
-      List<File> inputSegmentDirs = new ArrayList<>();
-      int numRecords = 0;
-
-      for (int i = 0; i < downloadURLs.length; i++) {
-        String segmentName = segmentNames[i];
-        // Download and decompress the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
-            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
-        File indexDir;
-        try {
-          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
-              tempDataDir, "_" + i);
-        } catch (Exception e) {
-          LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
-          _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
-          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
-          throw e;
-        }
-        inputSegmentDirs.add(indexDir);
-
-        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
-        SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
-        numRecords += segmentMetadata.getTotalDocs();
-      }
+      List<File> inputSegmentDirs = Collections.synchronizedList(new ArrayList<>());

Review Comment:
I think there are ordering requirements on inputSegmentDirs: they should be ordered the same way as the corresponding downloadURLs / segmentNames. The controller orders the input segmentNames by sequence id, and the upsert executor then creates recordReaderFileConfigs in the same order and uses them in SegmentProcessorFramework. The order can be important for various edge cases.
I am not fully sure whether it's OK to break the order, but it would be safe not to. So if we can ensure inputSegmentDirs is in the same order as downloadURLs / segmentNames, that will help avoid a regression.

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());

Review Comment:
I think we should use _minionConf here instead of Runtime.getRuntime().availableProcessors(), because many tasks will run in parallel, depending on numConcurrentTasksPerInstance, so if each task uses N threads it can lead to starvation. Also, each task type has its own numConcurrentTasksPerInstance defined. So I think it's safer to use a minion conf.
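A minimal sketch of how the ordering concern and a configured thread count could be handled together. `OrderedParallelDownload`, `fakeDownload` (a stand-in for downloadSegmentToLocalAndUntar()) and the `numThreads` parameter are hypothetical; no real Pinot config key is implied. Each task writes its result into its own slot of a pre-sized array indexed like downloadURLs, so the output order matches the input order no matter which download finishes first:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

final class OrderedParallelDownload {
  private OrderedParallelDownload() {
  }

  // Hypothetical stand-in for downloadSegmentToLocalAndUntar(): sleeps for a random
  // time so that downloads complete out of submission order.
  static String fakeDownload(String url, int index) throws InterruptedException {
    TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
    return "dir_" + url + "_" + index;
  }

  // numThreads is assumed to come from a (hypothetical) minion config, not availableProcessors().
  static List<String> downloadAll(String[] urls, int numThreads) {
    String[] dirs = new String[urls.length]; // slot i belongs to urls[i]
    ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, Math.min(numThreads, urls.length)));
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < urls.length; i++) {
        final int index = i;
        // Each task writes only its own slot, so no synchronized list is needed.
        futures.add(executor.submit(() -> {
          dirs[index] = fakeDownload(urls[index], index);
          return null;
        }));
      }
      for (Future<?> future : futures) {
        // get() rethrows task failures, and its happens-before guarantee makes
        // the slot written by that task visible to this thread.
        future.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while downloading segments", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Segment download failed", e.getCause());
    } finally {
      executor.shutdownNow();
    }
    return Arrays.asList(dirs);
  }
}
```

For example, `downloadAll(new String[]{"a", "b", "c"}, 2)` returns `[dir_a_0, dir_b_1, dir_c_2]` regardless of completion order. Writing into index slots avoids both the synchronized list and a sort afterwards.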
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());

Review Comment:
Actually, for OSS we should avoid changing the current behaviour by default. So by default we shouldn't create a thread pool but process the segments serially (same as today). If the config is present, we use a thread pool with the configured number of threads.
Also, I think it's better to use a common thread pool that is part of the BaseMultipleSegmentsConversionExecutor class, so that every call to executeTask() doesn't re-create the thread pool. We create this thread pool only if the config is set.
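A minimal sketch of keeping today's serial behaviour as the default while reusing a single pool when parallelism is configured. `SegmentDownloadPool` and the `numDownloadThreads` setting are hypothetical names, not existing Pinot classes or minion properties; in the PR this state would likely live in BaseMultipleSegmentsConversionExecutor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical holder for the shared download pool. numDownloadThreads is an
// assumed config value, not an existing Pinot minion property.
final class SegmentDownloadPool implements AutoCloseable {
  private final ExecutorService _executor; // null means "serial, as today"

  SegmentDownloadPool(int numDownloadThreads) {
    // Created once, and only when parallelism is explicitly configured.
    _executor = numDownloadThreads > 1 ? Executors.newFixedThreadPool(numDownloadThreads) : null;
  }

  boolean isParallel() {
    return _executor != null;
  }

  // Runs the tasks and returns their results in task order.
  <T> List<T> runAll(List<Callable<T>> tasks) throws Exception {
    List<T> results = new ArrayList<>(tasks.size());
    if (_executor == null) {
      // Default path: existing serial behaviour on the caller's thread.
      for (Callable<T> task : tasks) {
        results.add(task.call());
      }
      return results;
    }
    List<Future<T>> futures = new ArrayList<>(tasks.size());
    for (Callable<T> task : tasks) {
      futures.add(_executor.submit(task));
    }
    for (Future<T> future : futures) {
      try {
        results.add(future.get());
      } catch (ExecutionException e) {
        // Surface the original failure, as the serial path would.
        throw e.getCause() instanceof Exception ? (Exception) e.getCause() : e;
      }
    }
    return results;
  }

  @Override
  public void close() {
    if (_executor != null) {
      _executor.shutdown();
    }
  }
}
```

With `numDownloadThreads` unset (0 or 1), `runAll` simply calls each task in order, so behaviour is unchanged; only a value above 1 creates the pool, once per instance, and `close()` shuts it down.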
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    try {
+      for (int i = 0; i < downloadURLs.length; i++) {
+        final int index = i;
+        futures.add(executorService.submit(() -> {
+          String segmentName = segmentNames[index];
+          String downloadUrl = downloadURLs[index];
+
+          _eventObserver.notifyProgress(_pinotTaskConfig,
+              String.format("Downloading and decompressing segment from: %s (%d out of %d)", downloadUrl, index + 1,
+                  downloadURLs.length));
+
+          try {
+            File indexDir = downloadSegmentToLocalAndUntar(
+                tableNameWithType, segmentName, downloadUrl, taskType, tempDataDir, "_" + index);
+            inputSegmentDirs.add(indexDir);
+            reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+            SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+            numRecords.addAndGet(segmentMetadata.getTotalDocs());
+          } catch (Exception e) {

Review Comment:
The catch is only required for downloadSegmentToLocalAndUntar, right?
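If only downloadSegmentToLocalAndUntar needs the catch, the try block can be narrowed so that failures in metric reporting or metadata loading are not counted as download failures. A minimal sketch, with hypothetical `Downloader` / `DocCounter` interfaces standing in for the real calls and an `AtomicInteger` standing in for the SEGMENT_DOWNLOAD_FAIL_COUNT meter:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class NarrowCatchExample {
  // Hypothetical stand-in for downloadSegmentToLocalAndUntar(...).
  interface Downloader {
    String download(String url) throws Exception;
  }

  // Hypothetical stand-in for new SegmentMetadataImpl(indexDir).getTotalDocs().
  interface DocCounter {
    int totalDocs(String indexDir) throws Exception;
  }

  // Stand-in for the SEGMENT_DOWNLOAD_FAIL_COUNT meter.
  final AtomicInteger downloadFailCount = new AtomicInteger();

  int processSegment(String url, Downloader downloader, DocCounter counter) throws Exception {
    String indexDir;
    try {
      // Only the download/untar step is wrapped: a failure here is a download failure.
      indexDir = downloader.download(url);
    } catch (Exception e) {
      downloadFailCount.incrementAndGet();
      throw e;
    }
    // Metric reporting and metadata loading stay outside the catch, so their
    // failures still propagate but are not counted as download failures.
    return counter.totalDocs(indexDir);
  }
}
```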
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,

Review Comment:
I think it would be better for this method to return List<SegmentMetadataImpl>, as that's more intuitive for a method called "parallelDownloadAndUntarSegments", and the caller can iterate over the list to compute numRecords. This will also help keep inputSegmentDirs in the same order as segmentNames, since we can use SegmentMetadataImpl.getName() and sort by the index of that segment name in segmentNames.

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

Review Comment:
After adding the configs, can we add or modify a test case that downloads multiple segments in parallel? It's possible that sub-methods like downloadSegmentToLocalAndUntar / reportSegmentDownloadMetrics aren't thread safe, and a test case can help catch such issues. Even if they're thread safe now, a test case that downloads multiple segments in parallel can help catch other issues that might exist now or in the future.

##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -352,6 +336,59 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     }
   }
 
+  private int parallelDownloadAndUntarSegments(String[] downloadURLs, String[] segmentNames, String tableNameWithType,
+      String taskType, File tempDataDir, List<File> inputSegmentDirs)
+      throws Exception {
+    AtomicInteger numRecords = new AtomicInteger(0);
+    List<Future<Void>> futures = new ArrayList<>();
+
+    int nThreads = Math.min(downloadURLs.length, Runtime.getRuntime().availableProcessors());
+    ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
+    try {

Review Comment:
nit: can we remove this try block?
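The suggestion to return List<SegmentMetadataImpl> and restore the segmentNames order, combined with a repeated parallel-download check, could look roughly like the sketch below. `MetadataOrderingCheck`, `SegmentMeta` (standing in for SegmentMetadataImpl's getName()/getTotalDocs()) and the simulated downloads are all hypothetical; a real test would exercise downloadSegmentToLocalAndUntar against actual segments:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

final class MetadataOrderingCheck {
  // Hypothetical stand-in for SegmentMetadataImpl (only name and total docs).
  static final class SegmentMeta {
    final String name;
    final int totalDocs;

    SegmentMeta(String name, int totalDocs) {
      this.name = name;
      this.totalDocs = totalDocs;
    }
  }

  // Restores the controller-provided order by each name's index in segmentNames.
  // Assumes every name is present and unique within one task.
  static List<SegmentMeta> sortBySegmentNames(List<SegmentMeta> metas, String[] segmentNames) {
    Map<String, Integer> indexOf = new HashMap<>();
    for (int i = 0; i < segmentNames.length; i++) {
      indexOf.put(segmentNames[i], i);
    }
    List<SegmentMeta> sorted = new ArrayList<>(metas);
    sorted.sort(Comparator.comparingInt(m -> indexOf.get(m.name)));
    return sorted;
  }

  // Simulated parallel downloads that finish in random order; results are
  // collected in completion order into a synchronized list, then re-sorted.
  static List<SegmentMeta> downloadInParallel(String[] segmentNames, int numThreads)
      throws InterruptedException {
    List<SegmentMeta> completed = Collections.synchronizedList(new ArrayList<>());
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < segmentNames.length; i++) {
      final int index = i;
      executor.execute(() -> {
        try {
          TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        completed.add(new SegmentMeta(segmentNames[index], index + 1));
      });
    }
    executor.shutdown();
    if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Timed out waiting for downloads");
    }
    return sortBySegmentNames(completed, segmentNames);
  }
}
```

The caller can then sum `totalDocs` over the sorted list to get numRecords and map it to inputSegmentDirs. Running the parallel path several times, as the requested test would, makes ordering or thread-safety bugs more likely to surface.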