krishan1390 commented on code in PR #16249:

URL: https://github.com/apache/pinot/pull/16249#discussion_r2204332057
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java:
##########
@@ -190,35 +195,26 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
     String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
-    LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
-        tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
+    int numRecords;
+    List<File> inputSegmentDirs = new ArrayList<>(downloadURLs.length);
+    Map<String, SegmentMetadata> segmentMetadataMap = Collections.synchronizedMap(new HashMap<>(downloadURLs.length));
+    int nThreads = Math.min(getParallelism(taskConfigs), downloadURLs.length);
+    LOGGER.info(
+        "Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}, thread pool size:{}",
+        taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL, nThreads);
     try {
-      List<File> inputSegmentDirs = new ArrayList<>();
-      int numRecords = 0;
-
-      for (int i = 0; i < downloadURLs.length; i++) {
-        String segmentName = segmentNames[i];
-        // Download and decompress the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i]
-            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
-        File indexDir;
-        try {
-          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType,
-              tempDataDir, "_" + i);
-        } catch (Exception e) {
-          LOGGER.error("Failed to download segment from download url: {}", downloadURLs[i], e);
-          _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
-          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
-          throw e;
+      if (nThreads <= 1) {
+        for (int index = 0; index < downloadURLs.length; index++) {
+          downloadAndUntarSegment(tableNameWithType, taskType, segmentNames[index], downloadURLs[index],
+              tempDataDir, index, segmentMetadataMap, segmentNames.length);
         }
-        inputSegmentDirs.add(indexDir);
-
-        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
-        SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
-        numRecords += segmentMetadata.getTotalDocs();
+      } else {
+        parallelDownloadAndUntarSegments(nThreads, tableNameWithType, taskType, segmentNames, downloadURLs,
+            tempDataDir, segmentMetadataMap);
       }
+      numRecords = processSegmentMetadata(segmentNames, segmentMetadataMap, inputSegmentDirs);

Review Comment:
nit: processSegmentMetadata is doing two things: calculating numRecords and modifying an input variable, inputSegmentDirs. I think it'll be cleaner to just create two methods, getInputSegmentDirs(segmentMetadataMap, segmentNames) and getNumRecords(segmentMetadataMap).

##########
pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java:
##########
@@ -31,6 +31,13 @@ public class MinionConf extends PinotConfiguration {
   public static final String MINION_TASK_PROGRESS_MANAGER_CLASS = "pinot.minion.taskProgressManager.class";
   public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 * 60 * 1000; // 10 mins
+  /**
+   * The number of threads to use for downloading segments from the deepstore.
+   * This is a global setting that applies to all tasks of BaseMultipleSegmentsConversionExecutor class.

Review Comment:
nit: let's document that this config is applied at the task level, so the total parallelism will be this factor * the number of tasks running in parallel.
That will be useful to clarify in the documentation, because this config is set at the minion level and can easily be misinterpreted as a minion-wide limit.
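A minimal sketch of the split suggested in the first comment. SegmentInfo is a hypothetical stand-in for Pinot's SegmentMetadata, and the sketch assumes segmentMetadataMap is keyed by segment name, which the call processSegmentMetadata(segmentNames, segmentMetadataMap, inputSegmentDirs) suggests but the diff does not show. Both helpers return values instead of mutating a caller-supplied list.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SegmentMetadataHelpers {
  // Hypothetical stand-in for Pinot's SegmentMetadata, holding only the two values these helpers read.
  static final class SegmentInfo {
    final File indexDir;
    final int totalDocs;

    SegmentInfo(File indexDir, int totalDocs) {
      this.indexDir = indexDir;
      this.totalDocs = totalDocs;
    }
  }

  // Returns index dirs in segmentNames order instead of filling a caller-supplied list.
  static List<File> getInputSegmentDirs(Map<String, SegmentInfo> segmentMetadataMap, String[] segmentNames) {
    List<File> inputSegmentDirs = new ArrayList<>(segmentNames.length);
    for (String segmentName : segmentNames) {
      SegmentInfo info = segmentMetadataMap.get(segmentName);
      if (info == null) {
        throw new IllegalStateException("No metadata for segment: " + segmentName);
      }
      inputSegmentDirs.add(info.indexDir);
    }
    return inputSegmentDirs;
  }

  // Sums the total number of docs across all downloaded segments.
  static int getNumRecords(Map<String, SegmentInfo> segmentMetadataMap) {
    int numRecords = 0;
    for (SegmentInfo info : segmentMetadataMap.values()) {
      numRecords += info.totalDocs;
    }
    return numRecords;
  }

  public static void main(String[] args) {
    Map<String, SegmentInfo> segmentMetadataMap = new HashMap<>();
    segmentMetadataMap.put("seg_0", new SegmentInfo(new File("/tmp/seg_0"), 100));
    segmentMetadataMap.put("seg_1", new SegmentInfo(new File("/tmp/seg_1"), 250));
    String[] segmentNames = {"seg_0", "seg_1"};
    List<File> inputSegmentDirs = getInputSegmentDirs(segmentMetadataMap, segmentNames);
    int numRecords = getNumRecords(segmentMetadataMap);
    System.out.println(inputSegmentDirs.size() + " segments, " + numRecords + " records"); // prints "2 segments, 350 records"
  }
}
```

Deriving the directory list from segmentNames rather than from the map also keeps the order deterministic, since a HashMap filled by concurrent downloads has no meaningful iteration order.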
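A minimal sketch of bounded per-task parallel downloading with a plain java.util.concurrent ExecutorService. downloadOne, downloadAll and the URLs are hypothetical; this is not the PR's parallelDownloadAndUntarSegments, whose body is not shown here. It mirrors the two decisions visible in the diff: the pool size is Math.min(parallelism, number of URLs), and a size of 1 or less falls back to a sequential loop. It also makes the second review comment concrete: each task builds its own pool, so the number of download threads on a minion scales with the number of tasks running concurrently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelDownloadSketch {
  // Hypothetical stand-in for downloadAndUntarSegment: maps a URL to a fake local path.
  static String downloadOne(String url) {
    return "/tmp/" + url.substring(url.lastIndexOf('/') + 1);
  }

  // Downloads every URL; the pool for THIS task is capped at min(parallelism, number of URLs).
  static Map<String, String> downloadAll(String[] urls, int parallelism) {
    Map<String, String> localPaths = Collections.synchronizedMap(new HashMap<>(urls.length));
    int nThreads = Math.min(parallelism, urls.length);
    if (nThreads <= 1) {
      // Sequential path, mirroring the nThreads <= 1 branch in the diff.
      for (String url : urls) {
        localPaths.put(url, downloadOne(url));
      }
      return localPaths;
    }
    ExecutorService pool = Executors.newFixedThreadPool(nThreads);
    try {
      List<Future<?>> futures = new ArrayList<>(urls.length);
      for (String url : urls) {
        futures.add(pool.submit(() -> localPaths.put(url, downloadOne(url))));
      }
      for (Future<?> future : futures) {
        try {
          future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while downloading segments", e);
        } catch (ExecutionException e) {
          // Any failed download fails the whole call.
          throw new IllegalStateException("Segment download failed", e.getCause());
        }
      }
    } finally {
      // Always release the pool's threads; also interrupts downloads still running after a failure.
      pool.shutdownNow();
    }
    return localPaths;
  }

  public static void main(String[] args) {
    String[] urls = {"http://deepstore/seg_0", "http://deepstore/seg_1", "http://deepstore/seg_2"};
    // Pool size for this task is min(4, 3) = 3.
    Map<String, String> localPaths = downloadAll(urls, 4);
    System.out.println(localPaths.size() + " segments downloaded");
    // The setting is per task: with parallelism 4 and 3 tasks running concurrently
    // on one minion, up to 4 * 3 = 12 download threads may exist at once.
    int parallelism = 4;
    int concurrentTasks = 3;
    System.out.println("Worst-case download threads on this minion: " + parallelism * concurrentTasks);
  }
}
```

Capping the pool at the number of URLs avoids idle threads for small tasks, but it does nothing to bound the minion-wide total, which is why documenting the per-task semantics matters.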