This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d03241acf9 Re-implement CSV record reader to skip unparseable lines (#14396)
d03241acf9 is described below

commit d03241acf9754bc42f030e43fc5e513ccddf9e9c
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Nov 8 01:07:17 2024 -0800

    Re-implement CSV record reader to skip unparseable lines (#14396)
---
 .../plugin/inputformat/csv/CSVRecordReader.java    | 377 ++++-----
 .../inputformat/csv/CSVRecordReaderConfig.java     |  19 +-
 .../inputformat/csv/CSVRecordReaderTest.java       | 921 ++++++++++-----------
 .../pinot-csv/src/test/resources/dataFileBasic.csv |   8 +-
 .../pinot-csv/src/test/resources/dataFileEmpty.csv |   0
 .../resources/dataFileWithAlternateDelimiter.csv   |   2 +-
 .../src/test/resources/dataFileWithCustomNull.csv  |   5 +
 .../test/resources/dataFileWithMultiLineValues.csv |   4 +
 .../resources/dataFileWithMultipleCombinations.csv |  10 +-
 .../dataFileWithMultipleCombinationsParseable.csv  |  15 -
 ....csv => dataFileWithNoHeaderAndEmptyValues.csv} |   0
 .../test/resources/dataFileWithQuotedHeaders.csv   |   3 -
 ...WithValidHeaders.csv => dataFileWithQuotes.csv} |   0
 .../test/resources/dataFileWithSingleColumn.csv    |   4 +
 .../resources/dataFileWithSpaceAroundHeaders.csv   |   4 -
 .../resources/dataFileWithSurroundingSpaces.csv    |   4 +
 ...es.csv => dataFileWithUnparseableFirstLine.csv} |   0
 .../resources/dataFileWithUnparseableLastLine.csv  |   3 +
 ...leBasic.csv => dataFileWithUnparseableLine.csv} |   3 +-
 .../resources/dataFileWithUnparseableLines2.csv    |   5 -
 .../pinot/spi/data/readers/RecordReader.java       |   5 +
 .../spi/data/readers/RecordReaderFileConfig.java   |   9 +-
 22 files changed, 654 insertions(+), 747 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 636574a19b..68958480f9 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -20,16 +20,12 @@ package org.apache.pinot.plugin.inputformat.csv;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Optional;
 import java.util.Set;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -53,284 +49,199 @@ import org.slf4j.LoggerFactory;
 public class CSVRecordReader implements RecordReader {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(CSVRecordReader.class);
 
-  private File _dataFile;
-  private CSVFormat _format;
-  private CSVParser _parser;
-  private Iterator<CSVRecord> _iterator;
-  private CSVRecordExtractor _recordExtractor;
-  private Map<String, Integer> _headerMap = new HashMap<>();
+  private static final Map<String, CSVFormat> CSV_FORMAT_MAP = new HashMap<>();
 
-  private BufferedReader _bufferedReader;
-  private CSVRecordReaderConfig _config = null;
+  static {
+    for (CSVFormat.Predefined format : CSVFormat.Predefined.values()) {
+      CSV_FORMAT_MAP.put(canonicalize(format.name()), format.getFormat());
+    }
+  }
 
-  public CSVRecordReader() {
+  private static String canonicalize(String format) {
+    return StringUtils.remove(format, '_').toUpperCase();
   }
 
-  private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) {
-    if (config.getFileFormat() == null) {
+  private static CSVFormat getCSVFormat(@Nullable String format) {
+    if (format == null) {
       return CSVFormat.DEFAULT;
     }
-    switch (config.getFileFormat().toUpperCase()) {
-      case "EXCEL":
-        return CSVFormat.EXCEL;
-      case "MYSQL":
-        return CSVFormat.MYSQL;
-      case "RFC4180":
-        return CSVFormat.RFC4180;
-      case "TDF":
-        return CSVFormat.TDF;
-      default:
-        return CSVFormat.DEFAULT;
-    }
-  }
-
-  private static Map<String, Integer> parseHeaderMapFromLine(CSVFormat format, 
String line) {
-    try (StringReader stringReader = new StringReader(line)) {
-      try (CSVParser parser = format.parse(stringReader)) {
-        return parser.getHeaderMap();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to parse header from line: " + line, 
e);
+    CSVFormat csvFormat = CSV_FORMAT_MAP.get(canonicalize(format));
+    if (csvFormat != null) {
+      return csvFormat;
+    } else {
+      LOGGER.warn("Failed to find CSV format for: {}, using DEFAULT format", 
format);
+      return CSVFormat.DEFAULT;
     }
   }
 
-  private static Character getMultiValueDelimiter(CSVRecordReaderConfig 
config) {
-    if (config == null) {
-      return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
-    } else if (config.isMultiValueDelimiterEnabled()) {
-      return config.getMultiValueDelimiter();
-    }
-    return null;
-  }
+  private File _dataFile;
+  private CSVRecordReaderConfig _config;
+  private CSVFormat _format;
+  private BufferedReader _reader;
+  private CSVParser _parser;
+  private List<String> _columns;
+  private Iterator<CSVRecord> _iterator;
+  private CSVRecordExtractor _recordExtractor;
 
-  private static boolean useLineIterator(CSVRecordReaderConfig config) {
-    return config != null && config.isSkipUnParseableLines();
-  }
+  // Following fields are used to handle exceptions in hasNext() method
+  private int _nextLineId;
+  private int _numSkippedLines;
+  private RuntimeException _exceptionInHasNext;
+  private CSVFormat _recoveryFormat;
 
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     _dataFile = dataFile;
-    _config = (CSVRecordReaderConfig) recordReaderConfig;
-    _format = createCSVFormat();
-
-    // If header is provided by the client, use it. Otherwise, parse the 
header from the first line of the file.
-    // Overwrite the format with the header information.
-    
Optional.ofNullable(_config).map(CSVRecordReaderConfig::getHeader).ifPresent(header
 -> {
-      _headerMap = parseHeaderMapFromLine(_format, header);
-      _format = _format.builder().setHeader(_headerMap.keySet().toArray(new 
String[0])).build();
-    });
-
-    validateHeaderWithDelimiter();
-    initIterator();
-    initRecordExtractor(fieldsToRead);
-  }
-
-  private void initRecordExtractor(Set<String> fieldsToRead) {
-    final CSVRecordExtractorConfig recordExtractorConfig = new 
CSVRecordExtractorConfig();
-    
recordExtractorConfig.setMultiValueDelimiter(getMultiValueDelimiter(_config));
-    recordExtractorConfig.setColumnNames(_headerMap.keySet());
-    _recordExtractor = new CSVRecordExtractor();
-    _recordExtractor.init(fieldsToRead, recordExtractorConfig);
-  }
-
-  private CSVFormat createCSVFormat() {
-    if (_config == null) {
-      return 
CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build();
+    _config = recordReaderConfig != null ? (CSVRecordReaderConfig) 
recordReaderConfig : new CSVRecordReaderConfig();
+    _format = getCSVFormat();
+    _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+    _parser = _format.parse(_reader);
+    _columns = _parser.getHeaderNames();
+    _iterator = _parser.iterator();
+    _recordExtractor = getRecordExtractor(fieldsToRead);
+    _nextLineId = (int) _parser.getCurrentLineNumber();
+
+    // Read the first record, and validate if the header uses the configured 
delimiter
+    // (address https://github.com/apache/pinot/issues/7187)
+    boolean hasNext;
+    try {
+      hasNext = _iterator.hasNext();
+    } catch (RuntimeException e) {
+      throw new IOException("Failed to read first record from file: " + 
_dataFile, e);
+    }
+    if (hasNext) {
+      CSVRecord record = _iterator.next();
+      if (record.size() > 1 && _columns.size() <= 1) {
+        throw new IllegalStateException("Header does not contain the 
configured delimiter");
+      }
+      _reader.close();
+      _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+      _parser = _format.parse(_reader);
+      _iterator = _parser.iterator();
     }
+  }
 
-    final CSVFormat.Builder builder = baseCsvFormat(_config).builder()
+  private CSVFormat getCSVFormat() {
+    CSVFormat.Builder builder = getCSVFormat(_config.getFileFormat()).builder()
+        .setHeader()  // Parse header from the file
         .setDelimiter(_config.getDelimiter())
-        .setHeader()
-        .setSkipHeaderRecord(_config.isSkipHeader())
-        .setCommentMarker(_config.getCommentMarker())
-        .setEscape(_config.getEscapeCharacter())
         .setIgnoreEmptyLines(_config.isIgnoreEmptyLines())
         .setIgnoreSurroundingSpaces(_config.isIgnoreSurroundingSpaces())
         .setQuote(_config.getQuoteCharacter());
-
-    
Optional.ofNullable(_config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode);
-    
Optional.ofNullable(_config.getRecordSeparator()).ifPresent(builder::setRecordSeparator);
-    
Optional.ofNullable(_config.getNullStringValue()).ifPresent(builder::setNullString);
-
-    return builder.build();
-  }
-
-  private void initIterator()
-      throws IOException {
-    if (useLineIterator(_config)) {
-      _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 
32); // 32KB buffer size
-      _iterator = new LineIterator();
-    } else {
-      _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
-      _headerMap = _parser.getHeaderMap();
-      _iterator = _parser.iterator();
+    if (_config.getCommentMarker() != null) {
+      builder.setCommentMarker(_config.getCommentMarker());
     }
-  }
-
-  private void validateHeaderWithDelimiter()
-      throws IOException {
-    if (_config == null || _config.getHeader() == null || 
useLineIterator(_config)) {
-      return;
+    if (_config.getEscapeCharacter() != null) {
+      builder.setEscape(_config.getEscapeCharacter());
     }
-    final CSVParser parser = 
_format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
-    final Iterator<CSVRecord> iterator = parser.iterator();
-    if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && 
delimiterNotPresentInHeader(
-        _config.getDelimiter(), _config.getHeader())) {
-      throw new IllegalArgumentException("Configured header does not contain 
the configured delimiter");
+    if (_config.getNullStringValue() != null) {
+      builder.setNullString(_config.getNullStringValue());
     }
+    if (_config.getQuoteMode() != null) {
+      builder.setQuoteMode(QuoteMode.valueOf(_config.getQuoteMode()));
+    }
+    if (_config.getRecordSeparator() != null) {
+      builder.setRecordSeparator(_config.getRecordSeparator());
+    }
+    CSVFormat format = builder.build();
+    String header = _config.getHeader();
+    if (header == null) {
+      return format;
+    }
+    // Parse header using the current format, and set it into the builder
+    try (CSVParser parser = CSVParser.parse(header, format)) {
+      format = builder.setHeader(parser.getHeaderNames().toArray(new 
String[0]))
+          .setSkipHeaderRecord(_config.isSkipHeader()).build();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to parse header from line: " + 
header, e);
+    }
+    return format;
   }
 
-  private boolean recordHasMultipleValues(CSVRecord record) {
-    return record.size() > 1;
-  }
-
-  private boolean delimiterNotPresentInHeader(char delimiter, String 
csvHeader) {
-    return !StringUtils.contains(csvHeader, delimiter);
+  private CSVRecordExtractor getRecordExtractor(@Nullable Set<String> 
fieldsToRead) {
+    CSVRecordExtractorConfig recordExtractorConfig = new 
CSVRecordExtractorConfig();
+    if (_config.isMultiValueDelimiterEnabled()) {
+      
recordExtractorConfig.setMultiValueDelimiter(_config.getMultiValueDelimiter());
+    }
+    recordExtractorConfig.setColumnNames(new HashSet<>(_columns));
+    CSVRecordExtractor recordExtractor = new CSVRecordExtractor();
+    recordExtractor.init(fieldsToRead, recordExtractorConfig);
+    return recordExtractor;
   }
 
-  /**
-   * Returns a copy of the header map that iterates in column order.
-   * <p>
-   * The map keys are column names. The map values are 0-based indices.
-   * </p>
-   * @return a copy of the header map that iterates in column order.
-   */
-  public Map<String, Integer> getCSVHeaderMap() {
-    // if header row is not configured and input file doesn't contain a valid 
header record, the returned map would
-    // contain values from the first row in the input file.
-    return _headerMap;
+  public List<String> getColumns() {
+    return _columns;
   }
 
   @Override
   public boolean hasNext() {
-    return _iterator.hasNext();
+    try {
+      return _iterator.hasNext();
+    } catch (RuntimeException e) {
+      if (_config.isStopOnError()) {
+        LOGGER.warn("Caught exception while reading CSV file: {}, stopping 
processing", _dataFile, e);
+        return false;
+      } else {
+        // Cache exception here and throw it in next() method
+        _exceptionInHasNext = e;
+        return true;
+      }
+    }
   }
 
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
+    if (_exceptionInHasNext != null) {
+      // When hasNext() throws an exception, recreate the reader and skip to 
the next line, then throw the exception
+      // TODO: This is very expensive. Consider marking the reader then reset 
it. The challenge here is that the reader
+      //       offset is not the same as parsed offset, and we need to mark at 
the correct offset.
+      _reader.close();
+      _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+      _numSkippedLines = _nextLineId + 1;
+      for (int i = 0; i < _numSkippedLines; i++) {
+        _reader.readLine();
+      }
+      _nextLineId = _numSkippedLines;
+      // Create recovery format if not created yet. Recovery format has header 
preset, and does not skip header record.
+      if (_recoveryFormat == null) {
+        _recoveryFormat =
+            _format.builder().setHeader(_columns.toArray(new 
String[0])).setSkipHeaderRecord(false).build();
+      }
+      _parser = _recoveryFormat.parse(_reader);
+      _iterator = _parser.iterator();
+
+      RuntimeException exception = _exceptionInHasNext;
+      _exceptionInHasNext = null;
+      LOGGER.warn("Caught exception while reading CSV file: {}, recovering 
from line: {}", _dataFile, _numSkippedLines,
+          exception);
+
+      throw exception;
+    }
+
     CSVRecord record = _iterator.next();
     _recordExtractor.extract(record, reuse);
+    _nextLineId = _numSkippedLines + (int) _parser.getCurrentLineNumber();
     return reuse;
   }
 
   @Override
   public void rewind()
       throws IOException {
-    if (_parser != null && !_parser.isClosed()) {
-      _parser.close();
-    }
-    closeIterator();
-    initIterator();
+    _reader.close();
+    _reader = RecordReaderUtils.getBufferedReader(_dataFile);
+    _parser = _format.parse(_reader);
+    _iterator = _parser.iterator();
+    _nextLineId = (int) _parser.getCurrentLineNumber();
+    _numSkippedLines = 0;
   }
 
   @Override
   public void close()
       throws IOException {
-    closeIterator();
-
-    if (_parser != null && !_parser.isClosed()) {
-      _parser.close();
-    }
-  }
-
-  private void closeIterator()
-      throws IOException {
-    // if header is not provided by the client it would be rebuilt. When it's 
provided by the client it's initialized
-    // once in the constructor
-    if (useLineIterator(_config) && _config.getHeader() == null) {
-      _headerMap.clear();
-    }
-
-    if (_bufferedReader != null) {
-      _bufferedReader.close();
-    }
-  }
-
-  class LineIterator implements Iterator<CSVRecord> {
-    private String _nextLine;
-    private CSVRecord _current;
-
-    public LineIterator() {
-      init();
-    }
-
-    private void init() {
-      try {
-        if (_config.getHeader() != null) {
-          if (_config.isSkipHeader()) {
-            // When skip header config is set and header is supplied – skip 
the first line from the input file
-            _bufferedReader.readLine();
-            // turn off the property so that it doesn't interfere with further 
parsing
-            _format = _format.builder().setSkipHeaderRecord(false).build();
-          }
-        } else {
-          // read the first line
-          String headerLine = _bufferedReader.readLine();
-          _headerMap = parseHeaderMapFromLine(_format, headerLine);
-          // If header isn't provided, the first line would be set as header 
and the 'skipHeader' property
-          // is set to false.
-          _format = _format.builder()
-              .setSkipHeaderRecord(false)
-              .setHeader(_headerMap.keySet().toArray(new String[0]))
-              .build();
-        }
-        _nextLine = _bufferedReader.readLine();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private CSVRecord getNextRecord() {
-      while (_nextLine != null) {
-        try (Reader reader = new StringReader(_nextLine)) {
-          try (CSVParser csvParser = _format.parse(reader)) {
-            List<CSVRecord> csvRecords = csvParser.getRecords();
-            if (csvRecords == null || csvRecords.isEmpty()) {
-              // Can be thrown on: 1) Empty lines 2) Commented lines
-              throw new NoSuchElementException("Failed to find any records");
-            }
-            // There would be only one record as lines are read one after the 
other
-            CSVRecord csvRecord = csvRecords.get(0);
-
-            // move the pointer to the next line
-            _nextLine = _bufferedReader.readLine();
-            return csvRecord;
-          } catch (Exception e) {
-            // Find the next line that can be parsed
-            _nextLine = _bufferedReader.readLine();
-          }
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      return null;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (_current == null) {
-        _current = getNextRecord();
-      }
-
-      return _current != null;
-    }
-
-    @Override
-    public CSVRecord next() {
-      CSVRecord next = _current;
-      _current = null;
-
-      if (next == null) {
-        // hasNext() wasn't called before
-        next = getNextRecord();
-        if (next == null) {
-          throw new NoSuchElementException("No more CSV records available");
-        }
-      }
-
-      return next;
+    if (_reader != null) {
+      _reader.close();
     }
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
index bdc4ae2ed8..6d99f9712e 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderConfig.java
@@ -37,13 +37,14 @@ public class CSVRecordReaderConfig implements RecordReaderConfig {
   private Character _escapeCharacter; // Default is null
   private String _nullStringValue;
   private boolean _skipHeader;
-  private boolean _skipUnParseableLines = false;
   private boolean _ignoreEmptyLines = true;
   private boolean _ignoreSurroundingSpaces = true;
   private Character _quoteCharacter = '"';
   private String _quoteMode;
   private String _recordSeparator;
 
+  // When set to true, the record reader will stop processing the file if it 
encounters an error.
+  private boolean _stopOnError;
 
   public String getFileFormat() {
     return _fileFormat;
@@ -77,14 +78,6 @@ public class CSVRecordReaderConfig implements RecordReaderConfig {
     _multiValueDelimiter = multiValueDelimiter;
   }
 
-  public boolean isSkipUnParseableLines() {
-    return _skipUnParseableLines;
-  }
-
-  public void setSkipUnParseableLines(boolean skipUnParseableLines) {
-    _skipUnParseableLines = skipUnParseableLines;
-  }
-
   public boolean isMultiValueDelimiterEnabled() {
     return _multiValueDelimiterEnabled;
   }
@@ -165,6 +158,14 @@ public class CSVRecordReaderConfig implements RecordReaderConfig {
     _recordSeparator = recordSeparator;
   }
 
+  public boolean isStopOnError() {
+    return _stopOnError;
+  }
+
+  public void setStopOnError(boolean stopOnError) {
+    _stopOnError = stopOnError;
+  }
+
   @Override
   public String toString() {
     return ToStringBuilder.reflectionToString(this, 
ToStringStyle.SHORT_PREFIX_STYLE);
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index 6a1d86a48d..12303657b0 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -21,11 +21,12 @@ package org.apache.pinot.plugin.inputformat.csv;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.commons.lang3.StringUtils;
@@ -33,43 +34,49 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
 
 public class CSVRecordReaderTest extends AbstractRecordReaderTest {
   private static final char CSV_MULTI_VALUE_DELIMITER = '\t';
+  private static final CSVRecordReaderConfig[] NULL_AND_EMPTY_CONFIGS = new 
CSVRecordReaderConfig[]{
+      null, new CSVRecordReaderConfig()
+  };
 
   @Override
   protected RecordReader createRecordReader(File file)
       throws Exception {
-    CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
-    csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-    CSVRecordReader csvRecordReader = new CSVRecordReader();
-    csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig);
-    return csvRecordReader;
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
+    CSVRecordReader recordReader = new CSVRecordReader();
+    recordReader.init(file, _sourceFields, readerConfig);
+    return recordReader;
   }
 
   @Override
-  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
+  protected void writeRecordsToFile(List<Map<String, Object>> records)
       throws Exception {
-    Schema pinotSchema = getPinotSchema();
-    String[] columns = pinotSchema.getColumnNames().toArray(new String[0]);
-    try (FileWriter fileWriter = new FileWriter(_dataFile);
-        CSVPrinter csvPrinter = new CSVPrinter(fileWriter, 
CSVFormat.DEFAULT.withHeader(columns))) {
-
-      for (Map<String, Object> r : recordsToWrite) {
-        Object[] record = new Object[columns.length];
-        for (int i = 0; i < columns.length; i++) {
-          if (pinotSchema.getFieldSpecFor(columns[i]).isSingleValueField()) {
-            record[i] = r.get(columns[i]);
+    Schema schema = getPinotSchema();
+    String[] columns = schema.getColumnNames().toArray(new String[0]);
+    int numColumns = columns.length;
+    try (CSVPrinter csvPrinter = new CSVPrinter(new FileWriter(_dataFile),
+        CSVFormat.Builder.create().setHeader(columns).build())) {
+      for (Map<String, Object> record : records) {
+        Object[] values = new Object[numColumns];
+        for (int i = 0; i < numColumns; i++) {
+          if (schema.getFieldSpecFor(columns[i]).isSingleValueField()) {
+            values[i] = record.get(columns[i]);
           } else {
-            record[i] = StringUtils.join(((List) r.get(columns[i])).toArray(), 
CSV_MULTI_VALUE_DELIMITER);
+            values[i] = StringUtils.join(((List<?>) 
record.get(columns[i])).toArray(), CSV_MULTI_VALUE_DELIMITER);
           }
         }
-        csvPrinter.printRecord(record);
+        csvPrinter.printRecord(values);
       }
     }
   }
@@ -80,590 +87,572 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
   }
 
   @Override
-  protected void checkValue(RecordReader recordReader, List<Map<String, 
Object>> expectedRecordsMap,
+  protected void checkValue(RecordReader recordReader, List<Map<String, 
Object>> expectedRecords,
       List<Object[]> expectedPrimaryKeys)
       throws Exception {
-    for (int i = 0; i < expectedRecordsMap.size(); i++) {
-      Map<String, Object> expectedRecord = expectedRecordsMap.get(i);
+    int numRecords = expectedRecords.size();
+    for (int i = 0; i < numRecords; i++) {
+      Map<String, Object> expectedRecord = expectedRecords.get(i);
       GenericRow actualRecord = recordReader.next();
       for (FieldSpec fieldSpec : _pinotSchema.getAllFieldSpecs()) {
-        String fieldSpecName = fieldSpec.getName();
+        String column = fieldSpec.getName();
         if (fieldSpec.isSingleValueField()) {
-          Assert.assertEquals(actualRecord.getValue(fieldSpecName).toString(),
-              expectedRecord.get(fieldSpecName).toString());
+          assertEquals(actualRecord.getValue(column).toString(), 
expectedRecord.get(column).toString());
         } else {
-          List expectedRecords = (List) expectedRecord.get(fieldSpecName);
-          if (expectedRecords.size() == 1) {
-            
Assert.assertEquals(actualRecord.getValue(fieldSpecName).toString(), 
expectedRecords.get(0).toString());
+          List<?> expectedValues = (List<?>) expectedRecord.get(column);
+          if (expectedValues.size() == 1) {
+            assertEquals(actualRecord.getValue(column).toString(), 
expectedValues.get(0).toString());
           } else {
-            Object[] actualRecords = (Object[]) 
actualRecord.getValue(fieldSpecName);
-            Assert.assertEquals(actualRecords.length, expectedRecords.size());
-            for (int j = 0; j < actualRecords.length; j++) {
-              Assert.assertEquals(actualRecords[j].toString(), 
expectedRecords.get(j).toString());
+            Object[] actualValues = (Object[]) actualRecord.getValue(column);
+            assertEquals(actualValues.length, expectedValues.size());
+            for (int j = 0; j < actualValues.length; j++) {
+              assertEquals(actualValues[j].toString(), 
expectedValues.get(j).toString());
             }
           }
         }
-        PrimaryKey primaryKey = 
actualRecord.getPrimaryKey(getPrimaryKeyColumns());
-        for (int j = 0; j < primaryKey.getValues().length; j++) {
-          Assert.assertEquals(primaryKey.getValues()[j].toString(), 
expectedPrimaryKeys.get(i)[j].toString());
+        Object[] expectedPrimaryKey = expectedPrimaryKeys.get(i);
+        Object[] actualPrimaryKey = 
actualRecord.getPrimaryKey(getPrimaryKeyColumns()).getValues();
+        for (int j = 0; j < actualPrimaryKey.length; j++) {
+          assertEquals(actualPrimaryKey[j].toString(), 
expectedPrimaryKey[j].toString());
         }
       }
     }
-    Assert.assertFalse(recordReader.hasNext());
+    assertFalse(recordReader.hasNext());
   }
 
   @Test
-  public void testInvalidDelimiterInHeader() {
-    // setup
-    CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
-    csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-    
csvRecordReaderConfig.setHeader("col1;col2;col3;col4;col5;col6;col7;col8;col9;col10");
-    csvRecordReaderConfig.setDelimiter(',');
-    CSVRecordReader csvRecordReader = new CSVRecordReader();
-
-    //execute and assert
-    Assert.assertThrows(IllegalArgumentException.class,
-        () -> csvRecordReader.init(_dataFile, null, csvRecordReaderConfig));
+  public void testInvalidDelimiterInHeader()
+      throws IOException {
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
+    
readerConfig.setHeader("col1;col2;col3;col4;col5;col6;col7;col8;col9;col10");
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      assertThrows(IllegalStateException.class, () -> 
recordReader.init(_dataFile, null, readerConfig));
+    }
   }
 
   @Test
   public void testValidDelimiterInHeader()
       throws IOException {
-    //setup
-    CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
-    csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-    
csvRecordReaderConfig.setHeader("col1,col2,col3,col4,col5,col6,col7,col8,col9,col10");
-    csvRecordReaderConfig.setDelimiter(',');
-    CSVRecordReader csvRecordReader = new CSVRecordReader();
-
-    //read all fields
-    //execute and assert
-    csvRecordReader.init(_dataFile, null, csvRecordReaderConfig);
-    Assert.assertEquals(10, csvRecordReader.getCSVHeaderMap().size());
-    Assert.assertTrue(csvRecordReader.getCSVHeaderMap().containsKey("col1"));
-    Assert.assertTrue(csvRecordReader.hasNext());
-  }
-
-  /**
-   * When CSV records contain a single value, then no exception should be 
throw while initialising.
-   * This test requires a different setup from the rest of the tests as it 
requires a single-column
-   * CSV. Therefore, we re-write already generated records into a new file, 
but only the first
-   * column.
-   *
-   * @throws IOException
-   */
-  @Test
-  public void testHeaderDelimiterSingleColumn()
-      throws IOException {
-    //setup
-
-    //create a single value CSV
-    Schema pinotSchema = getPinotSchema();
-    //write only the first column in the schema
-    String column = pinotSchema.getColumnNames().toArray(new String[0])[0];
-    //use a different file name so that other tests aren't affected
-    File file = new File(_tempDir, "data1.csv");
-    try (FileWriter fileWriter = new FileWriter(file);
-        CSVPrinter csvPrinter = new CSVPrinter(fileWriter, 
CSVFormat.DEFAULT.withHeader(column))) {
-      for (Map<String, Object> r : _records) {
-        Object[] record = new Object[1];
-        record[0] = r.get(column);
-        csvPrinter.printRecord(record);
-      }
+    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+    readerConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
+    
readerConfig.setHeader("col1,col2,col3,col4,col5,col6,col7,col8,col9,col10");
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(_dataFile, null, readerConfig);
+      assertEquals(recordReader.getColumns(),
+          List.of("col1", "col2", "col3", "col4", "col5", "col6", "col7", 
"col8", "col9", "col10"));
+      assertTrue(recordReader.hasNext());
     }
-
-    CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
-    csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-    csvRecordReaderConfig.setHeader("col1");
-    CSVRecordReader csvRecordReader = new CSVRecordReader();
-
-    //execute and assert
-    csvRecordReader.init(file, null, csvRecordReaderConfig);
-    Assert.assertTrue(csvRecordReader.hasNext());
   }
 
   @Test
-  public void testNullValueString()
-      throws IOException {
-    //setup
-    String nullString = "NULL";
-    //create a single value CSV
-    Schema pinotSchema = getPinotSchema();
-    //write only the first column in the schema
-    String column = pinotSchema.getColumnNames().toArray(new String[0])[0];
-    //use a different file name so that other tests aren't affected
-    File file = new File(_tempDir, "data1.csv");
-    try (FileWriter fileWriter = new FileWriter(file);
-        CSVPrinter csvPrinter = new CSVPrinter(fileWriter,
-            CSVFormat.DEFAULT.withHeader("col1", "col2", 
"col3").withNullString(nullString))) {
-      for (Map<String, Object> r : _records) {
-        Object[] record = new Object[3];
-        record[0] = r.get(column);
-        csvPrinter.printRecord(record);
-      }
+  public void testReadingDataFileBasic()
+      throws IOException {
+    File dataFile = getDataFile("dataFileBasic.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("id", "100", "name", "John"),
+          createMap("id", "101", "name", "Jane"),
+          createMap("id", "102", "name", "Alice"),
+          createMap("id", "103", "name", "Bob")
+      ));
     }
-
-    CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
-    csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-    csvRecordReaderConfig.setHeader("col1,col2,col3");
-    csvRecordReaderConfig.setNullStringValue(nullString);
-    CSVRecordReader csvRecordReader = new CSVRecordReader();
-
-    //execute and assert
-    csvRecordReader.init(file, null, csvRecordReaderConfig);
-    Assert.assertTrue(csvRecordReader.hasNext());
-    csvRecordReader.next();
-
-    GenericRow row = csvRecordReader.next();
-    Assert.assertNotNull(row.getValue("col1"));
-    Assert.assertNull(row.getValue("col2"));
-    Assert.assertNull(row.getValue("col3"));
   }
 
   @Test
-  public void testReadingDataFileWithCommentedLines()
-      throws IOException, URISyntaxException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithCommentedLines.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithSingleColumn()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithSingleColumn.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("name", "John"),
+          createMap("name", "Jane"),
+          createMap("name", "Jen")
+      ));
+    }
 
-    // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setCommentMarker('#');
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
+    readerConfig.setHeader("firstName,lastName,id");
+    readerConfig.setSkipHeader(true);
+    validate(dataFile, readerConfig, List.of(
+        createMap("firstName", "John", "lastName", null, "id", null),
+        createMap("firstName", "Jane", "lastName", null, "id", null),
+        createMap("firstName", "Jen", "lastName", null, "id", null)
+    ));
   }
 
   @Test
-  public void testReadingDataFileWithEmptyLines()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithEmptyLines.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithInvalidHeader()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithInvalidHeader.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      try (CSVRecordReader recordReader = new CSVRecordReader()) {
+        assertThrows(IllegalStateException.class, () -> 
recordReader.init(dataFile, null, readerConfig));
+      }
+    }
 
-    // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(5, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(5, genericRows.size());
+    readerConfig.setHeader("firstName,lastName,id");
+    readerConfig.setSkipHeader(true);
+    validate(dataFile, readerConfig, List.of(
+        createMap("firstName", "John", "lastName", "Doe", "id", "100"),
+        createMap("firstName", "Jane", "lastName", "Doe", "id", "101"),
+        createMap("firstName", "Jen", "lastName", "Doe", "id", "102")
+    ));
   }
 
   @Test
-  public void testReadingDataFileWithEscapedQuotes()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithEscapedQuotes.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
+  public void testReadingDataFileWithAlternateDelimiter()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithAlternateDelimiter.csv");
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(2, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(2, genericRows.size());
+    readerConfig.setDelimiter('|');
+    validate(dataFile, readerConfig, List.of(
+        createMap("id", "100", "firstName", "John", "lastName", "Doe"),
+        createMap("id", "101", "firstName", "Jane", "lastName", "Doe"),
+        createMap("id", "102", "firstName", "Jen", "lastName", "Doe")
+    ));
   }
 
   @Test
-  public void testReadingDataFileWithNoHeader()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithNoHeader.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithSurroundingSpaces()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithSurroundingSpaces.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("firstName", "John", "lastName", "Doe", "id", "100"),
+          createMap("firstName", "Jane", "lastName", "Doe", "id", "101"),
+          createMap("firstName", "Jen", "lastName", "Doe", "id", "102")
+      ));
+    }
 
-    // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setHeader("id,name");
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
+    readerConfig.setIgnoreSurroundingSpaces(false);
+    validate(dataFile, readerConfig, List.of(
+        createMap(" firstName ", "John  ", " lastName ", " Doe", " id", "100"),
+        createMap(" firstName ", "Jane", " lastName ", " Doe", " id", "  101"),
+        createMap(" firstName ", "Jen", " lastName ", "Doe ", " id", "102")
+    ));
   }
 
   @Test
-  public void testReadingDataFileWithQuotedHeaders()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithQuotedHeaders.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
-    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(2, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(2, genericRows.size());
+  public void testReadingDataFileWithQuotes()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithQuotes.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("key", "key00", "num0", "12.3", "num1", "8.42"),
+          createMap("key", "key01", "num0", null, "num1", "7.1"),
+          createMap("key", "key02", "num0", null, "num1", "16.81"),
+          createMap("key", "key03", "num0", null, "num1", "7.12")
+      ));
+    }
   }
 
   @Test
-  public void testLineIteratorReadingDataFileWithUnparseableLines()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
-    File dataFile = new File(uri);
-
+  public void testReadingDataFileWithCustomNull()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithCustomNull.csv");
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(1, genericRows.size());
+    readerConfig.setNullStringValue("NULL");
+    validate(dataFile, readerConfig, List.of(
+        createMap("id", "100", "name", null),
+        createMap("id", null, "name", "Jane"),
+        createMap("id", null, "name", null),
+        createMap("id", null, "name", null)
+    ));
   }
 
   @Test
-  public void testLineIteratorReadingDataFileWithUnparseableLinesWithRewind()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithUnparseableLines2.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithCommentedLines()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithCommentedLines.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      // Verify first row
+      validate(dataFile, readerConfig, 5, List.of(createMap("id", "# ignore 
line#1", "name", null)));
+    }
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    final List<GenericRow> genericRows1 = readCSVRecords(dataFile, 
readerConfig, null, true);
-    Assert.assertEquals(3, genericRows1.size());
-
-    // Start reading again; results should be same
-    final List<GenericRow> genericRows2 = readCSVRecords(dataFile, 
readerConfig, null, true);
-    Assert.assertEquals(3, genericRows2.size());
-
-    // Check that the rows are the same
-    for (int i = 0; i < genericRows1.size(); i++) {
-      Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
-    }
+    readerConfig.setCommentMarker('#');
+    validate(dataFile, readerConfig, List.of(
+        createMap("id", "100", "name", "Jane"),
+        createMap("id", "101", "name", "John"),
+        createMap("id", "102", "name", "Sam")
+    ));
   }
 
   @Test
-  public void testReadingDataFileWithRewind()
-      throws URISyntaxException, IOException {
-    URI uri = ClassLoader.getSystemResource("dataFileBasic.csv").toURI();
-    File dataFile = new File(uri);
-
-    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig, 
null, true);
-    Assert.assertEquals(4, genericRows1.size());
-
-    // Start reading again; results should be same
-    List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig, 
null, true);
-    Assert.assertEquals(4, genericRows2.size());
-
-    // Check that the rows are the same
-    for (int i = 0; i < genericRows1.size(); i++) {
-      Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
+  public void testReadingDataFileWithEmptyLines()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithEmptyLines.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, 5);
     }
-  }
-
-  @Test (expectedExceptions = RuntimeException.class)
-  public void 
testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithUnparseableLines.csv").toURI();
-    File dataFile = new File(uri);
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readCSVRecords(dataFile, readerConfig, null, false);
+    readerConfig.setIgnoreEmptyLines(false);
+    validate(dataFile, readerConfig, 8);
   }
 
   @Test
-  public void testLineIteratorReadingDataFileWithMultipleCombinations()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithEscapedQuotes()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithEscapedQuotes.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("\\\"id\\\"", "\\\"100\\\"", "\\\"name\\\"", 
"\\\"Jane\\\""),
+          createMap("\\\"id\\\"", "\\\"101\\\"", "\\\"name\\\"", 
"\\\"John\\\"")
+      ));
+    }
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setCommentMarker('#');
-    readerConfig.setIgnoreEmptyLines(true);
-
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(7, genericRows.size());
+    readerConfig.setEscapeCharacter('\\');
+    validate(dataFile, readerConfig, List.of(
+        createMap("\"id\"", "\"100\"", "\"name\"", "\"Jane\""),
+        createMap("\"id\"", "\"101\"", "\"name\"", "\"John\"")
+    ));
   }
 
   @Test
-  public void testDefaultCsvReaderReadingDataFileWithMultipleCombinations()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithNoHeader()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithNoHeader.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("100", "101", "Jane", "John"),
+          createMap("100", "102", "Jane", "Sam")
+      ));
+    }
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setCommentMarker('#');
-    readerConfig.setIgnoreEmptyLines(true);
-
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new 
GenericRow(), false);
-    Assert.assertEquals(7, genericRows.size());
+    readerConfig.setHeader("id,name");
+    validate(dataFile, readerConfig, List.of(
+        createMap("id", "100", "name", "Jane"),
+        createMap("id", "101", "name", "John"),
+        createMap("id", "102", "name", "Sam")
+    ));
   }
 
   @Test
