This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 91ffcc759f  Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues (#12220)
91ffcc759f is described below

commit 91ffcc759f4f178cbfb6b9a1541069734b4c0031
Author: aishikbh <ais...@startree.ai>
AuthorDate: Wed Jan 24 11:17:29 2024 +0530

    Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues (#12220)

    * modularized the map, reduce and segment generation phases.
    * added interfaces and classes.
    * added StatefulRecordReaderFileConfig instead of RecordReaderFileConfig.
    * saved progress.
    * major changes, working tests.
    * more changes.
    * made the number of bytes configurable.
    * added a global sequence id to segments since that phase is now modularized out; made the intermediate file size configurable through segmentsConfig.
    * removed logic for global segment IDs.
    * used RecordReaderConfig instead of StatefulRecordReaderConfig, changed looping conditions, made the RecordReader of RecordReaderFileConfig modifiable.
    * added proper sequence ids for segments.
    * ingestion working without errors; should be good in sunny-day scenarios.
    * removed typo from testing.
    * replaced _partitionToFileManagerMap with a local variable, as the mapping now happens over multiple iterations; changed exception handling for the mapper.
    * added precondition check for intermediateFileSizeThresholdInBytes and inferred the observer from segmentsConfig.
    * fixed typo and added a comment.
    * removed redundant method.
    * config-related changes, fixed unit tests.
    * removed redundant flag.
    * replaced the size-based constraint checker with a size-based constraint writer.
    * consolidated all the constraint checks into one place.
    * added test to validate SegmentProcessorFramework.
    * decoupled RecordReader closing logic from SegmentMapper.
    * addressed comments related to SegmentProcessorFramework.
    * delegated initialization, and the check for a RecordReader being done with all its records, to RecordReaderFileConfig.
    * simplified logic for termination of the map phase.
    * changed variable names for better readability.
    * Added tests and minor changes in logic
      - Added further tests for SegmentProcessorFramework.
      - Removed redundant checks in SegmentProcessorFramework.
      - Detected an edge case and added an additional statement in SegmentMapper.
    * isolated the termination logs to mapAndTransformRow.
    * Keep SegmentMapper public interface unchanged
      - Pass the total count of RecordReaders through the SegmentMapper constructor instead of map() arguments.
    * addressed review comments.
    * cleaned up logging logic.
    * changed logs.
    * kept the logging for the default scenario the same as before.
    * conditionally logged the progress between iterations when the feature is enabled.
    * modified tests
      - PR #12290 touched dimBaseballTeams.csv, which the tests depend on; modified the tests to reflect that.
      - Changed import statements (addressed comment).
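For context on how the new threshold is meant to be used, below is a minimal sketch that follows the SegmentProcessorFrameworkTest added in this commit. The table name, schema fields, CSV path, working directory and the 128 MB threshold are illustrative placeholders, not values from the commit.

import java.io.File;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.framework.SegmentConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class MapperSizeThresholdExample {
  public static void main(String[] args)
      throws Exception {
    // Cap the intermediate mapper output at ~128 MB per iteration. The default
    // (Long.MAX_VALUE) keeps the previous single-pass behavior.
    SegmentConfig segmentConfig = new SegmentConfig.Builder()
        .setIntermediateFileSizeThreshold(128 * 1024 * 1024L)
        .setSegmentNamePrefix("myTable")
        .build();

    // Placeholder table config and schema, built the same way as in the new test.
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
    Schema schema = new Schema.SchemaBuilder().setSchemaName("mySchema")
        .addSingleValueDimension("teamId", DataType.STRING, "")
        .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
        .build();

    SegmentProcessorConfig processorConfig = new SegmentProcessorConfig.Builder()
        .setSegmentConfig(segmentConfig)
        .setTableConfig(tableConfig)
        .setSchema(schema)
        .build();

    // Let RecordReaderFileConfig create (and later close) the record reader itself.
    RecordReaderFileConfig readerConfig =
        new RecordReaderFileConfig(FileFormat.CSV, new File("/path/to/input.csv"), null, null);

    File workingDir = new File("/tmp/spf-working-dir");
    FileUtils.forceMkdir(workingDir);

    // The framework now iterates map -> reduce -> segment creation, flushing intermediate
    // files whenever the configured size threshold is crossed, until all readers are drained.
    SegmentProcessorFramework framework = new SegmentProcessorFramework(processorConfig, workingDir,
        List.of(readerConfig), Collections.emptyList(), null);
    List<File> outputSegmentDirs = framework.process();
    System.out.println("Created segments: " + outputSegmentDirs);
  }
}

For the minion merge/rollup path, MergeTaskUtils now wires the same knob from the task config, so setting "segmentMapperFileSizeThresholdInBytes" on the task should have the equivalent effect.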
--- .../apache/pinot/core/common/MinionConstants.java | 1 + .../processing/framework/SegmentConfig.java | 30 ++- .../framework/SegmentProcessorFramework.java | 139 ++++++++++---- .../genericrow/AdaptiveConstraintsWriter.java | 33 ++++ .../genericrow/AdaptiveSizeBasedWriter.java | 51 ++++++ .../segment/processing/genericrow/FileWriter.java | 30 +++ .../genericrow/GenericRowFileWriter.java | 11 +- .../segment/processing/mapper/SegmentMapper.java | 70 ++++--- .../framework/SegmentProcessorFrameworkTest.java | 203 +++++++++++++++++++++ .../pinot/plugin/minion/tasks/MergeTaskUtils.java | 4 + .../plugin/minion/tasks/MergeTaskUtilsTest.java | 6 +- .../spi/data/readers/RecordReaderFileConfig.java | 51 +++++- 12 files changed, 561 insertions(+), 68 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 8e4afd465a..4193984559 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -104,6 +104,7 @@ public class MinionConstants { // Segment config public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask"; public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment"; + public static final String SEGMENT_MAPPER_FILE_SIZE_IN_BYTES = "segmentMapperFileSizeThresholdInBytes"; public static final String MAX_NUM_PARALLEL_BUCKETS = "maxNumParallelBuckets"; public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix"; public static final String SEGMENT_NAME_POSTFIX_KEY = "segmentNamePostfix"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java index b3302fafc0..95191b658b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java @@ -31,22 +31,28 @@ import javax.annotation.Nullable; @JsonIgnoreProperties(ignoreUnknown = true) public class SegmentConfig { public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000; + public static final long DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES = Long.MAX_VALUE; private final int _maxNumRecordsPerSegment; private final String _segmentNamePrefix; private final String _segmentNamePostfix; private final String _fixedSegmentName; + private final long _segmentMapperFileSizeThresholdInBytes; @JsonCreator private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment", required = true) int maxNumRecordsPerSegment, @JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix, @JsonProperty("segmentNamePostfix") @Nullable String segmentNamePostfix, - @JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName) { + @JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName, + @JsonProperty(value = "segmentMapperFileSizeThresholdInBytes", required = true) + long segmentMapperFileSizeThresholdInBytes) { Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0"); + Preconditions.checkState(segmentMapperFileSizeThresholdInBytes > 0, "Intermediate file size threshold must be > 0"); _maxNumRecordsPerSegment = maxNumRecordsPerSegment; _segmentNamePrefix = segmentNamePrefix; _segmentNamePostfix = 
segmentNamePostfix; _fixedSegmentName = fixedSegmentName; + _segmentMapperFileSizeThresholdInBytes = segmentMapperFileSizeThresholdInBytes; } /** @@ -71,15 +77,21 @@ public class SegmentConfig { return _fixedSegmentName; } + public long getIntermediateFileSizeThreshold() { + return _segmentMapperFileSizeThresholdInBytes; + } + /** * Builder for SegmentConfig */ public static class Builder { private int _maxNumRecordsPerSegment = DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT; + private long _segmentMapperFileSizeThresholdInBytes = DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES; private String _segmentNamePrefix; private String _segmentNamePostfix; private String _fixedSegmentName; + public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) { _maxNumRecordsPerSegment = maxNumRecordsPerSegment; return this; @@ -99,17 +111,25 @@ public class SegmentConfig { _fixedSegmentName = fixedSegmentName; return this; } + public Builder setIntermediateFileSizeThreshold(long segmentMapperFileSizeThresholdInBytes) { + _segmentMapperFileSizeThresholdInBytes = segmentMapperFileSizeThresholdInBytes; + return this; + } public SegmentConfig build() { Preconditions.checkState(_maxNumRecordsPerSegment > 0, "Max num records per segment must be > 0"); - return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, _segmentNamePostfix, _fixedSegmentName); + Preconditions.checkState(_segmentMapperFileSizeThresholdInBytes > 0, + "Intermediate file size threshold must be > 0"); + return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, _segmentNamePostfix, _fixedSegmentName, + _segmentMapperFileSizeThresholdInBytes); } } @Override public String toString() { - return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + _maxNumRecordsPerSegment + ", _segmentNamePrefix='" - + _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" + _segmentNamePostfix + '\'' + ", _fixedSegmentName='" - + _fixedSegmentName + '\'' + '}'; + return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + _maxNumRecordsPerSegment + + ", _segmentMapperFileSizeThresholdInBytes=" + _segmentMapperFileSizeThresholdInBytes + + ", _segmentNamePrefix='" + _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" + _segmentNamePostfix + '\'' + + ", _fixedSegmentName='" + _fixedSegmentName + '\'' + '}'; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index fb5a08ff56..868583d0d3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -66,8 +66,8 @@ public class SegmentProcessorFramework { private final File _mapperOutputDir; private final File _reducerOutputDir; private final File _segmentsOutputDir; - private Map<String, GenericRowFileManager> _partitionToFileManagerMap; private final SegmentNumRowProvider _segmentNumRowProvider; + private int _segmentSequenceId = 0; /** * Initializes the SegmentProcessorFramework with record readers, config and working directory. We will now rely on @@ -124,14 +124,8 @@ public class SegmentProcessorFramework { try { return doProcess(); } catch (Exception e) { - // Cleaning up file managers no matter they are from map phase or reduce phase. 
For those from reduce phase, the - // reducers should have cleaned up the corresponding file managers from map phase already. - if (_partitionToFileManagerMap != null) { - for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) { - fileManager.cleanUp(); - } - } - // Cleaning up output dir as processing has failed. + // Cleaning up output dir as processing has failed. file managers left from map or reduce phase will be cleaned + // up in the respective phases. FileUtils.deleteQuietly(_segmentsOutputDir); throw e; } finally { @@ -142,24 +136,103 @@ public class SegmentProcessorFramework { private List<File> doProcess() throws Exception { - // Map phase - LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size()); - SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers, - _segmentProcessorConfig, _mapperOutputDir); - _partitionToFileManagerMap = mapper.map(); - - // Check for mapper output files - if (_partitionToFileManagerMap.isEmpty()) { - LOGGER.info("No partition generated from mapper phase, skipping the reducer phase"); - return Collections.emptyList(); + List<File> outputSegmentDirs = new ArrayList<>(); + int numRecordReaders = _recordReaderFileConfigs.size(); + int nextRecordReaderIndexToBeProcessed = 0; + int iterationCount = 1; + Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver(); + boolean isMapperOutputSizeThresholdEnabled = + _segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold() != Long.MAX_VALUE; + + while (nextRecordReaderIndexToBeProcessed < numRecordReaders) { + // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations. + SegmentMapper mapper = + new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders), + _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir); + + // Log start of iteration details only if intermediate file size threshold is set. + if (isMapperOutputSizeThresholdEnabled) { + String logMessage = + String.format("Starting iteration %d with %d record readers. Starting index = %d, end index = %d", + iterationCount, + _recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders).size(), + nextRecordReaderIndexToBeProcessed + 1, numRecordReaders); + LOGGER.info(logMessage); + observer.accept(logMessage); + } + + // Map phase. + long mapStartTimeInMs = System.currentTimeMillis(); + Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map(); + + // Log the time taken to map. + LOGGER.info("Finished iteration {} in {}ms", iterationCount, System.currentTimeMillis() - mapStartTimeInMs); + + // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next + // iteration. + if (partitionToFileManagerMap.isEmpty()) { + LOGGER.info("No mapper output files generated, skipping reduce phase"); + nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); + continue; + } + + // Reduce phase. + doReduce(partitionToFileManagerMap); + + // Segment creation phase. Add the created segments to the final list. + outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap)); + + // Store the starting index of the record readers that were processed in this iteration for logging purposes. 
+ int startingProcessedRecordReaderIndex = nextRecordReaderIndexToBeProcessed; + + // Update next record reader index to be processed. + nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed); + + // Log the details between iteration only if intermediate file size threshold is set. + if (isMapperOutputSizeThresholdEnabled) { + // Take care of logging the proper RecordReader index in case of the last iteration. + int boundaryIndexToLog = + nextRecordReaderIndexToBeProcessed == numRecordReaders ? nextRecordReaderIndexToBeProcessed + : nextRecordReaderIndexToBeProcessed + 1; + + // We are sure that the last RecordReader is completely processed in the last iteration else it may or may not + // have completed processing. Log it accordingly. + String logMessage; + if (nextRecordReaderIndexToBeProcessed == numRecordReaders) { + logMessage = String.format("Finished processing all of %d RecordReaders", numRecordReaders); + } else { + logMessage = String.format( + "Finished processing RecordReaders %d to %d (RecordReader %d might be partially processed) out of %d in " + + "iteration %d", startingProcessedRecordReaderIndex + 1, boundaryIndexToLog, + nextRecordReaderIndexToBeProcessed + 1, numRecordReaders, iterationCount); + } + + observer.accept(logMessage); + LOGGER.info(logMessage); + } + + iterationCount++; } + return outputSegmentDirs; + } - // Reduce phase - LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet()); + private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) { + for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) { + RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i); + if (!recordReaderFileConfig.isRecordReaderDone()) { + return i; + } + } + return _recordReaderFileConfigs.size(); + } + + private void doReduce(Map<String, GenericRowFileManager> partitionToFileManagerMap) + throws Exception { + LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet()); Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver(); - int totalCount = _partitionToFileManagerMap.keySet().size(); + int totalCount = partitionToFileManagerMap.keySet().size(); int count = 1; - for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) { + for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) { String partitionId = entry.getKey(); observer.accept( String.format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++, @@ -168,9 +241,11 @@ public class SegmentProcessorFramework { Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, _segmentProcessorConfig, _reducerOutputDir); entry.setValue(reducer.reduce()); } + } - // Segment creation phase - LOGGER.info("Beginning segment creation phase on partitions: {}", _partitionToFileManagerMap.keySet()); + private List<File> generateSegment(Map<String, GenericRowFileManager> partitionToFileManagerMap) + throws Exception { + LOGGER.info("Beginning segment creation phase on partitions: {}", partitionToFileManagerMap.keySet()); List<File> outputSegmentDirs = new ArrayList<>(); TableConfig tableConfig = _segmentProcessorConfig.getTableConfig(); Schema schema = _segmentProcessorConfig.getSchema(); @@ -179,6 +254,7 @@ public class SegmentProcessorFramework { String fixedSegmentName = _segmentProcessorConfig.getSegmentConfig().getFixedSegmentName(); 
SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema); generatorConfig.setOutDir(_segmentsOutputDir.getPath()); + Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver(); if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) { generatorConfig.setSegmentNameGenerator( @@ -190,8 +266,7 @@ public class SegmentProcessorFramework { generatorConfig.setSegmentNamePostfix(segmentNamePostfix); } - int sequenceId = 0; - for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) { + for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) { String partitionId = entry.getKey(); GenericRowFileManager fileManager = entry.getValue(); try { @@ -202,15 +277,15 @@ public class SegmentProcessorFramework { numSortFields); GenericRowFileRecordReader recordReader = fileReader.getRecordReader(); int maxNumRecordsPerSegment; - for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) { + for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, _segmentSequenceId++) { maxNumRecordsPerSegment = _segmentNumRowProvider.getNumRows(); int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows); - LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId, - endRowId); + LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", _segmentSequenceId, + startRowId, endRowId); observer.accept(String.format( "Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)", - sequenceId, partitionId, startRowId, endRowId, numRows)); - generatorConfig.setSequenceId(sequenceId); + _segmentSequenceId, partitionId, startRowId, endRowId, numRows)); + generatorConfig.setSequenceId(_segmentSequenceId); GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId); SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java new file mode 100644 index 0000000000..d0a3daffa0 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.segment.processing.genericrow; + +import java.io.IOException; + + +/** + * Interface for a writer which can track constraints. This will be used by SegmentProcessorFramework. + * */ + +public interface AdaptiveConstraintsWriter<W, D> { + boolean canWrite(); + + void write(W writer, D dataUnit) + throws IOException; +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java new file mode 100644 index 0000000000..541bd14e26 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.genericrow; + +import java.io.IOException; +import org.apache.pinot.spi.data.readers.GenericRow; + + +public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> { + + private final long _bytesLimit; + private long _numBytesWritten; + + public AdaptiveSizeBasedWriter(long bytesLimit) { + _bytesLimit = bytesLimit; + _numBytesWritten = 0; + } + + public long getBytesLimit() { + return _bytesLimit; + } + public long getNumBytesWritten() { + return _numBytesWritten; + } + + @Override + public boolean canWrite() { + return _numBytesWritten < _bytesLimit; + } + + @Override + public void write(GenericRowFileWriter writer, GenericRow row) throws IOException { + _numBytesWritten += writer.writeData(row); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java new file mode 100644 index 0000000000..8dd0a7d0a1 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.genericrow; + +import java.io.IOException; + +/** + * Abstraction for writing data units to a file. + * */ + +public interface FileWriter<T> { + void close() throws IOException; + long writeData(T dataUnit) throws IOException; +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java index 171488f963..281838a0b5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java @@ -38,7 +38,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; * * TODO: Consider using ByteBuffer instead of OutputStream. */ -public class GenericRowFileWriter implements Closeable { +public class GenericRowFileWriter implements Closeable, FileWriter<GenericRow> { private final DataOutputStream _offsetStream; private final BufferedOutputStream _dataStream; private final GenericRowSerializer _serializer; @@ -63,6 +63,15 @@ public class GenericRowFileWriter implements Closeable { _nextOffset += bytes.length; } + public long writeData(GenericRow genericRow) + throws IOException { + _offsetStream.writeLong(_nextOffset); + byte[] bytes = _serializer.serialize(genericRow); + _dataStream.write(bytes); + _nextOffset += bytes.length; + return bytes.length; + } + @Override public void close() throws IOException { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index b883d775cc..407cbd4dcd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -30,7 +30,9 @@ import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig; +import org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter; import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager; +import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter; import org.apache.pinot.core.segment.processing.partitioner.Partitioner; import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig; import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory; @@ -45,7 +47,6 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; import org.apache.pinot.spi.utils.StringUtil; import org.slf4j.Logger; @@ -62,22 +63,20 @@ import org.slf4j.LoggerFactory; */ public class SegmentMapper { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class); - - private List<RecordReaderFileConfig> _recordReaderFileConfigs; - private List<RecordTransformer> _customRecordTransformers; private final 
SegmentProcessorConfig _processorConfig; private final File _mapperOutputDir; - private final List<FieldSpec> _fieldSpecs; private final boolean _includeNullFields; private final int _numSortFields; - private final CompositeTransformer _recordTransformer; private final TimeHandler _timeHandler; private final Partitioner[] _partitioners; private final String[] _partitionsBuffer; // NOTE: Use TreeMap so that the order is deterministic private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>(); + private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter; + private List<RecordReaderFileConfig> _recordReaderFileConfigs; + private List<RecordTransformer> _customRecordTransformers; public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig processorConfig, File mapperOutputDir) { @@ -107,6 +106,10 @@ public class SegmentMapper { LOGGER.info("Initialized mapper with {} record readers, output dir: {}, timeHandler: {}, partitioners: {}", _recordReaderFileConfigs.size(), _mapperOutputDir, _timeHandler.getClass(), Arrays.stream(_partitioners).map(p -> p.getClass().toString()).collect(Collectors.joining(","))); + + // initialize adaptive writer. + _adaptiveSizeBasedWriter = + new AdaptiveSizeBasedWriter(processorConfig.getSegmentConfig().getIntermediateFileSizeThreshold()); } /** @@ -118,7 +121,7 @@ public class SegmentMapper { try { return doMap(); } catch (Exception e) { - // Cleaning up resources created by the mapper, leaving others to the caller like the input _recordReaders. + // Cleaning up resources created by the mapper. for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) { fileManager.cleanUp(); } @@ -129,40 +132,39 @@ public class SegmentMapper { private Map<String, GenericRowFileManager> doMap() throws Exception { Consumer<Object> observer = _processorConfig.getProgressObserver(); - int totalCount = _recordReaderFileConfigs.size(); int count = 1; + int totalNumRecordReaders = _recordReaderFileConfigs.size(); GenericRow reuse = new GenericRow(); for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) { - RecordReader recordReader = recordReaderFileConfig._recordReader; - if (recordReader == null) { - // We create and use the recordReader here. - try { - recordReader = - RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, - recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); - mapAndTransformRow(recordReader, reuse, observer, count, totalCount); - } finally { - if (recordReader != null) { - recordReader.close(); - } - } - } else { - mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + RecordReader recordReader = recordReaderFileConfig.getRecordReader(); + + // Mapper can terminate midway of reading a file if the intermediate file size has crossed the configured + // threshold. Map phase will continue in the next iteration right where we are leaving off in the current + // iteration. + boolean shouldMapperTerminate = + !completeMapAndTransformRow(recordReader, reuse, observer, count, totalNumRecordReaders); + + // Terminate the map phase if intermediate file size has crossed the threshold. 
+ if (shouldMapperTerminate) { + break; } + recordReaderFileConfig.closeRecordReader(); count++; } for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) { fileManager.closeFileWriter(); } - return _partitionToFileManagerMap; } - private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse, + +// Returns true if the map phase can continue, false if it should terminate based on the configured threshold for +// intermediate file size during map phase. + private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow reuse, Consumer<Object> observer, int count, int totalCount) throws Exception { observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount)); - while (recordReader.hasNext()) { + while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) { reuse = recordReader.next(reuse); // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it @@ -183,6 +185,16 @@ public class SegmentMapper { } reuse.clear(); } + if (recordReader.hasNext() && !_adaptiveSizeBasedWriter.canWrite()) { + String logMessage = String.format( + "Stopping record readers at index: %d out of %d passed to mapper as size limit reached, bytes written = %d," + + " bytes " + "limit = %d", count, totalCount, _adaptiveSizeBasedWriter.getNumBytesWritten(), + _adaptiveSizeBasedWriter.getBytesLimit()); + observer.accept(logMessage); + LOGGER.info(logMessage); + return false; + } + return true; } private void writeRecord(GenericRow row) @@ -210,6 +222,10 @@ public class SegmentMapper { _partitionToFileManagerMap.put(partition, fileManager); } - fileManager.getFileWriter().write(row); + // Get the file writer. + GenericRowFileWriter fileWriter = fileManager.getFileWriter(); + + // Write the row. + _adaptiveSizeBasedWriter.write(fileWriter, row); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java index 8b3e150a1b..0ec9261d92 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java @@ -56,8 +56,10 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -581,6 +583,207 @@ public class SegmentProcessorFrameworkTest { rewindRecordReaders(_multipleSegments); } + @Test + public void testConfigurableMapperOutputSize() + throws Exception { + File workingDir = new File(TEMP_DIR, "configurable_mapper_test_output"); + FileUtils.forceMkdir(workingDir); + int expectedTotalDocsCount = 10; + + // Test 1 : Default case i.e. no limit to mapper output file size (single record reader). 
+ + SegmentProcessorConfig config = + new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build(); + SegmentProcessorFramework framework = new SegmentProcessorFramework(_singleSegment, config, workingDir); + List<File> outputSegments = framework.process(); + assertEquals(outputSegments.size(), 1); + String[] outputDirs = workingDir.list(); + assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs)); + SegmentMetadata segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0)); + assertEquals(segmentMetadata.getTotalDocs(), expectedTotalDocsCount); + assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0"); + FileUtils.cleanDirectory(workingDir); + rewindRecordReaders(_singleSegment); + + // Test 2 : Default case i.e. no limit to mapper output file size (multiple record readers). + config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build(); + framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir); + outputSegments = framework.process(); + assertEquals(outputSegments.size(), 1); + outputDirs = workingDir.list(); + assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs)); + segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0)); + assertEquals(segmentMetadata.getTotalDocs(), expectedTotalDocsCount); + assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0"); + FileUtils.cleanDirectory(workingDir); + rewindRecordReaders(_multipleSegments); + + // Test 3 : Test mapper with threshold output size (single record reader). + + // Create a segmentConfig with intermediate mapper output size threshold set to the number of bytes in each row + // from the data. In this way, we can test if each row is written to a separate segment. + SegmentConfig segmentConfig = + new SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix") + .setSegmentNamePostfix("testPostfix").build(); + config = new SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig) + .setSchema(_schema).build(); + framework = new SegmentProcessorFramework(_singleSegment, config, workingDir); + outputSegments = framework.process(); + assertEquals(outputSegments.size(), expectedTotalDocsCount); + outputDirs = workingDir.list(); + assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs)); + + // Verify that each segment has only one row, and the segment name is correct. + + for (int i = 0; i < expectedTotalDocsCount; i++) { + segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i)); + assertEquals(segmentMetadata.getTotalDocs(), 1); + assertTrue(segmentMetadata.getName().matches("testPrefix_.*_testPostfix_" + i)); + } + FileUtils.cleanDirectory(workingDir); + rewindRecordReaders(_singleSegment); + + // Test 4 : Test mapper with threshold output size (multiple record readers). + + // Create a segmentConfig with intermediate mapper output size threshold set to the number of bytes in each row + // from the data. In this way, we can test if each row is written to a separate segment. 
+ segmentConfig = new SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix") + .setSegmentNamePostfix("testPostfix").build(); + config = new SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig) + .setSchema(_schema).build(); + framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir); + outputSegments = framework.process(); + assertEquals(outputSegments.size(), expectedTotalDocsCount); + outputDirs = workingDir.list(); + assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs)); + + // Verify that each segment has only one row, and the segment name is correct. + + for (int i = 0; i < expectedTotalDocsCount; i++) { + segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i)); + assertEquals(segmentMetadata.getTotalDocs(), 1); + assertTrue(segmentMetadata.getName().matches("testPrefix_.*_testPostfix_" + i)); + } + FileUtils.cleanDirectory(workingDir); + rewindRecordReaders(_multipleSegments); + + // Test 5 : Test with injected failure in mapper to verify output directory is cleaned up. + + List<RecordReader> testList = new ArrayList<>(_multipleSegments); + testList.set(1, null); + segmentConfig = new SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix") + .setSegmentNamePostfix("testPostfix").build(); + config = new SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig) + .setSchema(_schema).build(); + SegmentProcessorFramework failureTest = new SegmentProcessorFramework(testList, config, workingDir); + assertThrows(NullPointerException.class, failureTest::process); + assertTrue(FileUtils.isEmptyDirectory(workingDir)); + rewindRecordReaders(_multipleSegments); + + // Test 6: RecordReader should be closed when recordReader is created inside RecordReaderFileConfig (without mapper + // output size threshold configured). + + ClassLoader classLoader = getClass().getClassLoader(); + URL resource = classLoader.getResource("data/dimBaseballTeams.csv"); + RecordReaderFileConfig recordReaderFileConfig = + new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null); + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build(); + Schema schema = + new Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("teamId", DataType.STRING, "") + .addSingleValueDimension("teamName", DataType.STRING, "") + .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build(); + + config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build(); + + SegmentProcessorFramework frameworkWithRecordReaderFileConfig = + new SegmentProcessorFramework(config, workingDir, ImmutableList.of(recordReaderFileConfig), + Collections.emptyList(), null); + outputSegments = frameworkWithRecordReaderFileConfig.process(); + + // Verify the number of segments created and the total docs. + assertEquals(outputSegments.size(), 1); + ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap); + segmentMetadata = segment.getSegmentMetadata(); + assertEquals(segmentMetadata.getTotalDocs(), 52); + + // Verify that the record reader is closed from RecordReaderFileConfig. 
+ assertTrue(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig()); + FileUtils.cleanDirectory(workingDir); + + // Test 7: RecordReader should not be closed when recordReader is passed to RecordReaderFileConfig. (Without + // mapper output size threshold configured) + + RecordReader recordReader = recordReaderFileConfig.getRecordReader(); + recordReader.rewind(); + + // Pass the recordReader to RecordReaderFileConfig. + recordReaderFileConfig = new RecordReaderFileConfig(recordReader); + SegmentProcessorFramework frameworkWithDelegateRecordReader = + new SegmentProcessorFramework(config, workingDir, ImmutableList.of(recordReaderFileConfig), + Collections.emptyList(), null); + outputSegments = frameworkWithDelegateRecordReader.process(); + + // Verify the number of segments created and the total docs. + assertEquals(outputSegments.size(), 1); + segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap); + segmentMetadata = segment.getSegmentMetadata(); + assertEquals(segmentMetadata.getTotalDocs(), 52); + + // Verify that the record reader is not closed from RecordReaderFileConfig. + assertFalse(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig()); + FileUtils.cleanDirectory(workingDir); + + // Test 8: RecordReader should be closed when recordReader is created inside RecordReaderFileConfig (With mapper + // output size threshold configured). + + expectedTotalDocsCount = 52; + recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null); + + segmentConfig = new SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix") + .setSegmentNamePostfix("testPostfix").build(); + config = new SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(tableConfig) + .setSchema(schema).build(); + + frameworkWithRecordReaderFileConfig = + new SegmentProcessorFramework(config, workingDir, ImmutableList.of(recordReaderFileConfig), + Collections.emptyList(), null); + outputSegments = frameworkWithRecordReaderFileConfig.process(); + + // Verify that each segment has only one row. + for (int i = 0; i < expectedTotalDocsCount; i++) { + segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i)); + assertEquals(segmentMetadata.getTotalDocs(), 1); + } + + // Verify that the record reader is closed from RecordReaderFileConfig. + assertTrue(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig()); + FileUtils.cleanDirectory(workingDir); + + // Test 9: RecordReader should not be closed when recordReader is passed to RecordReaderFileConfig (With mapper + // output size threshold configured). + + recordReader = recordReaderFileConfig.getRecordReader(); + recordReader.rewind(); + + // Pass the recordReader to RecordReaderFileConfig. + recordReaderFileConfig = new RecordReaderFileConfig(recordReader); + frameworkWithDelegateRecordReader = + new SegmentProcessorFramework(config, workingDir, ImmutableList.of(recordReaderFileConfig), + Collections.emptyList(), null); + outputSegments = frameworkWithDelegateRecordReader.process(); + + // Verify that each segment has only one row. + for (int i = 0; i < expectedTotalDocsCount; i++) { + segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i)); + assertEquals(segmentMetadata.getTotalDocs(), 1); + } + + // Verify that the record reader is not closed from RecordReaderFileConfig. 
+ assertFalse(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig()); + FileUtils.cleanDirectory(workingDir); + } + @Test public void testMultiValue() throws Exception { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java index 26c9e59e39..233dfd7f03 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java @@ -147,6 +147,10 @@ public class MergeTaskUtils { if (maxNumRecordsPerSegment != null) { segmentConfigBuilder.setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment)); } + String segmentMapperFileSizeThreshold = taskConfig.get(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES); + if (segmentMapperFileSizeThreshold != null) { + segmentConfigBuilder.setIntermediateFileSizeThreshold(Long.parseLong(segmentMapperFileSizeThreshold)); + } segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY)); segmentConfigBuilder.setSegmentNamePostfix(taskConfig.get(MergeTask.SEGMENT_NAME_POSTFIX_KEY)); segmentConfigBuilder.setFixedSegmentName(taskConfig.get(MergeTask.FIXED_SEGMENT_NAME_KEY)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java index bc6b897211..731607784b 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java @@ -178,15 +178,17 @@ public class MergeTaskUtilsTest { taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix"); taskConfig.put(MergeTask.SEGMENT_NAME_POSTFIX_KEY, "myPostfix"); taskConfig.put(MergeTask.FIXED_SEGMENT_NAME_KEY, "mySegment"); + taskConfig.put(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES, "1000000000"); SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig); assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000); assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix"); assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix"); assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix"); assertEquals(segmentConfig.getFixedSegmentName(), "mySegment"); + assertEquals(segmentConfig.getIntermediateFileSizeThreshold(), 1000000000L); assertEquals(segmentConfig.toString(), - "SegmentConfig{_maxNumRecordsPerSegment=10000, _segmentNamePrefix='myPrefix', " - + "_segmentNamePostfix='myPostfix', _fixedSegmentName='mySegment'}"); + "SegmentConfig{_maxNumRecordsPerSegment=10000, _segmentMapperFileSizeThresholdInBytes=1000000000, " + + "_segmentNamePrefix='myPrefix', _segmentNamePostfix='myPostfix', _fixedSegmentName='mySegment'}"); segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap()); assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT); diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java index 8a2ebb9e16..51e4ed0cfb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java @@ -34,8 +34,13 @@ public class RecordReaderFileConfig { public final File _dataFile; public final Set<String> _fieldsToRead; public final RecordReaderConfig _recordReaderConfig; + private final boolean _isDelegateReader; // Record Readers created/passed from clients. - public final RecordReader _recordReader; + public RecordReader _recordReader; + // Track if RecordReaderFileConfig initialized the RecordReader for to aid in closing the RecordReader. + private boolean _isRecordReaderInitialized; + // Track if RecordReaderFileConfig closed the RecordReader for testing purposes. + private boolean _isRecordReaderClosed; // Pass in the info needed to initialize the reader public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, Set<String> fieldsToRead, @@ -45,6 +50,11 @@ public class RecordReaderFileConfig { _fieldsToRead = fieldsToRead; _recordReaderConfig = recordReaderConfig; _recordReader = null; + // This is not a delegate RecordReader i.e. RecordReaderFileConfig owns the RecordReader, so it should be closed + // by RecordReaderFileConfig as well. + _isDelegateReader = false; + _isRecordReaderInitialized = false; + _isRecordReaderClosed = false; } // Pass in the reader instance directly @@ -54,5 +64,44 @@ public class RecordReaderFileConfig { _dataFile = null; _fieldsToRead = null; _recordReaderConfig = null; + // This is a delegate RecordReader i.e. RecordReader instance has been passed to RecordReaderFileConfig instead + // of the configs. It means RecordReaderFileConfig does not own the RecordReader, so it should not be closed by + // RecordReaderFileConfig as well. The responsibility of closing the RecordReader lies with the caller. + _isDelegateReader = true; + _isRecordReaderInitialized = true; + _isRecordReaderClosed = false; + } + + // Return the RecordReader instance. Initialize the RecordReader if not already initialized. + public RecordReader getRecordReader() + throws Exception { + if (!_isRecordReaderInitialized) { + _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig); + _isRecordReaderInitialized = true; + } + return _recordReader; + } + + // Close the RecordReader instance if RecordReaderFileConfig initialized it. + public void closeRecordReader() + throws Exception { + // If RecordReaderFileConfig did not create the RecordReader, then it should not close it. + if (_isRecordReaderInitialized && !_isDelegateReader) { + _recordReader.close(); + _isRecordReaderClosed = true; + } + } + + // Return true if RecordReader is done processing. + public boolean isRecordReaderDone() { + if (_isRecordReaderInitialized) { + return !_recordReader.hasNext(); + } + return false; + } + + // For testing purposes only. + public boolean isRecordReaderClosedFromRecordReaderFileConfig() { + return _isRecordReaderClosed; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org