KKcorps commented on code in PR #13913:
URL: https://github.com/apache/pinot/pull/13913#discussion_r1743338028


##########
pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java:
##########
@@ -202,158 +207,139 @@ public Map<String, Integer> getCSVHeaderMap() {
 
   @Override
   public boolean hasNext() {
-    if (_useLineIterator) {
-      // When line iterator is used, the call to this method won't throw an 
exception. The default and the only iterator
-      // from commons-csv library can throw an exception upon calling the 
hasNext() method. The line iterator overcomes
-      // this limitation.
-      return readNextRecord();
-    }
     return _iterator.hasNext();
   }
 
   @Override
   public GenericRow next()
       throws IOException {
-    if (_useLineIterator) {
-      return _nextRecord;
-    } else {
-      return next(new GenericRow());
-    }
+    return next(new GenericRow());
   }
 
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    if (_useLineIterator) {
-      reuse.init(_nextRecord);
-    } else {
-      CSVRecord record = _iterator.next();
-      _recordExtractor.extract(record, reuse);
-    }
+    CSVRecord record = _iterator.next();
+    _recordExtractor.extract(record, reuse);
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    if (_useLineIterator) {
-      resetLineIteratorResources();
-    }
-
     if (_parser != null && !_parser.isClosed()) {
       _parser.close();
     }
-
-    init();
+    closeIterator();
+    initIterator();
   }
 
   @Override
   public void close()
       throws IOException {
-    if (_useLineIterator) {
-      resetLineIteratorResources();
-    }
+    closeIterator();
 
     if (_parser != null && !_parser.isClosed()) {
       _parser.close();
     }
   }
 
-  private boolean readNextRecord() {
-    try {
-      _nextRecord = null;
-      GenericRow genericRow = new GenericRow();
-      readNextLine(genericRow);
-      _nextRecord = genericRow;
-    } catch (Exception e) {
-      LOGGER.info("Error parsing next record.", e);
+  private void closeIterator()
+      throws IOException {
+    // if header is not provided by the client it would be rebuilt. When it's 
provided by the client it's initialized
+    // once in the constructor
+    if (useLineIterator(_config) && _config.getHeader() == null) {
+      _headerMap.clear();
+    }
+
+    if (_bufferedReader != null) {
+      _bufferedReader.close();
     }
-    return _nextRecord != null;
   }
 
-  private void readNextLine(GenericRow reuse)
-      throws IOException {
-    while (_nextLine != null) {
-      try (Reader reader = new StringReader(_nextLine)) {
-        try (CSVParser csvParser = _format.parse(reader)) {
-          List<CSVRecord> csvRecords = csvParser.getRecords();
-          if (csvRecords != null && csvRecords.size() > 0) {
-            // There would be only one record as lines are read one after the 
other
-            CSVRecord record = csvRecords.get(0);
-            _recordExtractor.extract(record, reuse);
-            break;
-          } else {
-            // Can be thrown on: 1) Empty lines 2) Commented lines
-            throw new NoSuchElementException("Failed to find any records");
+  class LineIterator implements Iterator<CSVRecord> {
+    private String _nextLine;
+    private CSVRecord _current;
+
+    public LineIterator() {
+      init();
+    }
+
+    private void init() {
+      try {
+        if (_config.getHeader() != null) {
+          if (_config.isSkipHeader()) {
+            // When skip header config is set and header is supplied – skip 
the first line from the input file
+            _bufferedReader.readLine();
+            // turn off the property so that it doesn't interfere with further 
parsing
+            _format = _format.builder().setSkipHeaderRecord(false).build();
           }
-        } catch (Exception e) {
-          _skippedLinesCount++;
-          LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, 
_dataFile, e);
-          // Find the next line that can be parsed
-          _nextLine = _bufferedReader.readLine();
+        } else {
+          // read the first line
+          String headerLine = _bufferedReader.readLine();
+          _headerMap = parseHeaderMapFromLine(_format, headerLine);
+          // If header isn't provided, the first line would be set as header 
and the 'skipHeader' property
+          // is set to false.
+          _format = _format.builder()
+              .setSkipHeaderRecord(false)
+              .setHeader(_headerMap.keySet().toArray(new String[0]))
+              .build();
         }
+        _nextLine = _bufferedReader.readLine();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
-    if (_nextLine != null) {
-      // Advance the pointer to the next line for future reading
-      _nextLine = _bufferedReader.readLine();
-    } else {
-      throw new RuntimeException("No more parseable lines. Line iterator 
reached end of file.");
-    }
-  }
 
-  private Map<String, Integer> parseLineAsHeader(String line)
-      throws IOException {
-    Map<String, Integer> headerMap;
-    try (StringReader stringReader = new StringReader(line)) {
-      try (CSVParser parser = _format.parse(stringReader)) {
-        headerMap = parser.getHeaderMap();
+    private CSVRecord getNextRecord() {

Review Comment:
   Should we optimise this method so that it does not initialise a new
   StringReader every time next() is called, or is that out of scope?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to