-  public void testLineIteratorRewindMethod()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinations.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithNoHeaderAndEmptyValues()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithNoHeaderAndEmptyValues.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("key00", "key01", "12.3", null, "8.42", "7.1"),
+          createMap("key00", "key02", "12.3", null, "8.42", "16.81"),
+          createMap("key00", "key03", "12.3", null, "8.42", "7.12")
+      ));
+    }
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setCommentMarker('#');
-    readerConfig.setIgnoreEmptyLines(true);
-    readCSVRecords(dataFile, readerConfig, null, true);
-
-    // Start reading again; results should be same
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new 
GenericRow(), false);
-    Assert.assertEquals(7, genericRows.size());
+    readerConfig.setHeader("key,num0,num1");
+    validate(dataFile, readerConfig, List.of(
+        createMap("key", "key00", "num0", "12.3", "num1", "8.42"),
+        createMap("key", "key01", "num0", null, "num1", "7.1"),
+        createMap("key", "key02", "num0", null, "num1", "16.81"),
+        createMap("key", "key03", "num0", null, "num1", "7.12")
+    ));
   }
 
   @Test
-  public void testDefaultCsvReaderRewindMethod()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithMultipleCombinationsParseable.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileWithNoRecords()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithNoRecords.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, 0);
+    }
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setCommentMarker('#');
-    readerConfig.setIgnoreEmptyLines(true);
-    readCSVRecords(dataFile, readerConfig, null, true);
-
-    // Start reading again; results should be same
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(7, genericRows.size());
+    readerConfig.setHeader("id,name");
+    readerConfig.setSkipHeader(true);
+    validate(dataFile, readerConfig, 0);
   }
 
   @Test
