This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9a8fa79199 Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529) 9a8fa79199 is described below commit 9a8fa791999cb792943fdaab19b498daeb9fb0dc Author: swaminathanmanish <126024920+swaminathanman...@users.noreply.github.com> AuthorDate: Fri Mar 1 16:56:59 2024 -0800 Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529) --- .../framework/SegmentProcessorFrameworkTest.java | 17 +++++++++------ .../spi/data/readers/RecordReaderFileConfig.java | 25 +++++++++++++--------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java index 0ec9261d92..c2c4c51789 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java @@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderFactory; import org.apache.pinot.spi.data.readers.RecordReaderFileConfig; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; @@ -194,9 +195,11 @@ public class SegmentProcessorFrameworkTest { FileUtils.forceMkdir(workingDir); ClassLoader classLoader = getClass().getClassLoader(); URL resource = classLoader.getResource("data/dimBaseballTeams.csv"); - RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV, - new File(resource.toURI()), + RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.CSV, new File(resource.toURI()), null, null); + RecordReaderFileConfig recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, + new File(resource.toURI()), + null, null, recordReader); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable"). setTimeColumnName("time").build(); @@ -208,13 +211,15 @@ public class SegmentProcessorFrameworkTest { SegmentProcessorConfig config = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build(); - SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader), - Collections.emptyList(), null); + SegmentProcessorFramework framework = new SegmentProcessorFramework(config, workingDir, + ImmutableList.of(recordReaderFileConfig), Collections.emptyList(), null); List<File> outputSegments = framework.process(); assertEquals(outputSegments.size(), 1); ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap); SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); assertEquals(segmentMetadata.getTotalDocs(), 52); + // Verify reader is closed + assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(), true); } @Test @@ -686,7 +691,7 @@ public class SegmentProcessorFrameworkTest { ClassLoader classLoader = getClass().getClassLoader(); URL resource = classLoader.getResource("data/dimBaseballTeams.csv"); RecordReaderFileConfig recordReaderFileConfig = - new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null); + new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null, null); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build(); Schema schema = @@ -738,7 +743,7 @@ public class SegmentProcessorFrameworkTest { // output size threshold configured). 
expectedTotalDocsCount = 52; - recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null); + recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), null, null, null); segmentConfig = new SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix") .setSegmentNamePostfix("testPostfix").build(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java index 51e4ed0cfb..e7566cb0ff 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java @@ -24,10 +24,8 @@ import javax.annotation.Nullable; /** - * Wraps RecordReader info to instantiate a reader. Users can either pass in the - * RecordReader instance directly or the info required to initialize the RecordReader, so that the - * RecordReader can be initialized just when its about to be used, which avoids early/eager - * initialization/memory allocation. + * Placeholder for all RecordReader configs. Manages the lifecycle of a RecordReader by initing/closing within the + * Segment creation framework. */ public class RecordReaderFileConfig { public final FileFormat _fileFormat; @@ -44,20 +42,22 @@ public class RecordReaderFileConfig { // Pass in the info needed to initialize the reader public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, Set<String> fieldsToRead, - @Nullable RecordReaderConfig recordReaderConfig) { + @Nullable RecordReaderConfig recordReaderConfig, @Nullable RecordReader recordReader) { _fileFormat = fileFormat; _dataFile = dataFile; _fieldsToRead = fieldsToRead; _recordReaderConfig = recordReaderConfig; - _recordReader = null; - // This is not a delegate RecordReader i.e. 
RecordReaderFileConfig owns the RecordReader, so it should be closed - // by RecordReaderFileConfig as well. + // Users can pass in custom readers + _recordReader = recordReader; + // RecordReaderFileConfig owns the lifecycle of RecordReader, to be inited and closed. _isDelegateReader = false; _isRecordReaderInitialized = false; _isRecordReaderClosed = false; } - // Pass in the reader instance directly + // Keeping this for backwards compatibility. We want the lifecycle of the reader to be managed internally + // (inited/closed) by SegmentProcessorFramework. + @Deprecated public RecordReaderFileConfig(RecordReader recordReader) { _recordReader = recordReader; _fileFormat = null; @@ -76,7 +76,12 @@ public class RecordReaderFileConfig { public RecordReader getRecordReader() throws Exception { if (!_isRecordReaderInitialized) { - _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig); + if (_recordReader == null) { + // Record reader instance to be created and inited + _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, _dataFile, _fieldsToRead, _recordReaderConfig); + } else { + _recordReader.init(_dataFile, _fieldsToRead, _recordReaderConfig); + } _isRecordReaderInitialized = true; } return _recordReader; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org