swaminathanmanish commented on code in PR #12220: URL: https://github.com/apache/pinot/pull/12220#discussion_r1444085565
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java: ########## @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.genericrow; + +import java.io.IOException; + + +public interface FileWriter<T> {
Review Comment:
**Suggestion** - Instead of AdaptiveConstraintsChecker, maybe we can introduce an AdaptiveConstraintsWriter interface that takes in the GenericWriter and delegates the write calls to it. AdaptiveConstraintsWriter can still have the canWrite() API (to terminate the mapper phase) in addition to the write() API. Whenever write() is invoked, bytes are automatically tracked inside the AdaptiveConstraintsWriter implementation (like AdaptiveSizeBasedWriter), so we don't need to explicitly update bytes or reset counters. That way we don't need to expose the adaptive-constraints logic in the mapper (updating bytes, resetting counters, etc.). When a new writer is initialized, the counters are automatically reset.
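For illustration, a minimal sketch of what that suggestion could look like. The names AdaptiveConstraintsWriter and AdaptiveSizeBasedWriter come from the comment above; the generic signature and the byte-returning writeData() call are assumptions for the sketch, not APIs that exist in this PR:

```java
package org.apache.pinot.core.segment.processing.genericrow;

import java.io.IOException;
import org.apache.pinot.spi.data.readers.GenericRow;

// Illustrative sketch only: the interface shape described in the comment above.
public interface AdaptiveConstraintsWriter<W, T> {

  // The mapper polls this to decide whether to keep the map phase going.
  boolean canWrite();

  // Delegates the write to the underlying writer and tracks the constraint as a side effect.
  void write(W writer, T row)
      throws IOException;
}

// Size-based implementation: bytes are accumulated on every write, so the mapper never
// updates byte counts or resets counters; a freshly constructed instance starts from zero.
class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
  private final long _bytesLimit;
  private long _numBytesWritten = 0;

  AdaptiveSizeBasedWriter(long bytesLimit) {
    _bytesLimit = bytesLimit;
  }

  @Override
  public boolean canWrite() {
    return _numBytesWritten < _bytesLimit;
  }

  @Override
  public void write(GenericRowFileWriter writer, GenericRow row)
      throws IOException {
    // Assumes the underlying writer can report the bytes written for this row
    // (a hypothetical writeData(); not necessarily the real GenericRowFileWriter API).
    _numBytesWritten += writer.writeData(row);
  }
}
```

With this shape, the mapper only holds an AdaptiveConstraintsWriter and calls write()/canWrite(); all byte accounting stays inside the implementation.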
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +149,42 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; + if (!_constraintsChecker.canWrite()) { + LOGGER.info("Stopping record readers at index: {} as size limit reached", i); + break; + } } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { + LOGGER.info("Skipping record reader as it is already processed at index: {}", i); + count++; + continue; + } mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; + if (!_constraintsChecker.canWrite()) { + LOGGER.info("Stopping record readers at index: {} as size limit reached", i); + break; + } } count++; } for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) { fileManager.closeFileWriter(); } - return _partitionToFileManagerMap; } private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse, Consumer<Object> observer, int count, int totalCount) throws Exception { observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount)); - while (recordReader.hasNext()) { + while (recordReader.hasNext() && (!_isSizeBasedConstraintsCheckerEnabled || _constraintsChecker.canWrite())) {
Review Comment:
We can move this check into the constraintsChecker itself: since we already set the byte limit to Long.MAX_VALUE when no size limit is configured, canWrite() will always return true in that case, so do we need a separate isSizeBasedConstraintsCheckerEnabled flag at all?
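A minimal sketch of that idea, assuming the checker keeps a byte limit and a running count (field, constructor, and method names are illustrative, not taken from this PR):

```java
// Sketch only: when no size limit is configured, defaulting to Long.MAX_VALUE means
// canWrite() is effectively always true, so callers need no separate "enabled" flag.
public class AdaptiveSizeBasedConstraintsChecker {
  private final long _bytesLimit;
  private long _numBytesWritten = 0;

  public AdaptiveSizeBasedConstraintsChecker(Long configuredBytesLimit) {
    _bytesLimit = configuredBytesLimit == null ? Long.MAX_VALUE : configuredBytesLimit;
  }

  public boolean canWrite() {
    return _numBytesWritten < _bytesLimit;
  }

  public void updateBytesWritten(long numBytes) {
    _numBytesWritten += numBytes;
  }
}
```

The loop condition in mapAndTransformRow would then collapse to while (recordReader.hasNext() && _constraintsChecker.canWrite()).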
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +149,42 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; + if (!_constraintsChecker.canWrite()) { + LOGGER.info("Stopping record readers at index: {} as size limit reached", i); + break; + } } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { + LOGGER.info("Skipping record reader as it is already processed at index: {}", i); + count++; + continue; + } mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; + if (!_constraintsChecker.canWrite()) {
Review Comment:
Why do we need this check here as well, i.e. both before and after the call to mapAndTransformRow? if (!_constraintsChecker.canWrite()) {
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java: ########## @@ -142,24 +136,83 @@ public List<File> process() private List<File> doProcess() throws Exception { - // Map phase - LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size()); - SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers, - _segmentProcessorConfig, _mapperOutputDir); - _partitionToFileManagerMap = mapper.map(); - - // Check for mapper output files - if (_partitionToFileManagerMap.isEmpty()) { - LOGGER.info("No partition generated from mapper phase, skipping the reducer phase"); - return Collections.emptyList(); + List<File> outputSegmentDirs = new ArrayList<>(); + int numRecordReaders = _recordReaderFileConfigs.size(); + int nextRecordReaderIndexToBeProcessed = 0; + + // Initialise the mapper. + SegmentMapper mapper = + new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers, _segmentProcessorConfig, + _mapperOutputDir); + + + while (nextRecordReaderIndexToBeProcessed < numRecordReaders) { + // Reset the constraints checker for each iteration. + mapper.resetConstraintsChecker(); + + // Map phase. + Map<String, GenericRowFileManager> partitionToFileManagerMap = doMap(mapper); + + // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next + // iteration. + if (partitionToFileManagerMap.isEmpty()) { + nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); + continue; + } + + // Reduce phase. + doReduce(partitionToFileManagerMap); + + // Segment creation phase. Add the created segments to the final list. + outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap)); + + // Update next record reader index to be processed.
+ nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); } + return outputSegmentDirs; + } - // Reduce phase - LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet()); + private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) { + for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) { + RecordReader recordReader = _recordReaderFileConfigs.get(i)._recordReader; + if (recordReader == null || recordReader.hasNext()) {
Review Comment:
Basically this means the reader has either not been created yet or is still being iterated?
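For context, one possible reading of that condition, continuing the truncated hunk above. This is an interpretation sketched from the visible diff (the method body past the visible lines is assumed, not necessarily the PR's exact code):

```java
// Sketch: finds the reader where the next map/reduce/segment-creation pass should resume.
private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
  for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) {
    RecordReader recordReader = _recordReaderFileConfigs.get(i)._recordReader;
    // recordReader == null   -> the reader was never opened, so this file is untouched.
    // recordReader.hasNext() -> the reader was opened but the previous map pass stopped
    //                           mid-file when the size limit was hit, so rows remain.
    // Either way, index i is where the next iteration resumes; fully drained readers are skipped.
    if (recordReader == null || recordReader.hasNext()) {
      return i;
    }
  }
  // Everything is fully consumed; returning the size ends the outer while loop in doProcess().
  return _recordReaderFileConfigs.size();
}
```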