-  public void testReadingDataFileWithInvalidHeader()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
-    File dataFile = new File(uri);
+  public void testReadingDataFileEmpty()
+      throws IOException {
+    File dataFile = getDataFile("dataFileEmpty.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, 0);
+    }
 
-    // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setHeader("firstName,lastName,id");
-    readerConfig.setSkipHeader(true);
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
+    readerConfig.setHeader("id,name");
+    validate(dataFile, readerConfig, 0);
   }
 
   @Test
-  public void testReadingDataFileWithAlternateDelimiter()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithAlternateDelimiter.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
-    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setDelimiter('|');
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
+  public void testReadingDataFileWithMultiLineValues()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithMultiLineValues.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      validate(dataFile, readerConfig, List.of(
+          createMap("id", "100", "name", "John\n101,Jane"),
+          createMap("id", "102", "name", "Alice")
+      ));
+    }
   }
 
   @Test
-  public void testReadingDataFileWithSpaceAroundHeaderFields()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithSpaceAroundHeaders.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
-    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setIgnoreSurroundingSpaces(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
-    validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
-
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
-    validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
+  public void testReadingDataFileWithUnparseableFirstLine()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithUnparseableFirstLine.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      try (CSVRecordReader recordReader = new CSVRecordReader()) {
+        assertThrows(IOException.class, () -> recordReader.init(dataFile, 
null, readerConfig));
+      }
+    }
   }
 
   @Test
