This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new af01aa5 Handle fields missing in the source in ParquetNativeRecordReader (#7742) af01aa5 is described below commit af01aa5778a3097afda48c154dfe6e68e36f63bc Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Nov 10 20:43:34 2021 -0800 Handle fields missing in the source in ParquetNativeRecordReader (#7742) * Fix ParquetNativeRecordExtractor for fields missing in the source * nit * Same bug in proto --- .../parquet/ParquetNativeRecordExtractor.java | 2 +- .../parquet/ParquetNativeRecordReaderTest.java | 66 ++++++++++++++++++++++ .../protobuf/ProtoBufRecordExtractor.java | 2 +- .../spi/data/readers/AbstractRecordReaderTest.java | 4 +- 4 files changed, 71 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java index ccc24c2..175782b 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java @@ -114,7 +114,7 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> { } } else { for (String fieldName : _fields) { - Object value = extractValue(from, fromType.getFieldIndex(fieldName)); + Object value = fromType.containsField(fieldName) ? 
extractValue(from, fromType.getFieldIndex(fieldName)) : null; if (value != null) { value = convert(value); } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java new file mode 100644 index 0000000..ffc80a9 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.inputformat.parquet; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.pinot.plugin.inputformat.avro.AvroUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; +import org.apache.pinot.spi.data.readers.RecordReader; + + +public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest { + private final File _dataFile = new File(_tempDir, "data.parquet"); + + @Override + protected RecordReader createRecordReader() + throws Exception { + ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader(); + recordReader.init(_dataFile, _sourceFields, null); + return recordReader; + } + + @Override + protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) + throws Exception { + Schema schema = AvroUtils.getAvroSchemaFromPinotSchema(getPinotSchema()); + List<GenericRecord> records = new ArrayList<>(); + for (Map<String, Object> r : recordsToWrite) { + GenericRecord record = new GenericData.Record(schema); + for (FieldSpec fieldSpec : getPinotSchema().getAllFieldSpecs()) { + record.put(fieldSpec.getName(), r.get(fieldSpec.getName())); + } + records.add(record); + } + try (ParquetWriter<GenericRecord> writer = ParquetUtils + .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) { + for (GenericRecord record : records) { + writer.write(record); + } + } + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java index 
17cb445..8685d0c 100644 --- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java @@ -67,7 +67,7 @@ public class ProtoBufRecordExtractor extends BaseRecordExtractor<Message> { } else { for (String fieldName : _fields) { Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName); - Object fieldValue = from.getField(fieldDescriptor); + Object fieldValue = fieldDescriptor != null ? from.getField(fieldDescriptor) : null; if (fieldValue != null) { fieldValue = convert(new ProtoBufFieldInfo(fieldValue, descriptor.findFieldByName(fieldName))); } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java index 7eadbe3..b4821bb 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java @@ -150,7 +150,9 @@ public abstract class AbstractRecordReaderTest { } protected Set<String> getSourceFields(Schema schema) { - return Sets.newHashSet(schema.getColumnNames()); + Set<String> sourceFields = Sets.newHashSet(schema.getColumnNames()); + sourceFields.add("column_not_in_source"); + return sourceFields; } @BeforeClass --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org