This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e348693f CSVRecordReader. Refactor init and line iterator parsing 
logic (#13913)
c3e348693f is described below

commit c3e348693fe4b27a01e088be96160904cbb66ab5
Author: Suvodeep Pyne <suvodeep-p...@users.noreply.github.com>
AuthorDate: Wed Sep 4 10:35:13 2024 -0700

    CSVRecordReader. Refactor init and line iterator parsing logic (#13913)
---
 .../plugin/inputformat/csv/CSVRecordReader.java    | 389 ++++++++++-----------
 .../inputformat/csv/CSVRecordReaderTest.java       |  41 +++
 .../pinot-csv/src/test/resources/dataFileBasic.csv |   5 +
 .../resources/dataFileWithUnparseableLines2.csv    |   5 +
 4 files changed, 237 insertions(+), 203 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index c4dc8c167f..daf6041a09 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Set;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -58,112 +59,124 @@ public class CSVRecordReader implements RecordReader {
   private Iterator<CSVRecord> _iterator;
   private CSVRecordExtractor _recordExtractor;
   private Map<String, Integer> _headerMap = new HashMap<>();
-  private boolean _isHeaderProvided = false;
 
-  // line iterator specific variables
-  private boolean _useLineIterator = false;
-  private boolean _skipHeaderRecord = false;
-  private long _skippedLinesCount;
   private BufferedReader _bufferedReader;
-  private String _nextLine;
-  private GenericRow _nextRecord;
+  private CSVRecordReaderConfig _config = null;
 
   public CSVRecordReader() {
   }
 
-  @Override
-  public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)
-      throws IOException {
-    _dataFile = dataFile;
-    CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig;
-    Character multiValueDelimiter = null;
-    if (config == null) {
-      _format = 
CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build();
-      multiValueDelimiter = 
CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
-    } else {
-      CSVFormat format;
-      String formatString = config.getFileFormat();
-      if (formatString == null) {
-        format = CSVFormat.DEFAULT;
-      } else {
-        switch (formatString.toUpperCase()) {
-          case "EXCEL":
-            format = CSVFormat.EXCEL;
-            break;
-          case "MYSQL":
-            format = CSVFormat.MYSQL;
-            break;
-          case "RFC4180":
-            format = CSVFormat.RFC4180;
-            break;
-          case "TDF":
-            format = CSVFormat.TDF;
-            break;
-          default:
-            format = CSVFormat.DEFAULT;
-            break;
-        }
-      }
-      char delimiter = config.getDelimiter();
-      format = format.builder().setDelimiter(delimiter).build();
+  private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) {
+    if (config.getFileFormat() == null) {
+      return CSVFormat.DEFAULT;
+    }
+    switch (config.getFileFormat().toUpperCase()) {
+      case "EXCEL":
+        return CSVFormat.EXCEL;
+      case "MYSQL":
+        return CSVFormat.MYSQL;
+      case "RFC4180":
+        return CSVFormat.RFC4180;
+      case "TDF":
+        return CSVFormat.TDF;
+      default:
+        return CSVFormat.DEFAULT;
+    }
+  }
 
-      if (config.isSkipUnParseableLines()) {
-        _useLineIterator = true;
+  private static Map<String, Integer> parseHeaderMapFromLine(CSVFormat format, 
String line) {
+    try (StringReader stringReader = new StringReader(line)) {
+      try (CSVParser parser = format.parse(stringReader)) {
+        return parser.getHeaderMap();
       }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to parse header from line: " + line, 
e);
+    }
+  }
 
-      _isHeaderProvided = config.getHeader() != null;
-      _skipHeaderRecord = config.isSkipHeader();
-      _format = format.builder()
-          .setHeader()
-          .setSkipHeaderRecord(config.isSkipHeader())
-          .setCommentMarker(config.getCommentMarker())
-          .setEscape(config.getEscapeCharacter())
-          .setIgnoreEmptyLines(config.isIgnoreEmptyLines())
-          .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces())
-          .setQuote(config.getQuoteCharacter())
-          .build();
-
-      if (config.getQuoteMode() != null) {
-        _format = 
_format.builder().setQuoteMode(QuoteMode.valueOf(config.getQuoteMode())).build();
-      }
+  private static Character getMultiValueDelimiter(CSVRecordReaderConfig 
config) {
+    if (config == null) {
+      return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
+    } else if (config.isMultiValueDelimiterEnabled()) {
+      return config.getMultiValueDelimiter();
+    }
+    return null;
+  }
 
-      if (config.getRecordSeparator() != null) {
-        _format = 
_format.builder().setRecordSeparator(config.getRecordSeparator()).build();
-      }
+  private static boolean useLineIterator(CSVRecordReaderConfig config) {
+    return config != null && config.isSkipUnParseableLines();
+  }
 
-      String nullString = config.getNullStringValue();
-      if (nullString != null) {
-        _format = _format.builder().setNullString(nullString).build();
-      }
+  @Override
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _dataFile = dataFile;
+    _config = (CSVRecordReaderConfig) recordReaderConfig;
+    _format = createCSVFormat();
+
+    // If header is provided by the client, use it. Otherwise, parse the 
header from the first line of the file.
+    // Overwrite the format with the header information.
+    
Optional.ofNullable(_config).map(CSVRecordReaderConfig::getHeader).ifPresent(header
 -> {
+      _headerMap = parseHeaderMapFromLine(_format, header);
+      _format = _format.builder().setHeader(_headerMap.keySet().toArray(new 
String[0])).build();
+    });
+
+    validateHeaderWithDelimiter();
+    initIterator();
+    initRecordExtractor(fieldsToRead);
+  }
 
-      if (_isHeaderProvided) {
-        _headerMap = parseLineAsHeader(config.getHeader());
-        _format = _format.builder().setHeader(_headerMap.keySet().toArray(new 
String[0])).build();
-        if (!_useLineIterator) {
-          validateHeaderForDelimiter(delimiter, config.getHeader(), _format);
-        }
-      }
+  private void initRecordExtractor(Set<String> fieldsToRead) {
+    final CSVRecordExtractorConfig recordExtractorConfig = new 
CSVRecordExtractorConfig();
+    
recordExtractorConfig.setMultiValueDelimiter(getMultiValueDelimiter(_config));
+    recordExtractorConfig.setColumnNames(_headerMap.keySet());
+    _recordExtractor = new CSVRecordExtractor();
+    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+  }
 
-      if (config.isMultiValueDelimiterEnabled()) {
-        multiValueDelimiter = config.getMultiValueDelimiter();
-      }
+  private CSVFormat createCSVFormat() {
+    if (_config == null) {
+      return 
CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build();
     }
-    _recordExtractor = new CSVRecordExtractor();
 
-    init();
+    final CSVFormat.Builder builder = baseCsvFormat(_config).builder()
+        .setDelimiter(_config.getDelimiter())
+        .setHeader()
+        .setSkipHeaderRecord(_config.isSkipHeader())
+        .setCommentMarker(_config.getCommentMarker())
+        .setEscape(_config.getEscapeCharacter())
+        .setIgnoreEmptyLines(_config.isIgnoreEmptyLines())
+        .setIgnoreSurroundingSpaces(_config.isIgnoreSurroundingSpaces())
+        .setQuote(_config.getQuoteCharacter());
+
+    
Optional.ofNullable(_config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode);
+    
Optional.ofNullable(_config.getRecordSeparator()).ifPresent(builder::setRecordSeparator);
+    
Optional.ofNullable(_config.getNullStringValue()).ifPresent(builder::setNullString);
+
+    return builder.build();
+  }
 
-    CSVRecordExtractorConfig recordExtractorConfig = new 
CSVRecordExtractorConfig();
-    recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
-    recordExtractorConfig.setColumnNames(_headerMap.keySet());
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+  private void initIterator()
+      throws IOException {
+    if (useLineIterator(_config)) {
+      _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 
32); // 32KB buffer size
+      _iterator = new LineIterator();
+    } else {
+      _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
+      _headerMap = _parser.getHeaderMap();
+      _iterator = _parser.iterator();
+    }
   }
 
