This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c3e348693f CSVRecordReader. Refactor init and line iterator parsing logic (#13913) c3e348693f is described below commit c3e348693fe4b27a01e088be96160904cbb66ab5 Author: Suvodeep Pyne <suvodeep-p...@users.noreply.github.com> AuthorDate: Wed Sep 4 10:35:13 2024 -0700 CSVRecordReader. Refactor init and line iterator parsing logic (#13913) --- .../plugin/inputformat/csv/CSVRecordReader.java | 389 ++++++++++----------- .../inputformat/csv/CSVRecordReaderTest.java | 41 +++ .../pinot-csv/src/test/resources/dataFileBasic.csv | 5 + .../resources/dataFileWithUnparseableLines2.csv | 5 + 4 files changed, 237 insertions(+), 203 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java index c4dc8c167f..daf6041a09 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -58,112 +59,124 @@ public class CSVRecordReader implements RecordReader { private Iterator<CSVRecord> _iterator; private CSVRecordExtractor _recordExtractor; private Map<String, Integer> _headerMap = new HashMap<>(); - private boolean _isHeaderProvided = false; - // line iterator specific variables - private boolean _useLineIterator = false; - private boolean _skipHeaderRecord = false; - private long _skippedLinesCount; private BufferedReader _bufferedReader; - private String _nextLine; - private GenericRow _nextRecord; + private CSVRecordReaderConfig _config = null; public CSVRecordReader() { } - @Override - public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) - throws IOException { - _dataFile = dataFile; - CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig; - Character multiValueDelimiter = null; - if (config == null) { - _format = CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); - multiValueDelimiter = CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; - } else { - CSVFormat format; - String formatString = config.getFileFormat(); - if (formatString == null) { - format = CSVFormat.DEFAULT; - } else { - switch (formatString.toUpperCase()) { - case "EXCEL": - format = CSVFormat.EXCEL; - break; - case "MYSQL": - format = CSVFormat.MYSQL; - break; - case "RFC4180": - format = CSVFormat.RFC4180; - break; - case "TDF": - format = CSVFormat.TDF; - break; - default: - format = CSVFormat.DEFAULT; - break; - } - } - char delimiter = config.getDelimiter(); - format = format.builder().setDelimiter(delimiter).build(); + private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) { + if (config.getFileFormat() == null) { + return CSVFormat.DEFAULT; + } + switch (config.getFileFormat().toUpperCase()) { + case "EXCEL": + return CSVFormat.EXCEL; + case "MYSQL": + return CSVFormat.MYSQL; + case "RFC4180": + return CSVFormat.RFC4180; + case "TDF": + return CSVFormat.TDF; + default: + return CSVFormat.DEFAULT; + } + } - if (config.isSkipUnParseableLines()) { - _useLineIterator = true; + private static Map<String, Integer> parseHeaderMapFromLine(CSVFormat format, String line) { + try (StringReader stringReader = new StringReader(line)) { + try (CSVParser parser = format.parse(stringReader)) { + return parser.getHeaderMap(); } + } catch (IOException e) { + throw new RuntimeException("Failed to parse header from line: " + line, e); + } + } - _isHeaderProvided = config.getHeader() != null; - _skipHeaderRecord = config.isSkipHeader(); - _format = format.builder() - .setHeader() - .setSkipHeaderRecord(config.isSkipHeader()) - .setCommentMarker(config.getCommentMarker()) - .setEscape(config.getEscapeCharacter()) - .setIgnoreEmptyLines(config.isIgnoreEmptyLines()) - .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces()) - .setQuote(config.getQuoteCharacter()) - .build(); - - if (config.getQuoteMode() != null) { - _format = _format.builder().setQuoteMode(QuoteMode.valueOf(config.getQuoteMode())).build(); - } + private static Character getMultiValueDelimiter(CSVRecordReaderConfig config) { + if (config == null) { + return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; + } else if (config.isMultiValueDelimiterEnabled()) { + return config.getMultiValueDelimiter(); + } + return null; + } - if (config.getRecordSeparator() != null) { - _format = _format.builder().setRecordSeparator(config.getRecordSeparator()).build(); - } + private static boolean useLineIterator(CSVRecordReaderConfig config) { + return config != null && config.isSkipUnParseableLines(); + } - String nullString = config.getNullStringValue(); - if (nullString != null) { - _format = _format.builder().setNullString(nullString).build(); - } + @Override + public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + _dataFile = dataFile; + _config = (CSVRecordReaderConfig) recordReaderConfig; + _format = createCSVFormat(); + + // If header is provided by the client, use it. Otherwise, parse the header from the first line of the file. + // Overwrite the format with the header information. + Optional.ofNullable(_config).map(CSVRecordReaderConfig::getHeader).ifPresent(header -> { + _headerMap = parseHeaderMapFromLine(_format, header); + _format = _format.builder().setHeader(_headerMap.keySet().toArray(new String[0])).build(); + }); + + validateHeaderWithDelimiter(); + initIterator(); + initRecordExtractor(fieldsToRead); + } - if (_isHeaderProvided) { - _headerMap = parseLineAsHeader(config.getHeader()); - _format = _format.builder().setHeader(_headerMap.keySet().toArray(new String[0])).build(); - if (!_useLineIterator) { - validateHeaderForDelimiter(delimiter, config.getHeader(), _format); - } - } + private void initRecordExtractor(Set<String> fieldsToRead) { + final CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig(); + recordExtractorConfig.setMultiValueDelimiter(getMultiValueDelimiter(_config)); + recordExtractorConfig.setColumnNames(_headerMap.keySet()); + _recordExtractor = new CSVRecordExtractor(); + _recordExtractor.init(fieldsToRead, recordExtractorConfig); + } - if (config.isMultiValueDelimiterEnabled()) { - multiValueDelimiter = config.getMultiValueDelimiter(); - } + private CSVFormat createCSVFormat() { + if (_config == null) { + return CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); } - _recordExtractor = new CSVRecordExtractor(); - init(); + final CSVFormat.Builder builder = baseCsvFormat(_config).builder() + .setDelimiter(_config.getDelimiter()) + .setHeader() + .setSkipHeaderRecord(_config.isSkipHeader()) + .setCommentMarker(_config.getCommentMarker()) + .setEscape(_config.getEscapeCharacter()) + .setIgnoreEmptyLines(_config.isIgnoreEmptyLines()) + .setIgnoreSurroundingSpaces(_config.isIgnoreSurroundingSpaces()) + .setQuote(_config.getQuoteCharacter()); + + Optional.ofNullable(_config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode); + Optional.ofNullable(_config.getRecordSeparator()).ifPresent(builder::setRecordSeparator); + Optional.ofNullable(_config.getNullStringValue()).ifPresent(builder::setNullString); + + return builder.build(); + } - CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig(); - recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter); - recordExtractorConfig.setColumnNames(_headerMap.keySet()); - _recordExtractor.init(fieldsToRead, recordExtractorConfig); + private void initIterator() + throws IOException { + if (useLineIterator(_config)) { + _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size + _iterator = new LineIterator(); + } else { + _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); + _headerMap = _parser.getHeaderMap(); + _iterator = _parser.iterator(); + } } - private void validateHeaderForDelimiter(char delimiter, String csvHeader, CSVFormat format) + private void validateHeaderWithDelimiter() throws IOException { - CSVParser parser = format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); - Iterator<CSVRecord> iterator = parser.iterator(); - if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && delimiterNotPresentInHeader(delimiter, - csvHeader)) { + if (_config == null || _config.getHeader() == null || useLineIterator(_config)) { + return; + } + final CSVParser parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); + final Iterator<CSVRecord> iterator = parser.iterator(); + if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && delimiterNotPresentInHeader( + _config.getDelimiter(), _config.getHeader())) { throw new IllegalArgumentException("Configured header does not contain the configured delimiter"); } } @@ -176,17 +189,6 @@ public class CSVRecordReader implements RecordReader { return !StringUtils.contains(csvHeader, delimiter); } - private void init() - throws IOException { - if (_useLineIterator) { - initLineIteratorResources(); - return; - } - _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile)); - _headerMap = _parser.getHeaderMap(); - _iterator = _parser.iterator(); - } - /** * Returns a copy of the header map that iterates in column order. * <p> @@ -202,158 +204,139 @@ public class CSVRecordReader implements RecordReader { @Override public boolean hasNext() { - if (_useLineIterator) { - // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator - // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes - // this limitation. - return readNextRecord(); - } return _iterator.hasNext(); } @Override public GenericRow next() throws IOException { - if (_useLineIterator) { - return _nextRecord; - } else { - return next(new GenericRow()); - } + return next(new GenericRow()); } @Override public GenericRow next(GenericRow reuse) throws IOException { - if (_useLineIterator) { - reuse.init(_nextRecord); - } else { - CSVRecord record = _iterator.next(); - _recordExtractor.extract(record, reuse); - } + CSVRecord record = _iterator.next(); + _recordExtractor.extract(record, reuse); return reuse; } @Override public void rewind() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } - if (_parser != null && !_parser.isClosed()) { _parser.close(); } - - init(); + closeIterator(); + initIterator(); } @Override public void close() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } + closeIterator(); if (_parser != null && !_parser.isClosed()) { _parser.close(); } } - private boolean readNextRecord() { - try { - _nextRecord = null; - GenericRow genericRow = new GenericRow(); - readNextLine(genericRow); - _nextRecord = genericRow; - } catch (Exception e) { - LOGGER.info("Error parsing next record.", e); + private void closeIterator() + throws IOException { + // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized + // once in the constructor + if (useLineIterator(_config) && _config.getHeader() == null) { + _headerMap.clear(); + } + + if (_bufferedReader != null) { + _bufferedReader.close(); } - return _nextRecord != null; } - private void readNextLine(GenericRow reuse) - throws IOException { - while (_nextLine != null) { - try (Reader reader = new StringReader(_nextLine)) { - try (CSVParser csvParser = _format.parse(reader)) { - List<CSVRecord> csvRecords = csvParser.getRecords(); - if (csvRecords != null && csvRecords.size() > 0) { - // There would be only one record as lines are read one after the other - CSVRecord record = csvRecords.get(0); - _recordExtractor.extract(record, reuse); - break; - } else { - // Can be thrown on: 1) Empty lines 2) Commented lines - throw new NoSuchElementException("Failed to find any records"); + class LineIterator implements Iterator<CSVRecord> { + private String _nextLine; + private CSVRecord _current; + + public LineIterator() { + init(); + } + + private void init() { + try { + if (_config.getHeader() != null) { + if (_config.isSkipHeader()) { + // When skip header config is set and header is supplied – skip the first line from the input file + _bufferedReader.readLine(); + // turn off the property so that it doesn't interfere with further parsing + _format = _format.builder().setSkipHeaderRecord(false).build(); } - } catch (Exception e) { - _skippedLinesCount++; - LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, _dataFile, e); - // Find the next line that can be parsed - _nextLine = _bufferedReader.readLine(); + } else { + // read the first line + String headerLine = _bufferedReader.readLine(); + _headerMap = parseHeaderMapFromLine(_format, headerLine); + // If header isn't provided, the first line would be set as header and the 'skipHeader' property + // is set to false. + _format = _format.builder() + .setSkipHeaderRecord(false) + .setHeader(_headerMap.keySet().toArray(new String[0])) + .build(); } + _nextLine = _bufferedReader.readLine(); + } catch (IOException e) { + throw new RuntimeException(e); } } - if (_nextLine != null) { - // Advance the pointer to the next line for future reading - _nextLine = _bufferedReader.readLine(); - } else { - throw new RuntimeException("No more parseable lines. Line iterator reached end of file."); - } - } - private Map<String, Integer> parseLineAsHeader(String line) - throws IOException { - Map<String, Integer> headerMap; - try (StringReader stringReader = new StringReader(line)) { - try (CSVParser parser = _format.parse(stringReader)) { - headerMap = parser.getHeaderMap(); + private CSVRecord getNextRecord() { + while (_nextLine != null) { + try (Reader reader = new StringReader(_nextLine)) { + try (CSVParser csvParser = _format.parse(reader)) { + List<CSVRecord> csvRecords = csvParser.getRecords(); + if (csvRecords == null || csvRecords.isEmpty()) { + // Can be thrown on: 1) Empty lines 2) Commented lines + throw new NoSuchElementException("Failed to find any records"); + } + // There would be only one record as lines are read one after the other + CSVRecord csvRecord = csvRecords.get(0); + + // move the pointer to the next line + _nextLine = _bufferedReader.readLine(); + return csvRecord; + } catch (Exception e) { + // Find the next line that can be parsed + _nextLine = _bufferedReader.readLine(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } } + return null; } - return headerMap; - } - private void initLineIteratorResources() - throws IOException { - _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size - - // When header is supplied by the client - if (_isHeaderProvided) { - if (_skipHeaderRecord) { - // When skip header config is set and header is supplied – skip the first line from the input file - _bufferedReader.readLine(); - // turn off the property so that it doesn't interfere with further parsing - _format = _format.builder().setSkipHeaderRecord(false).build(); + @Override + public boolean hasNext() { + if (_current == null) { + _current = getNextRecord(); } - } else { - // read the first line - String headerLine = _bufferedReader.readLine(); - _headerMap = parseLineAsHeader(headerLine); - _format = _format.builder() - // If header isn't provided, the first line would be set as header and the 'skipHeader' property - // is set to false. - .setSkipHeaderRecord(false) - .setHeader(_headerMap.keySet().toArray(new String[0])) - .build(); - } - _nextLine = _bufferedReader.readLine(); - } - private void resetLineIteratorResources() - throws IOException { - _nextLine = null; + return _current != null; + } - LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, _skippedLinesCount); - _skippedLinesCount = 0; + @Override + public CSVRecord next() { + CSVRecord next = _current; + _current = null; - // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized - // once in the constructor - if (!_isHeaderProvided) { - _headerMap.clear(); - } + if (next == null) { + // hasNext() wasn't called before + next = getNextRecord(); + if (next == null) { + throw new NoSuchElementException("No more CSV records available"); + } + } - if (_bufferedReader != null) { - _bufferedReader.close(); + return next; } } } diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java index d245fb33c0..6a1d86a48d 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java @@ -324,6 +324,47 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest { Assert.assertEquals(1, genericRows.size()); } + @Test + public void testLineIteratorReadingDataFileWithUnparseableLinesWithRewind() + throws URISyntaxException, IOException { + URI uri = ClassLoader.getSystemResource("dataFileWithUnparseableLines2.csv").toURI(); + File dataFile = new File(uri); + + CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); + readerConfig.setSkipUnParseableLines(true); + final List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig, null, true); + Assert.assertEquals(3, genericRows1.size()); + + // Start reading again; results should be same + final List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig, null, true); + Assert.assertEquals(3, genericRows2.size()); + + // Check that the rows are the same + for (int i = 0; i < genericRows1.size(); i++) { + Assert.assertEquals(genericRows1.get(i), genericRows2.get(i)); + } + } + + @Test + public void testReadingDataFileWithRewind() + throws URISyntaxException, IOException { + URI uri = ClassLoader.getSystemResource("dataFileBasic.csv").toURI(); + File dataFile = new File(uri); + + CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig(); + List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig, null, true); + Assert.assertEquals(4, genericRows1.size()); + + // Start reading again; results should be same + List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig, null, true); + Assert.assertEquals(4, genericRows2.size()); + + // Check that the rows are the same + for (int i = 0; i < genericRows1.size(); i++) { + Assert.assertEquals(genericRows1.get(i), genericRows2.get(i)); + } + } + @Test (expectedExceptions = RuntimeException.class) public void testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines() throws URISyntaxException, IOException { diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv new file mode 100644 index 0000000000..c2b0fe3262 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv @@ -0,0 +1,5 @@ +id,name +"100","John" +"101","Jane" +"102","Alice" +"103","Bob" diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv new file mode 100644 index 0000000000..80e9a736c3 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv @@ -0,0 +1,5 @@ +id,name +"100","John"s" +"101","Jane" +"102","Alice" +"103","Bob" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org