-  public void testReadingDataFileWithSpaceAroundHeaderAreRetained()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithSpaceAroundHeaders.csv").toURI();
-    File dataFile = new File(uri);
+  public void testLineIteratorReadingDataFileWithUnparseableLine()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithUnparseableLine.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      try (CSVRecordReader recordReader = new CSVRecordReader()) {
+        recordReader.init(dataFile, null, readerConfig);
+        testUnparseableLine(recordReader);
+        recordReader.rewind();
+        testUnparseableLine(recordReader);
+      }
+    }
 
-    // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setIgnoreSurroundingSpaces(false);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
-    validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
+    readerConfig.setStopOnError(true);
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      testUnparseableLineStopOnError(recordReader);
+      recordReader.rewind();
+      testUnparseableLineStopOnError(recordReader);
+    }
+  }
 
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
-    validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
+  private void testUnparseableLine(CSVRecordReader recordReader)
+      throws IOException {
+    // First line is parseable
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
+    // Second line is unparseable, should throw exception when next() is 
called, and being skipped
+    assertTrue(recordReader.hasNext());
+    assertThrows(UncheckedIOException.class, recordReader::next);
+    // Third line is parseable
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"102", "name", "Alice"));
+    // 3 lines in total
+    assertFalse(recordReader.hasNext());
+  }
+
+  private void testUnparseableLineStopOnError(CSVRecordReader recordReader)
+      throws IOException {
+    // First line is parseable
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
+    // Second line is unparseable, stop here
+    assertFalse(recordReader.hasNext());
   }
 
   @Test