-  private void validateHeaderForDelimiter(char delimiter, String csvHeader, 
CSVFormat format)
+  private void validateHeaderWithDelimiter()
       throws IOException {
-    CSVParser parser = 
format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
-    Iterator<CSVRecord> iterator = parser.iterator();
-    if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && 
delimiterNotPresentInHeader(delimiter,
-        csvHeader)) {
+    if (_config == null || _config.getHeader() == null || 
useLineIterator(_config)) {
+      return;
+    }
+    final CSVParser parser = 
_format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
+    final Iterator<CSVRecord> iterator = parser.iterator();
+    if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && 
delimiterNotPresentInHeader(
+        _config.getDelimiter(), _config.getHeader())) {
       throw new IllegalArgumentException("Configured header does not contain 
the configured delimiter");
     }
   }
@@ -176,17 +189,6 @@ public class CSVRecordReader implements RecordReader {
     return !StringUtils.contains(csvHeader, delimiter);
   }
 
-  private void init()
-      throws IOException {
-    if (_useLineIterator) {
-      initLineIteratorResources();
-      return;
-    }
-    _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
-    _headerMap = _parser.getHeaderMap();
-    _iterator = _parser.iterator();
-  }
-
   /**
    * Returns a copy of the header map that iterates in column order.
    * <p>
@@ -202,158 +204,139 @@ public class CSVRecordReader implements RecordReader {
 
   @Override
   public boolean hasNext() {
-    if (_useLineIterator) {
-      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
-      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
-      // this limitation.
-      return readNextRecord();
-    }
     return _iterator.hasNext();
   }
 
   @Override
   public GenericRow next()
       throws IOException {
-    if (_useLineIterator) {
-      return _nextRecord;
-    } else {
-      return next(new GenericRow());
-    }
+    return next(new GenericRow());
   }
 
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    if (_useLineIterator) {
-      reuse.init(_nextRecord);
-    } else {
-      CSVRecord record = _iterator.next();
-      _recordExtractor.extract(record, reuse);
-    }
+    CSVRecord record = _iterator.next();
+    _recordExtractor.extract(record, reuse);
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    if (_useLineIterator) {
-      resetLineIteratorResources();
-    }
-
     if (_parser != null && !_parser.isClosed()) {
       _parser.close();
     }
-
-    init();
+    closeIterator();
+    initIterator();
   }
 
   @Override
   public void close()
       throws IOException {
-    if (_useLineIterator) {
-      resetLineIteratorResources();
-    }
+    closeIterator();
 
     if (_parser != null && !_parser.isClosed()) {
       _parser.close();
     }
   }
 
-  private boolean readNextRecord() {
-    try {
-      _nextRecord = null;
-      GenericRow genericRow = new GenericRow();
-      readNextLine(genericRow);
-      _nextRecord = genericRow;
-    } catch (Exception e) {
-      LOGGER.info("Error parsing next record.", e);
+  private void closeIterator()
+      throws IOException {
+    // if header is not provided by the client it would be rebuilt. When it's 
provided by the client it's initialized
+    // once in the constructor
+    if (useLineIterator(_config) && _config.getHeader() == null) {
+      _headerMap.clear();
+    }
+
+    if (_bufferedReader != null) {
+      _bufferedReader.close();
     }
-    return _nextRecord != null;
   }
 
-  private void readNextLine(GenericRow reuse)
-      throws IOException {
-    while (_nextLine != null) {
-      try (Reader reader = new StringReader(_nextLine)) {
-        try (CSVParser csvParser = _format.parse(reader)) {
-          List<CSVRecord> csvRecords = csvParser.getRecords();
-          if (csvRecords != null && csvRecords.size() > 0) {
-            // There would be only one record as lines are read one after the 
other
-            CSVRecord record = csvRecords.get(0);
-            _recordExtractor.extract(record, reuse);
-            break;
-          } else {
-            // Can be thrown on: 1) Empty lines 2) Commented lines
-            throw new NoSuchElementException("Failed to find any records");
+  class LineIterator implements Iterator<CSVRecord> {
+    private String _nextLine;
+    private CSVRecord _current;
+
+    public LineIterator() {
+      init();
+    }
+
+    private void init() {
+      try {
+        if (_config.getHeader() != null) {
+          if (_config.isSkipHeader()) {
+            // When skip header config is set and header is supplied – skip 
the first line from the input file
+            _bufferedReader.readLine();
+            // turn off the property so that it doesn't interfere with further 
parsing
+            _format = _format.builder().setSkipHeaderRecord(false).build();
           }
-        } catch (Exception e) {
-          _skippedLinesCount++;
-          LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, 
_dataFile, e);
-          // Find the next line that can be parsed
-          _nextLine = _bufferedReader.readLine();
+        } else {
+          // read the first line
+          String headerLine = _bufferedReader.readLine();
+          _headerMap = parseHeaderMapFromLine(_format, headerLine);
+          // If header isn't provided, the first line would be set as header 
and the 'skipHeader' property
+          // is set to false.
+          _format = _format.builder()
+              .setSkipHeaderRecord(false)
+              .setHeader(_headerMap.keySet().toArray(new String[0]))
+              .build();
         }
+        _nextLine = _bufferedReader.readLine();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
-    if (_nextLine != null) {
-      // Advance the pointer to the next line for future reading
-      _nextLine = _bufferedReader.readLine();
-    } else {
-      throw new RuntimeException("No more parseable lines. Line iterator 
reached end of file.");
-    }
-  }
 
-  private Map<String, Integer> parseLineAsHeader(String line)
-      throws IOException {
-    Map<String, Integer> headerMap;
-    try (StringReader stringReader = new StringReader(line)) {
-      try (CSVParser parser = _format.parse(stringReader)) {
-        headerMap = parser.getHeaderMap();
+    private CSVRecord getNextRecord() {
+      while (_nextLine != null) {
+        try (Reader reader = new StringReader(_nextLine)) {
+          try (CSVParser csvParser = _format.parse(reader)) {
+            List<CSVRecord> csvRecords = csvParser.getRecords();
+            if (csvRecords == null || csvRecords.isEmpty()) {
+              // Can be thrown on: 1) Empty lines 2) Commented lines
+              throw new NoSuchElementException("Failed to find any records");
+            }
+            // There would be only one record as lines are read one after the 
other
+            CSVRecord csvRecord = csvRecords.get(0);
+
+            // move the pointer to the next line
+            _nextLine = _bufferedReader.readLine();
+            return csvRecord;
+          } catch (Exception e) {
+            // Find the next line that can be parsed
+            _nextLine = _bufferedReader.readLine();
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
       }
+      return null;
     }
-    return headerMap;
-  }
 
-  private void initLineIteratorResources()
-      throws IOException {
-    _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 
32); // 32KB buffer size
-
-    // When header is supplied by the client
-    if (_isHeaderProvided) {
-      if (_skipHeaderRecord) {
-        // When skip header config is set and header is supplied – skip the 
first line from the input file
-        _bufferedReader.readLine();
-        // turn off the property so that it doesn't interfere with further 
parsing
-        _format = _format.builder().setSkipHeaderRecord(false).build();
+    @Override
+    public boolean hasNext() {
+      if (_current == null) {
+        _current = getNextRecord();
       }
-    } else {
-      // read the first line
-      String headerLine = _bufferedReader.readLine();
-      _headerMap = parseLineAsHeader(headerLine);
-      _format = _format.builder()
-          // If header isn't provided, the first line would be set as header 
and the 'skipHeader' property
-          // is set to false.
-          .setSkipHeaderRecord(false)
-          .setHeader(_headerMap.keySet().toArray(new String[0]))
-          .build();
-    }
-    _nextLine = _bufferedReader.readLine();
-  }
 
-  private void resetLineIteratorResources()
-      throws IOException {
-    _nextLine = null;
+      return _current != null;
+    }
 
-    LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, 
_skippedLinesCount);
-    _skippedLinesCount = 0;
+    @Override
+    public CSVRecord next() {
+      CSVRecord next = _current;
+      _current = null;
 
-    // if header is not provided by the client it would be rebuilt. When it's 
provided by the client it's initialized
-    // once in the constructor
-    if (!_isHeaderProvided) {
-      _headerMap.clear();
-    }
+      if (next == null) {
+        // hasNext() wasn't called before
+        next = getNextRecord();
+        if (next == null) {
+          throw new NoSuchElementException("No more CSV records available");
+        }
+      }
 
-    if (_bufferedReader != null) {
-      _bufferedReader.close();
+      return next;
     }
   }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index d245fb33c0..6a1d86a48d 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -324,6 +324,47 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     Assert.assertEquals(1, genericRows.size());
   }
 
+  @Test
+  public void testLineIteratorReadingDataFileWithUnparseableLinesWithRewind()
+      throws URISyntaxException, IOException {
+    URI uri = 
ClassLoader.getSystemResource("dataFileWithUnparseableLines2.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setSkipUnParseableLines(true);
+    final List<GenericRow> genericRows1 = readCSVRecords(dataFile, 
readerConfig, null, true);
+    Assert.assertEquals(3, genericRows1.size());
+
+    // Start reading again; results should be same
+    final List<GenericRow> genericRows2 = readCSVRecords(dataFile, 
readerConfig, null, true);
+    Assert.assertEquals(3, genericRows2.size());
+
+    // Check that the rows are the same
+    for (int i = 0; i < genericRows1.size(); i++) {
+      Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
+    }
+  }
+
+  @Test
+  public void testReadingDataFileWithRewind()
+      throws URISyntaxException, IOException {
+    URI uri = ClassLoader.getSystemResource("dataFileBasic.csv").toURI();
+    File dataFile = new File(uri);
+
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig, 
null, true);
+    Assert.assertEquals(4, genericRows1.size());
+
+    // Start reading again; results should be same
+    List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig, 
null, true);
+    Assert.assertEquals(4, genericRows2.size());
+
+    // Check that the rows are the same
+    for (int i = 0; i < genericRows1.size(); i++) {
+      Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
+    }
+  }
+
   @Test (expectedExceptions = RuntimeException.class)
   public void 
testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines()
       throws URISyntaxException, IOException {
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
new file mode 100644
index 0000000000..c2b0fe3262
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
@@ -0,0 +1,5 @@
+id,name
+"100","John"
+"101","Jane"
+"102","Alice"
+"103","Bob"
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
new file mode 100644
index 0000000000..80e9a736c3
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
@@ -0,0 +1,5 @@
+id,name
+"100","John"s"
+"101","Jane"
+"102","Alice"
+"103","Bob"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to