This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a8fa79199 Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529)
9a8fa79199 is described below

commit 9a8fa791999cb792943fdaab19b498daeb9fb0dc
Author: swaminathanmanish <126024920+swaminathanman...@users.noreply.github.com>
AuthorDate: Fri Mar 1 16:56:59 2024 -0800

    Allow passing custom record reader to be inited/closed in SegmentProcessorFramework (#12529)
---
 .../framework/SegmentProcessorFrameworkTest.java   | 17 +++++++++------
 .../spi/data/readers/RecordReaderFileConfig.java   | 25 +++++++++++++---------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 0ec9261d92..c2c4c51789 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -194,9 +195,11 @@ public class SegmentProcessorFrameworkTest {
     FileUtils.forceMkdir(workingDir);
     ClassLoader classLoader = getClass().getClassLoader();
     URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
-    RecordReaderFileConfig reader = new RecordReaderFileConfig(FileFormat.CSV,
-        new File(resource.toURI()),
+    RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.CSV, new File(resource.toURI()),
         null, null);
+    RecordReaderFileConfig recordReaderFileConfig = new 
RecordReaderFileConfig(FileFormat.CSV,
+        new File(resource.toURI()),
+        null, null, recordReader);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").
         setTimeColumnName("time").build();
 
@@ -208,13 +211,15 @@ public class SegmentProcessorFrameworkTest {
 
     SegmentProcessorConfig config =
         new 
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
-    SegmentProcessorFramework framework = new 
SegmentProcessorFramework(config, workingDir, ImmutableList.of(reader),
-        Collections.emptyList(), null);
+    SegmentProcessorFramework framework = new 
SegmentProcessorFramework(config, workingDir,
+        ImmutableList.of(recordReaderFileConfig), Collections.emptyList(), 
null);
     List<File> outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
     ImmutableSegment segment = 
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 52);
+    // Verify reader is closed
+    
assertEquals(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig(),
 true);
   }
 
   @Test
@@ -686,7 +691,7 @@ public class SegmentProcessorFrameworkTest {
     ClassLoader classLoader = getClass().getClassLoader();
     URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
     RecordReaderFileConfig recordReaderFileConfig =
-        new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), 
null, null);
+        new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), 
null, null, null);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
     Schema schema =
@@ -738,7 +743,7 @@ public class SegmentProcessorFrameworkTest {
     // output size threshold configured).
 
     expectedTotalDocsCount = 52;
-    recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new 
File(resource.toURI()), null, null);
+    recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new 
File(resource.toURI()), null, null, null);
 
     segmentConfig = new 
SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix")
         .setSegmentNamePostfix("testPostfix").build();
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index 51e4ed0cfb..e7566cb0ff 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -24,10 +24,8 @@ import javax.annotation.Nullable;
 
 
 /**
- * Wraps RecordReader info to instantiate a reader. Users can either pass in 
the
- * RecordReader instance directly or the info required to initialize the 
RecordReader, so that the
- * RecordReader can be initialized just when its about to be used, which 
avoids early/eager
- * initialization/memory allocation.
+ * Placeholder for all RecordReader configs. Manages the lifecycle of a 
RecordReader by initing/closing within the
+ * Segment creation framework.
  */
 public class RecordReaderFileConfig {
   public final FileFormat _fileFormat;
@@ -44,20 +42,22 @@ public class RecordReaderFileConfig {
 
   // Pass in the info needed to initialize the reader
   public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, 
Set<String> fieldsToRead,
-      @Nullable RecordReaderConfig recordReaderConfig) {
+      @Nullable RecordReaderConfig recordReaderConfig, @Nullable RecordReader 
recordReader) {
     _fileFormat = fileFormat;
     _dataFile = dataFile;
     _fieldsToRead = fieldsToRead;
     _recordReaderConfig = recordReaderConfig;
-    _recordReader = null;
-    // This is not a delegate RecordReader i.e. RecordReaderFileConfig owns 
the RecordReader, so it should be closed
-    // by RecordReaderFileConfig as well.
+    // Users can pass in custom readers
+    _recordReader = recordReader;
+    // RecordReaderFileConfig owns the lifecycle of RecordReader, to be inited 
and closed.
     _isDelegateReader = false;
     _isRecordReaderInitialized = false;
     _isRecordReaderClosed = false;
   }
 
-  // Pass in the reader instance directly
+  // Keeping this for backwards compatibility. We want the lifecycle of the 
reader to be managed internally
+  // (inited/closed) by SegmentProcessorFramework.
+  @Deprecated
   public RecordReaderFileConfig(RecordReader recordReader) {
     _recordReader = recordReader;
     _fileFormat = null;
@@ -76,7 +76,12 @@ public class RecordReaderFileConfig {
   public RecordReader getRecordReader()
       throws Exception {
     if (!_isRecordReaderInitialized) {
-      _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, 
_dataFile, _fieldsToRead, _recordReaderConfig);
+      if (_recordReader == null) {
+        // Record reader instance to be created and inited
+        _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, 
_dataFile, _fieldsToRead, _recordReaderConfig);
+      } else {
+        _recordReader.init(_dataFile, _fieldsToRead, _recordReaderConfig);
+      }
       _isRecordReaderInitialized = true;
     }
     return _recordReader;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to