swaminathanmanish commented on code in PR #13913: URL: https://github.com/apache/pinot/pull/13913#discussion_r1740153504
########## pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java: ########## @@ -60,102 +61,119 @@ public class CSVRecordReader implements RecordReader { private Map<String, Integer> _headerMap = new HashMap<>(); private boolean _isHeaderProvided = false; - // line iterator specific variables - private boolean _useLineIterator = false; - private boolean _skipHeaderRecord = false; - private long _skippedLinesCount; private BufferedReader _bufferedReader; - private String _nextLine; - private GenericRow _nextRecord; + private CSVRecordReaderConfig _config = null; public CSVRecordReader() { } - @Override - public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) - throws IOException { - _dataFile = dataFile; - CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig; - Character multiValueDelimiter = null; - if (config == null) { - _format = CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); - multiValueDelimiter = CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; - } else { - CSVFormat format; - String formatString = config.getFileFormat(); - if (formatString == null) { - format = CSVFormat.DEFAULT; - } else { - switch (formatString.toUpperCase()) { - case "EXCEL": - format = CSVFormat.EXCEL; - break; - case "MYSQL": - format = CSVFormat.MYSQL; - break; - case "RFC4180": - format = CSVFormat.RFC4180; - break; - case "TDF": - format = CSVFormat.TDF; - break; - default: - format = CSVFormat.DEFAULT; - break; - } - } - char delimiter = config.getDelimiter(); - format = format.builder().setDelimiter(delimiter).build(); + private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) { + if (config.getFileFormat() == null) { + return CSVFormat.DEFAULT; + } + switch (config.getFileFormat().toUpperCase()) { + case "EXCEL": + return CSVFormat.EXCEL; + case "MYSQL": + return CSVFormat.MYSQL; + case "RFC4180": + return CSVFormat.RFC4180; + case "TDF": + return CSVFormat.TDF; + default: + return CSVFormat.DEFAULT; + } + } - if (config.isSkipUnParseableLines()) { - _useLineIterator = true; - } + private static CSVFormat defaultFormat() { + return CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); + } - _isHeaderProvided = config.getHeader() != null; - _skipHeaderRecord = config.isSkipHeader(); - _format = format.builder() - .setHeader() - .setSkipHeaderRecord(config.isSkipHeader()) - .setCommentMarker(config.getCommentMarker()) - .setEscape(config.getEscapeCharacter()) - .setIgnoreEmptyLines(config.isIgnoreEmptyLines()) - .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces()) - .setQuote(config.getQuoteCharacter()) - .build(); - - if (config.getQuoteMode() != null) { - _format = _format.builder().setQuoteMode(QuoteMode.valueOf(config.getQuoteMode())).build(); - } + private static <T> Optional<T> optional(T value) { + return Optional.ofNullable(value); + } - if (config.getRecordSeparator() != null) { - _format = _format.builder().setRecordSeparator(config.getRecordSeparator()).build(); - } + private static CSVFormat.Builder formatBuilder(CSVRecordReaderConfig config) { + final CSVFormat.Builder builder = baseCsvFormat(config).builder().setDelimiter(config.getDelimiter()).setHeader() + .setSkipHeaderRecord(config.isSkipHeader()).setCommentMarker(config.getCommentMarker()) + .setEscape(config.getEscapeCharacter()).setIgnoreEmptyLines(config.isIgnoreEmptyLines()) + .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces()).setQuote(config.getQuoteCharacter()); + + optional(config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode); + optional(config.getRecordSeparator()).ifPresent(builder::setRecordSeparator); + optional(config.getNullStringValue()).ifPresent(builder::setNullString); + return builder; + } - String nullString = config.getNullStringValue(); - if (nullString != null) { - _format = _format.builder().setNullString(nullString).build(); + private static Map<String, Integer> parseLineAsHeader(CSVFormat format, String line) + throws IOException { + try (StringReader stringReader = new StringReader(line)) { + try (CSVParser parser = format.parse(stringReader)) { + return parser.getHeaderMap(); } + } + } + + private static Character getMultiValueDelimiter(CSVRecordReaderConfig config) { + if (config == null) { + return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; + } else if (config.isMultiValueDelimiterEnabled()) { + return config.getMultiValueDelimiter(); + } + return null; Review Comment: When can this (null) happen and is it an issue? ########## pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java: ########## @@ -202,158 +209,142 @@ public Map<String, Integer> getCSVHeaderMap() { @Override public boolean hasNext() { - if (_useLineIterator) { - // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator - // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes - // this limitation. - return readNextRecord(); - } return _iterator.hasNext(); } @Override public GenericRow next() throws IOException { - if (_useLineIterator) { - return _nextRecord; - } else { - return next(new GenericRow()); - } + return next(new GenericRow()); } @Override public GenericRow next(GenericRow reuse) throws IOException { - if (_useLineIterator) { - reuse.init(_nextRecord); - } else { - CSVRecord record = _iterator.next(); - _recordExtractor.extract(record, reuse); - } + CSVRecord record = _iterator.next(); + _recordExtractor.extract(record, reuse); return reuse; } @Override public void rewind() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } - if (_parser != null && !_parser.isClosed()) { _parser.close(); } - - init(); + closeIterator(); + initIterator(); } @Override public void close() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } + closeIterator(); if (_parser != null && !_parser.isClosed()) { _parser.close(); } } - private boolean readNextRecord() { - try { - _nextRecord = null; - GenericRow genericRow = new GenericRow(); - readNextLine(genericRow); - _nextRecord = genericRow; - } catch (Exception e) { - LOGGER.info("Error parsing next record.", e); + private void closeIterator() + throws IOException { + // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized + // once in the constructor + if (useLineIterator(_config) && !_isHeaderProvided) { + _headerMap.clear(); + } + + if (_bufferedReader != null) { + _bufferedReader.close(); } - return _nextRecord != null; } - private void readNextLine(GenericRow reuse) - throws IOException { - while (_nextLine != null) { - try (Reader reader = new StringReader(_nextLine)) { - try (CSVParser csvParser = _format.parse(reader)) { - List<CSVRecord> csvRecords = csvParser.getRecords(); - if (csvRecords != null && csvRecords.size() > 0) { - // There would be only one record as lines are read one after the other - CSVRecord record = csvRecords.get(0); - _recordExtractor.extract(record, reuse); - break; - } else { - // Can be thrown on: 1) Empty lines 2) Commented lines - throw new NoSuchElementException("Failed to find any records"); + class LineIterator implements Iterator<CSVRecord> { + private final boolean _skipHeaderRecord; + + private String _nextLine; + + private CSVRecord _current; + + public LineIterator(CSVRecordReaderConfig config) { + _skipHeaderRecord = config.isSkipHeader(); + + init(); + } + + private void init() { + try { + if (_isHeaderProvided) { + if (_skipHeaderRecord) { + // When skip header config is set and header is supplied – skip the first line from the input file + _bufferedReader.readLine(); + // turn off the property so that it doesn't interfere with further parsing + _format = _format.builder().setSkipHeaderRecord(false).build(); } - } catch (Exception e) { - _skippedLinesCount++; - LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, _dataFile, e); - // Find the next line that can be parsed - _nextLine = _bufferedReader.readLine(); + } else { + // read the first line + String headerLine = _bufferedReader.readLine(); + _headerMap = parseLineAsHeader(_format, headerLine); + _format = _format.builder() + // If header isn't provided, the first line would be set as header and the 'skipHeader' property + // is set to false. + .setSkipHeaderRecord(false).setHeader(_headerMap.keySet().toArray(new String[0])).build(); } + _nextLine = _bufferedReader.readLine(); + } catch (IOException e) { + throw new RuntimeException(e); Review Comment: There's no clean up needed on failure (eg: for _bufferedReader) ########## pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java: ########## @@ -60,102 +61,119 @@ public class CSVRecordReader implements RecordReader { private Map<String, Integer> _headerMap = new HashMap<>(); private boolean _isHeaderProvided = false; - // line iterator specific variables - private boolean _useLineIterator = false; - private boolean _skipHeaderRecord = false; - private long _skippedLinesCount; private BufferedReader _bufferedReader; - private String _nextLine; - private GenericRow _nextRecord; + private CSVRecordReaderConfig _config = null; public CSVRecordReader() { } - @Override - public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) - throws IOException { - _dataFile = dataFile; - CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig; - Character multiValueDelimiter = null; - if (config == null) { - _format = CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); - multiValueDelimiter = CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; - } else { - CSVFormat format; - String formatString = config.getFileFormat(); - if (formatString == null) { - format = CSVFormat.DEFAULT; - } else { - switch (formatString.toUpperCase()) { - case "EXCEL": - format = CSVFormat.EXCEL; - break; - case "MYSQL": - format = CSVFormat.MYSQL; - break; - case "RFC4180": - format = CSVFormat.RFC4180; - break; - case "TDF": - format = CSVFormat.TDF; - break; - default: - format = CSVFormat.DEFAULT; - break; - } - } - char delimiter = config.getDelimiter(); - format = format.builder().setDelimiter(delimiter).build(); + private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) { + if (config.getFileFormat() == null) { + return CSVFormat.DEFAULT; + } + switch (config.getFileFormat().toUpperCase()) { + case "EXCEL": + return CSVFormat.EXCEL; + case "MYSQL": + return CSVFormat.MYSQL; + case "RFC4180": + return CSVFormat.RFC4180; + case "TDF": + return CSVFormat.TDF; + default: + return CSVFormat.DEFAULT; + } + } - if (config.isSkipUnParseableLines()) { - _useLineIterator = true; - } + private static CSVFormat defaultFormat() { + return CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build(); + } - _isHeaderProvided = config.getHeader() != null; - _skipHeaderRecord = config.isSkipHeader(); - _format = format.builder() - .setHeader() - .setSkipHeaderRecord(config.isSkipHeader()) - .setCommentMarker(config.getCommentMarker()) - .setEscape(config.getEscapeCharacter()) - .setIgnoreEmptyLines(config.isIgnoreEmptyLines()) - .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces()) - .setQuote(config.getQuoteCharacter()) - .build(); - - if (config.getQuoteMode() != null) { - _format = _format.builder().setQuoteMode(QuoteMode.valueOf(config.getQuoteMode())).build(); - } + private static <T> Optional<T> optional(T value) { + return Optional.ofNullable(value); + } - if (config.getRecordSeparator() != null) { - _format = _format.builder().setRecordSeparator(config.getRecordSeparator()).build(); - } + private static CSVFormat.Builder formatBuilder(CSVRecordReaderConfig config) { + final CSVFormat.Builder builder = baseCsvFormat(config).builder().setDelimiter(config.getDelimiter()).setHeader() + .setSkipHeaderRecord(config.isSkipHeader()).setCommentMarker(config.getCommentMarker()) + .setEscape(config.getEscapeCharacter()).setIgnoreEmptyLines(config.isIgnoreEmptyLines()) + .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces()).setQuote(config.getQuoteCharacter()); + + optional(config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode); + optional(config.getRecordSeparator()).ifPresent(builder::setRecordSeparator); + optional(config.getNullStringValue()).ifPresent(builder::setNullString); + return builder; + } - String nullString = config.getNullStringValue(); - if (nullString != null) { - _format = _format.builder().setNullString(nullString).build(); + private static Map<String, Integer> parseLineAsHeader(CSVFormat format, String line) + throws IOException { + try (StringReader stringReader = new StringReader(line)) { + try (CSVParser parser = format.parse(stringReader)) { + return parser.getHeaderMap(); } + } + } + + private static Character getMultiValueDelimiter(CSVRecordReaderConfig config) { + if (config == null) { + return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; + } else if (config.isMultiValueDelimiterEnabled()) { + return config.getMultiValueDelimiter(); + } + return null; + } + private static boolean useLineIterator(CSVRecordReaderConfig config) { + return config != null && config.isSkipUnParseableLines(); + } + + @Override + public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + _dataFile = dataFile; + _config = (CSVRecordReaderConfig) recordReaderConfig; + if (_config == null) { + _format = defaultFormat(); + } else { + _isHeaderProvided = _config.getHeader() != null; + final CSVFormat.Builder builder = formatBuilder(_config); if (_isHeaderProvided) { - _headerMap = parseLineAsHeader(config.getHeader()); - _format = _format.builder().setHeader(_headerMap.keySet().toArray(new String[0])).build(); - if (!_useLineIterator) { - validateHeaderForDelimiter(delimiter, config.getHeader(), _format); - } + // use an intermediate format to parse the header line. It still needs to be updated later + _headerMap = parseLineAsHeader(builder.build(), _config.getHeader()); + builder.setHeader(_headerMap.keySet().toArray(new String[0])); } + _format = builder.build(); - if (config.isMultiValueDelimiterEnabled()) { - multiValueDelimiter = config.getMultiValueDelimiter(); + if (_isHeaderProvided) { Review Comment: The isMultiValueDelimiterEnable check is now moved to getMultiValueDelimiter and we are setting this param in newCsvRecordExtractorConfig, right? ########## pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java: ########## @@ -202,158 +209,142 @@ public Map<String, Integer> getCSVHeaderMap() { @Override public boolean hasNext() { - if (_useLineIterator) { - // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator - // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes - // this limitation. - return readNextRecord(); - } return _iterator.hasNext(); } @Override public GenericRow next() throws IOException { - if (_useLineIterator) { - return _nextRecord; - } else { - return next(new GenericRow()); - } + return next(new GenericRow()); } @Override public GenericRow next(GenericRow reuse) throws IOException { - if (_useLineIterator) { - reuse.init(_nextRecord); - } else { - CSVRecord record = _iterator.next(); - _recordExtractor.extract(record, reuse); - } + CSVRecord record = _iterator.next(); + _recordExtractor.extract(record, reuse); return reuse; } @Override public void rewind() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } - if (_parser != null && !_parser.isClosed()) { _parser.close(); } - - init(); + closeIterator(); + initIterator(); } @Override public void close() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } + closeIterator(); if (_parser != null && !_parser.isClosed()) { _parser.close(); } } - private boolean readNextRecord() { - try { - _nextRecord = null; - GenericRow genericRow = new GenericRow(); - readNextLine(genericRow); - _nextRecord = genericRow; - } catch (Exception e) { - LOGGER.info("Error parsing next record.", e); + private void closeIterator() + throws IOException { + // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized + // once in the constructor + if (useLineIterator(_config) && !_isHeaderProvided) { Review Comment: Should this close be part of LineIterator implementation ? ########## pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java: ########## @@ -202,158 +209,142 @@ public Map<String, Integer> getCSVHeaderMap() { @Override public boolean hasNext() { - if (_useLineIterator) { - // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator - // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes - // this limitation. - return readNextRecord(); - } return _iterator.hasNext(); } @Override public GenericRow next() throws IOException { - if (_useLineIterator) { - return _nextRecord; - } else { - return next(new GenericRow()); - } + return next(new GenericRow()); } @Override public GenericRow next(GenericRow reuse) throws IOException { - if (_useLineIterator) { - reuse.init(_nextRecord); - } else { - CSVRecord record = _iterator.next(); - _recordExtractor.extract(record, reuse); - } + CSVRecord record = _iterator.next(); + _recordExtractor.extract(record, reuse); return reuse; } @Override public void rewind() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } - if (_parser != null && !_parser.isClosed()) { _parser.close(); } - - init(); + closeIterator(); + initIterator(); } @Override public void close() throws IOException { - if (_useLineIterator) { - resetLineIteratorResources(); - } + closeIterator(); if (_parser != null && !_parser.isClosed()) { _parser.close(); } } - private boolean readNextRecord() { - try { - _nextRecord = null; - GenericRow genericRow = new GenericRow(); - readNextLine(genericRow); - _nextRecord = genericRow; - } catch (Exception e) { - LOGGER.info("Error parsing next record.", e); + private void closeIterator() + throws IOException { + // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized Review Comment: We dont need this logic that I see in resetLineIteratorResources ? _nextLine = null; LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, _skippedLinesCount); _skippedLinesCount = 0; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org