-  public void testRewindMethodAndSkipHeader()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithInvalidHeader.csv").toURI();
-    File dataFile = new File(uri);
+  public void testLineIteratorReadingDataFileWithUnparseableLastLine()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithUnparseableLastLine.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      try (CSVRecordReader recordReader = new CSVRecordReader()) {
+        recordReader.init(dataFile, null, readerConfig);
+        testUnparseableLastLine(recordReader);
+        recordReader.rewind();
+        testUnparseableLastLine(recordReader);
+      }
+    }
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setHeader("id,name");
-    readerConfig.setSkipHeader(true);
-    readCSVRecords(dataFile, readerConfig, new GenericRow(), true);
-
-    // Start reading again; results should be same
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(3, genericRows.size());
+    readerConfig.setStopOnError(true);
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      testUnparseableLastLineStopOnError(recordReader);
+      recordReader.rewind();
+      testUnparseableLastLineStopOnError(recordReader);
+    }
+  }
 
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    readCSVRecords(dataFile, readerConfig, null, true);
+  private void testUnparseableLastLine(CSVRecordReader recordReader)
+      throws IOException {
+    // First line is parseable
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
+    // Second line is unparseable, should throw exception when next() is 
called, and be skipped
+    assertTrue(recordReader.hasNext());
+    assertThrows(UncheckedIOException.class, recordReader::next);
+    // 2 lines in total
+    assertFalse(recordReader.hasNext());
+  }
 
