aishikbh commented on code in PR #12220: URL: https://github.com/apache/pinot/pull/12220#discussion_r1455669240
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java: ########## @@ -142,24 +136,62 @@ public List<File> process() private List<File> doProcess() throws Exception { - // Map phase - LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size()); - SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers, - _segmentProcessorConfig, _mapperOutputDir); - _partitionToFileManagerMap = mapper.map(); - - // Check for mapper output files - if (_partitionToFileManagerMap.isEmpty()) { - LOGGER.info("No partition generated from mapper phase, skipping the reducer phase"); - return Collections.emptyList(); + List<File> outputSegmentDirs = new ArrayList<>(); + int numRecordReaders = _recordReaderFileConfigs.size(); + int nextRecordReaderIndexToBeProcessed = 0; + + while (nextRecordReaderIndexToBeProcessed < numRecordReaders) { + // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations. + SegmentMapper mapper = + new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders), + _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir, numRecordReaders); + + // Map phase. + Map<String, GenericRowFileManager> partitionToFileManagerMap = doMap(mapper); + + // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next + // iteration. + if (partitionToFileManagerMap.isEmpty()) { + LOGGER.info("No mapper output files generated, skipping reduce phase"); + nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); + continue; + } + + // Reduce phase. + doReduce(partitionToFileManagerMap); + + // Segment creation phase. Add the created segments to the final list. 
+ outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap)); + + // Update next record reader index to be processed. + nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); } + return outputSegmentDirs; + } - // Reduce phase - LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet()); + private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) { + for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) { + RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i); + if (!recordReaderFileConfig.isRecordReaderDone()) { + return i; + } + } + return _recordReaderFileConfigs.size(); + } + + private Map<String, GenericRowFileManager> doMap(SegmentMapper mapper) + throws Exception { + // Map phase Review Comment: Good catch. We do not need a separate function after cleaning up the catch block. Removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org