This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7548b67592 CSV Realtime Decoder (#8658) 7548b67592 is described below commit 7548b67592833e16bbbb8b8576035fb2256db891 Author: Prashant Pandey <84911643+suddend...@users.noreply.github.com> AuthorDate: Mon May 16 23:53:12 2022 +0530 CSV Realtime Decoder (#8658) --- .../plugin/inputformat/csv/CSVMessageDecoder.java | 146 +++++++++++++++++++ .../inputformat/csv/CSVMessageDecoderTest.java | 157 +++++++++++++++++++++ 2 files changed, 303 insertions(+) diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoder.java new file mode 100644 index 0000000000..a535adc1fa --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoder.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.inputformat.csv; + +import com.google.common.collect.ImmutableSet; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.lang.StringUtils; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class CSVMessageDecoder implements StreamMessageDecoder<byte[]> { + + private static final Logger LOGGER = LoggerFactory.getLogger(CSVMessageDecoder.class); + + private static final String CONFIG_FILE_FORMAT = "fileFormat"; + private static final String CONFIG_HEADER = "header"; + private static final String CONFIG_DELIMITER = "delimiter"; + private static final String CONFIG_COMMENT_MARKER = "commentMarker"; + private static final String CONFIG_CSV_ESCAPE_CHARACTER = "escapeCharacter"; + private static final String CONFIG_CSV_MULTI_VALUE_DELIMITER = "multiValueDelimiter"; + + private CSVFormat _format; + private CSVRecordExtractor _recordExtractor; + + @Override + public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) + throws Exception { + String csvFormat = props.get(CONFIG_FILE_FORMAT); + CSVFormat format; + if (csvFormat == null) { + format = CSVFormat.DEFAULT; + } else { + switch (csvFormat.toUpperCase()) { + case "EXCEL": + format = CSVFormat.EXCEL; + break; + case "MYSQL": + format = CSVFormat.MYSQL; + break; + case "RFC4180": + format = CSVFormat.RFC4180; + break; + case "TDF": + format = CSVFormat.TDF; + break; + case "DEFAULT": + format = CSVFormat.DEFAULT; + break; + default: + LOGGER.warn("Could not recognise the configured CSV file format: {}, falling back to DEFAULT format", + csvFormat); + format = CSVFormat.DEFAULT; + break; + } + } + + //delimiter + String csvDelimiter = props.get(CONFIG_DELIMITER); + if (csvDelimiter != null) { + format = format.withDelimiter(csvDelimiter.charAt(0)); + } + + //header + String csvHeader = props.get(CONFIG_HEADER); + if (csvHeader == null) { + //parse the header automatically from the input + format = format.withHeader(); + } else { + format = format.withHeader(StringUtils.split(csvHeader, csvDelimiter)); + } + + //comment marker + String commentMarker = props.get(CONFIG_COMMENT_MARKER); + if (commentMarker != null) { + format = format.withCommentMarker(commentMarker.charAt(0)); + } + + //escape char + String escapeChar = props.get(CONFIG_CSV_ESCAPE_CHARACTER); + if (escapeChar != null) { + format = format.withEscape(props.get(CONFIG_CSV_ESCAPE_CHARACTER).charAt(0)); + } + + _format = format; + + _recordExtractor = new CSVRecordExtractor(); + + CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig(); + + //multi-value delimiter + String multiValueDelimiter = props.get(CONFIG_CSV_MULTI_VALUE_DELIMITER); + if (multiValueDelimiter != null) { + recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter.charAt(0)); + } + + recordExtractorConfig.setColumnNames(ImmutableSet.copyOf( + Objects.requireNonNull(_format.getHeader()))); + _recordExtractor.init(fieldsToRead, recordExtractorConfig); + } + + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + try { + Iterator<CSVRecord> iterator = + _format.parse(new InputStreamReader(new ByteArrayInputStream(payload), StandardCharsets.UTF_8)).iterator(); + return _recordExtractor.extract(iterator.next(), destination); + } catch (IOException e) { + throw new RuntimeException("Error decoding CSV record from payload", e); + } + } + + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoderTest.java new file mode 100644 index 0000000000..ae407ee914 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVMessageDecoderTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.inputformat.csv; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +public class CSVMessageDecoderTest { + + @Test + public void testHappyCase() + throws Exception { + Map<String, String> decoderProps = getStandardDecoderProps(); + CSVMessageDecoder messageDecoder = new CSVMessageDecoder(); + messageDecoder.init(decoderProps, ImmutableSet.of("name", "age", "gender"), ""); + String incomingRecord = "Alice;18;F"; + GenericRow destination = new GenericRow(); + messageDecoder.decode(incomingRecord.getBytes(StandardCharsets.UTF_8), destination); + assertNotNull(destination.getValue("name")); + assertNotNull(destination.getValue("age")); + assertNotNull(destination.getValue("gender")); + + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("age"), "18"); + assertEquals(destination.getValue("gender"), "F"); + } + + @Test + public void testMultivalue() + throws Exception { + Map<String, String> decoderProps = getStandardDecoderProps(); + decoderProps.put("header", "name;age;gender;subjects"); + CSVMessageDecoder messageDecoder = new CSVMessageDecoder(); + messageDecoder.init(decoderProps, ImmutableSet.of("name", "age", "gender", "subjects"), ""); + String incomingRecord = "Alice;18;F;maths,German,history"; + GenericRow destination = new GenericRow(); + messageDecoder.decode(incomingRecord.getBytes(StandardCharsets.UTF_8), destination); + assertNotNull(destination.getValue("name")); + assertNotNull(destination.getValue("age")); + assertNotNull(destination.getValue("gender")); + assertNotNull(destination.getValue("subjects")); + + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("age"), "18"); + assertEquals(destination.getValue("gender"), "F"); + assertEquals(destination.getValue("subjects"), new String[]{"maths", "German", "history"}); + } + + @Test(expectedExceptions = java.util.NoSuchElementException.class) + public void testCommentMarker() + throws Exception { + Map<String, String> decoderProps = getStandardDecoderProps(); + decoderProps.put("header", "name,age,gender"); + decoderProps.put("delimiter", ","); + decoderProps.put("commentMarker", "#"); + CSVMessageDecoder messageDecoder = new CSVMessageDecoder(); + messageDecoder.init(decoderProps, ImmutableSet.of("name", "age", "gender"), ""); + String incomingRecord = "#Alice,18,F"; + GenericRow destination = new GenericRow(); + messageDecoder.decode(incomingRecord.getBytes(StandardCharsets.UTF_8), destination); + } + + @Test + public void testHeaderFromRecord() + throws Exception { + Map<String, String> decoderProps = getStandardDecoderProps(); + decoderProps.remove("header"); + decoderProps.put("delimiter", ","); + CSVMessageDecoder messageDecoder = new CSVMessageDecoder(); + messageDecoder.init(decoderProps, ImmutableSet.of("name", "age", "gender"), ""); + String incomingRecord = "name,age,gender\nAlice,18,F"; + GenericRow destination = new GenericRow(); + messageDecoder.decode(incomingRecord.getBytes(StandardCharsets.UTF_8), destination); + + assertNotNull(destination.getValue("name")); + assertNotNull(destination.getValue("age")); + assertNotNull(destination.getValue("gender")); + + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("age"), "18"); + assertEquals(destination.getValue("gender"), "F"); + } + + @Test + public void testEscapeCharacter() + throws Exception { + Map<String, String> decoderProps = getStandardDecoderProps(); + decoderProps.put("header", "name;age;gender;subjects"); + decoderProps.put("delimiter", ";"); + CSVMessageDecoder messageDecoder = new CSVMessageDecoder(); + messageDecoder.init(decoderProps, ImmutableSet.of("name", "age", "gender", "subjects"), ""); + String incomingRecord = "Alice;18;F;mat\\;hs"; + GenericRow destination = new GenericRow(); + messageDecoder.decode(incomingRecord.getBytes(StandardCharsets.UTF_8), destination); + assertNotNull(destination.getValue("name")); + assertNotNull(destination.getValue("age")); + assertNotNull(destination.getValue("gender")); + assertNotNull(destination.getValue("subjects")); + + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("age"), "18"); + assertEquals(destination.getValue("gender"), "F"); + assertEquals(destination.getValue("subjects"), "mat;hs"); + } + + @Test + public void testDefaultProps() + throws Exception { + Map<String, String> decoderProps = ImmutableMap.of(); + CSVMessageDecoder messageDecoder = new CSVMessageDecoder(); + messageDecoder.init(decoderProps, ImmutableSet.of("name", "age", "gender", "subjects"), ""); + String incomingRecord = "name,age,gender,subjects\nAlice,18,F,maths"; + GenericRow destination = new GenericRow(); + messageDecoder.decode(incomingRecord.getBytes(StandardCharsets.UTF_8), destination); + + assertEquals(destination.getValue("name"), "Alice"); + assertEquals(destination.getValue("age"), "18"); + assertEquals(destination.getValue("gender"), "F"); + assertEquals(destination.getValue("subjects"), "maths"); + } + + private static Map<String, String> getStandardDecoderProps() { + //setup + Map<String, String> props = new HashMap<>(); + props.put("header", "name;age;gender"); + props.put("delimiter", ";"); + props.put("multiValueDelimiter", ","); + props.put("escapeCharacter", "\\"); + return props; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org