-    // Start reading again; results should be same
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(3, genericRows.size());
+  private void testUnparseableLastLineStopOnError(CSVRecordReader recordReader)
+      throws IOException {
+    // First line is parseable
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
+    // Second line is unparseable, stop here
+    assertFalse(recordReader.hasNext());
   }
 
   @Test
   public void testReadingDataFileWithPartialLastRow()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithPartialLastRow.csv").toURI();
-    File dataFile = new File(uri);
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithPartialLastRow.csv");
+    for (CSVRecordReaderConfig readerConfig : NULL_AND_EMPTY_CONFIGS) {
+      try (CSVRecordReader recordReader = new CSVRecordReader()) {
+        recordReader.init(dataFile, null, readerConfig);
+        testPartialLastRow(recordReader);
+        recordReader.rewind();
+        testPartialLastRow(recordReader);
+      }
+    }
 
-    // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(2, genericRows.size());
-
-    // Note: The default CSVRecordReader cannot handle unparseable rows
+    readerConfig.setStopOnError(true);
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      testPartialLastRowStopOnError(recordReader);
+      recordReader.rewind();
+      testPartialLastRowStopOnError(recordReader);
+    }
   }
 
-  @Test
-  public void testReadingDataFileWithNoRecords()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithNoRecords.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
-    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(0, genericRows.size());
-
-    // Note: The default CSVRecordReader cannot handle unparseable rows
+  private void testPartialLastRow(CSVRecordReader recordReader)
+      throws IOException {
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(),
+        createMap("id", "100", "firstName", "jane", "lastName", "doe", 
"appVersion", "1.0.0", "active", "yes"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(),
+        createMap("id", "101", "firstName", "john", "lastName", "doe", 
"appVersion", "1.0.1", "active", "yes"));
+    assertTrue(recordReader.hasNext());
+    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertFalse(recordReader.hasNext());
+  }
+
+  private void testPartialLastRowStopOnError(CSVRecordReader recordReader)
+      throws IOException {
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(),
+        createMap("id", "100", "firstName", "jane", "lastName", "doe", 
"appVersion", "1.0.0", "active", "yes"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(),
+        createMap("id", "101", "firstName", "john", "lastName", "doe", 
"appVersion", "1.0.1", "active", "yes"));
+    assertFalse(recordReader.hasNext());
   }
 
   @Test
