abhishekbafna commented on code in PR #16249: URL: https://github.com/apache/pinot/pull/16249#discussion_r2204607770
########## pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java: ########## @@ -190,35 +195,26 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY); String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR); AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN)); - LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType, - tableNameWithType, inputSegmentNames, downloadURLString, uploadURL); File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID()); Preconditions.checkState(tempDataDir.mkdirs()); + int numRecords; + List<File> inputSegmentDirs = new ArrayList<>(downloadURLs.length); + Map<String, SegmentMetadata> segmentMetadataMap = Collections.synchronizedMap(new HashMap<>(downloadURLs.length)); + int nThreads = Math.min(getParallelism(taskConfigs), downloadURLs.length); + LOGGER.info( + "Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}, thread pool size:{}", + taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL, nThreads); try { - List<File> inputSegmentDirs = new ArrayList<>(); - int numRecords = 0; - - for (int i = 0; i < downloadURLs.length; i++) { - String segmentName = segmentNames[i]; - // Download and decompress the segment file - _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and decompressing segment from: " + downloadURLs[i] - + " (" + (i + 1) + " out of " + downloadURLs.length + ")"); - File indexDir; - try { - indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, segmentName, downloadURLs[i], taskType, - tempDataDir, "_" + i); - } catch (Exception e) { - LOGGER.error("Failed to download 
segment from download url: {}", downloadURLs[i], e); - _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L); - _eventObserver.notifyTaskError(_pinotTaskConfig, e); - throw e; + if (nThreads <= 1) { + for (int index = 0; index < downloadURLs.length; index++) { + downloadAndUntarSegment(tableNameWithType, taskType, segmentNames[index], downloadURLs[index], + tempDataDir, index, segmentMetadataMap, segmentNames.length); } - inputSegmentDirs.add(indexDir); - - reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType); - SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); - numRecords += segmentMetadata.getTotalDocs(); + } else { + parallelDownloadAndUntarSegments(nThreads, tableNameWithType, taskType, segmentNames, downloadURLs, + tempDataDir, segmentMetadataMap); } + numRecords = processSegmentMetadata(segmentNames, segmentMetadataMap, inputSegmentDirs); Review Comment: I think it is alright. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org