-  public void testReadingDataFileWithNoHeaderAndDataRecordsWithEmptyValues()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithNoHeader2.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
+  public void testLineIteratorReadingDataFileWithMultipleCombinations()
+      throws IOException {
+    File dataFile = getDataFile("dataFileWithMultipleCombinations.csv");
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    readerConfig.setHeader("key,num0,num1");
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(4, genericRows.size());
+    readerConfig.setCommentMarker('#');
+    readerConfig.setEscapeCharacter('\\');
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      testCombinations(recordReader);
+      recordReader.rewind();
+      testCombinations(recordReader);
+    }
 
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(4, genericRows.size());
+    readerConfig.setStopOnError(true);
+    try (CSVRecordReader recordReader = new CSVRecordReader()) {
+      recordReader.init(dataFile, null, readerConfig);
+      testCombinationsStopOnError(recordReader);
+      recordReader.rewind();
+      testCombinationsStopOnError(recordReader);
+    }
   }
 
-  @Test
-  public void testReadingDataFileWithValidHeaders()
-      throws URISyntaxException, IOException {
-    URI uri = 
ClassLoader.getSystemResource("dataFileWithValidHeaders.csv").toURI();
-    File dataFile = new File(uri);
-
-    // test using line iterator
-    CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readerConfig.setSkipUnParseableLines(true);
-    // No explicit header is set and attempt to skip the header should be 
ignored. 1st line would be treated as the
-    // header line.
-    readerConfig.setSkipHeader(false);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
-    Assert.assertEquals(4, genericRows.size());
+  private void testCombinations(CSVRecordReader recordReader)
+      throws IOException {
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"101", "name", "Jane"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"102", "name", "Jerry"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"103", "name", "Suzanne"));
+    // NOTE: Here we need to skip twice because the first line is a comment 
line
+    assertTrue(recordReader.hasNext());
+    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertTrue(recordReader.hasNext());
+    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"105", "name", "Zack\nZack"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"\"106\"", "name", "\"Ze\""));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"107", "name", "Zu"));
+    assertFalse(recordReader.hasNext());
+  }
+
+  private void testCombinationsStopOnError(CSVRecordReader recordReader)
+      throws IOException {
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"101", "name", "Jane"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"102", "name", "Jerry"));
+    assertTrue(recordReader.hasNext());
+    assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"103", "name", "Suzanne"));
+    assertFalse(recordReader.hasNext());
+  }
 
-    // test using default CSVRecordReader
-    readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
-    Assert.assertEquals(4, genericRows.size());
+  private File getDataFile(String fileName) {
+    return new File(ClassLoader.getSystemResource(fileName).getFile());
   }
 
-  private List<GenericRow> readCSVRecords(File dataFile,
-      CSVRecordReaderConfig readerConfig, GenericRow genericRow, boolean 
rewind)
+  private void validate(File dataFile, @Nullable CSVRecordReaderConfig 
readerConfig, int expectedNumRows,
+      @Nullable List<Map<String, Object>> expectedRows)
       throws IOException {
-    List<GenericRow> genericRows = new ArrayList<>();
+    List<GenericRow> genericRows = new ArrayList<>(expectedNumRows);
 
     try (CSVRecordReader recordReader = new CSVRecordReader()) {
       recordReader.init(dataFile, null, readerConfig);
-      GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
-        if (genericRow != null) {
-          recordReader.next(reuse);
-          genericRows.add(reuse);
-        } else {
-          GenericRow nextRow = recordReader.next();
-          genericRows.add(nextRow);
-        }
+        genericRows.add(recordReader.next());
       }
+      assertEquals(genericRows.size(), expectedNumRows);
 
-      if (rewind) {
-        // rewind the reader after reading all the lines
-        recordReader.rewind();
+      // Rewind the reader and read again
+      recordReader.rewind();
+      for (GenericRow row : genericRows) {
+        GenericRow genericRow = recordReader.next();
+        assertEquals(genericRow, row);
       }
+      assertFalse(recordReader.hasNext());
     }
-    return genericRows;
-  }
 
-  private void validateSpaceAroundHeadersAreTrimmed(File dataFile, 
CSVRecordReaderConfig readerConfig)
-      throws IOException {
-    try (CSVRecordReader recordReader = new CSVRecordReader()) {
-      recordReader.init(dataFile, null, readerConfig);
-      Map<String, Integer> headerMap = recordReader.getCSVHeaderMap();
-      Assert.assertEquals(3, headerMap.size());
-      List<String> headers = List.of("firstName", "lastName", "id");
-      for (String header : headers) {
-        // surrounding spaces in headers are trimmed
-        Assert.assertTrue(headerMap.containsKey(header));
+    if (expectedRows != null) {
+      int rowId = 0;
+      for (Map<String, Object> expectedRow : expectedRows) {
+        GenericRow genericRow = genericRows.get(rowId++);
+        assertEquals(genericRow.getFieldToValueMap(), expectedRow);
       }
     }
   }
 
-  private void validateSpaceAroundHeadersAreRetained(File dataFile, 
CSVRecordReaderConfig readerConfig)
+  private void validate(File dataFile, @Nullable CSVRecordReaderConfig 
readerConfig, int expectedNumRows)
       throws IOException {
-    try (CSVRecordReader recordReader = new CSVRecordReader()) {
-      recordReader.init(dataFile, null, readerConfig);
-      Map<String, Integer> headerMap = recordReader.getCSVHeaderMap();
-      Assert.assertEquals(3, headerMap.size());
-      List<String> headers = List.of(" firstName ", " lastName ", " id");
-      for (String header : headers) {
-        // surrounding spaces in headers are trimmed
-        Assert.assertTrue(headerMap.containsKey(header));
-      }
+    validate(dataFile, readerConfig, expectedNumRows, null);
+  }
+
+  private void validate(File dataFile, @Nullable CSVRecordReaderConfig 
readerConfig,
+      List<Map<String, Object>> expectedRows)
+      throws IOException {
+    validate(dataFile, readerConfig, expectedRows.size(), expectedRows);
+  }
+
+  private static Map<String, Object> createMap(String... keyValues) {
+    Map<String, Object> map = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      map.put(keyValues[i], keyValues[i + 1]);
     }
+    return map;
   }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
index c2b0fe3262..c2b7445426 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
@@ -1,5 +1,5 @@
 id,name
-"100","John"
-"101","Jane"
-"102","Alice"
-"103","Bob"
+100,John
+101,Jane
+102,Alice
+103,Bob
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileEmpty.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileEmpty.csv
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
index 09d3a2ce2c..4b4c2fffe9 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithAlternateDelimiter.csv
@@ -1,4 +1,4 @@
-id|fisrtName|lastName
+id|firstName|lastName
 100|John|Doe
 101|Jane|Doe
 102|Jen|Doe
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv
new file mode 100644
index 0000000000..af119fc1a1
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithCustomNull.csv
@@ -0,0 +1,5 @@
+id,name
+100,NULL
+,Jane
+NULL,NULL
+,
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv
new file mode 100644
index 0000000000..927983fa98
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultiLineValues.csv
@@ -0,0 +1,4 @@
+id,name
+100,"John
+101,Jane"
+102,Alice
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
index 5f173f4b6c..8590793e1e 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinations.csv
@@ -10,8 +10,14 @@ id,name
 102,Jerry
 
 103,Suzanne
-# below line is  unparseable by the commons-csv library
+# below line is unparseable by the commons-csv library
 "104","Yu"s"
-"105","Zack"
+
+# below line is multi-line value
+"105","Zack
+Zack"
+
+# below line is escaped quotes
 \"106\",\"Ze\"
+
 107,Zu
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
deleted file mode 100644
index d804f04101..0000000000
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithMultipleCombinationsParseable.csv
+++ /dev/null
@@ -1,15 +0,0 @@
-id,name
-
-
-100,John
-# ignore line 1
-
-# ignore line 2
-
-101,Jane
-102,Jerry
-
-103,Suzanne
-"105","Zack"
-\"106\",\"Ze\"
-107,Zu
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader2.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeaderAndEmptyValues.csv
similarity index 100%
rename from 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeader2.csv
rename to 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithNoHeaderAndEmptyValues.csv
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
deleted file mode 100644
index ec7033d1d4..0000000000
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotedHeaders.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-"id","name"
-"100","Jane"
-"101","John"
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithValidHeaders.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotes.csv
similarity index 100%
rename from 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithValidHeaders.csv
rename to 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithQuotes.csv
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv
new file mode 100644
index 0000000000..856771f621
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSingleColumn.csv
@@ -0,0 +1,4 @@
+name
+John
+Jane
+Jen
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv
deleted file mode 100644
index c0f140a69c..0000000000
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSpaceAroundHeaders.csv
+++ /dev/null
@@ -1,4 +0,0 @@
- firstName , lastName , id
-John,Doe,100
-Jane,Doe,101
-Jen,Doe,102
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv
new file mode 100644
index 0000000000..40678051c3
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithSurroundingSpaces.csv
@@ -0,0 +1,4 @@
+ firstName , lastName , id
+John  , Doe,100
+Jane, Doe,  101
+Jen,Doe ,102
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableFirstLine.csv
similarity index 100%
rename from 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines.csv
rename to 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableFirstLine.csv
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv
new file mode 100644
index 0000000000..ef9b5942fb
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLastLine.csv
@@ -0,0 +1,3 @@
+id,name
+"100","John"
+"101","Jane"s"
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv
similarity index 58%
copy from 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
copy to 
pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv
index c2b0fe3262..e0a711badf 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLine.csv
@@ -1,5 +1,4 @@
 id,name
 "100","John"
-"101","Jane"
+"101","Jane"s"
 "102","Alice"
-"103","Bob"
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
deleted file mode 100644
index 80e9a736c3..0000000000
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
+++ /dev/null
@@ -1,5 +0,0 @@
-id,name
-"100","John"s"
-"101","Jane"
-"102","Alice"
-"103","Bob"
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 020b2b3135..db3a455da3 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -46,11 +46,14 @@ public interface RecordReader extends Closeable, 
Serializable {
 
   /**
    * Return <code>true</code> if more records remain to be read.
+   * <p>This method should not throw exceptions. Caller is not responsible for 
handling exceptions from this method.
    */
   boolean hasNext();
 
   /**
    * Get the next record.
+   * <p>This method should be called only if {@link #hasNext()} returns 
<code>true</code>. Caller is responsible for
+   * handling exceptions from this method and skipping the row if the user wants to 
continue reading the remaining rows.
    */
   default GenericRow next()
       throws IOException {
@@ -60,6 +63,8 @@ public interface RecordReader extends Closeable, Serializable 
{
   /**
    * Get the next record. Re-use the given row to reduce garbage.
    * <p>The passed in row should be cleared before calling this method.
+   * <p>This method should be called only if {@link #hasNext()} returns 
<code>true</code>. Caller is responsible for
+   * handling exceptions from this method and skipping the row if the user wants to 
continue reading the remaining rows.
    *
    * TODO: Consider clearing the row within the record reader to simplify the 
caller
    */
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index e7566cb0ff..cd8c600399 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -99,10 +99,13 @@ public class RecordReaderFileConfig {
 
   // Return true if RecordReader is done processing.
   public boolean isRecordReaderDone() {
-    if (_isRecordReaderInitialized) {
-      return !_recordReader.hasNext();
+    if (!_isRecordReaderInitialized) {
+      return false;
+    }
+    if (_isRecordReaderClosed) {
+      return true;
     }
-    return false;
+    return !_recordReader.hasNext();
   }
 
   // For testing purposes only.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to