This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3267a749f8 int96 parity with native parquet reader (#12496) 3267a749f8 is described below commit 3267a749f828e8941c753b13a514b1ae41c76ca2 Author: swaminathanmanish <126024920+swaminathanman...@users.noreply.github.com> AuthorDate: Mon Feb 26 18:36:26 2024 -0800 int96 parity with native parquet reader (#12496) --- .../inputformat/avro/AvroRecordExtractor.java | 10 ++-- .../parquet/ParquetAvroRecordExtractor.java | 62 ++++++++++++++++++++++ .../parquet/ParquetAvroRecordReader.java | 6 +-- .../parquet/ParquetNativeRecordExtractor.java | 10 ++-- .../parquet/ParquetRecordReaderTest.java | 44 ++++++--------- 5 files changed, 95 insertions(+), 37 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java index c84b15e47c..d1a4dea0dc 100644 --- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java @@ -68,7 +68,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> { value = AvroSchemaUtil.applyLogicalType(field, value); } if (value != null) { - value = convert(value); + value = transformValue(value, field); } to.putValue(fieldName, value); } @@ -80,7 +80,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> { value = AvroSchemaUtil.applyLogicalType(field, value); } if (value != null) { - value = convert(value); + value = transformValue(value, field); } to.putValue(fieldName, value); } @@ -88,6 +88,10 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> { return to; } + protected Object 
transformValue(Object value, Schema.Field field) { + return convert(value); + } + /** * Returns whether the object is an Avro GenericRecord. */ @@ -116,7 +120,7 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> { String fieldName = field.name(); Object fieldValue = record.get(fieldName); if (fieldValue != null) { - fieldValue = convert(fieldValue); + fieldValue = transformValue(fieldValue, field); } convertedMap.put(fieldName, fieldValue); } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java new file mode 100644 index 0000000000..8f8ba25a1c --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordExtractor.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.inputformat.parquet; + +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; +import org.apache.pinot.spi.data.readers.RecordExtractorConfig; + + +public class ParquetAvroRecordExtractor extends AvroRecordExtractor { + + @Override + public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) { + super.init(fields, recordExtractorConfig); + } + + @Override + protected Object transformValue(Object value, Schema.Field field) { + return handleDeprecatedTypes(convert(value), field); + } + + Object handleDeprecatedTypes(Object value, Schema.Field field) { + Schema.Type avroColumnType = field.schema().getType(); + if (avroColumnType == org.apache.avro.Schema.Type.UNION) { + org.apache.avro.Schema nonNullSchema = null; + for (org.apache.avro.Schema childFieldSchema : field.schema().getTypes()) { + if (childFieldSchema.getType() != org.apache.avro.Schema.Type.NULL) { + if (nonNullSchema == null) { + nonNullSchema = childFieldSchema; + } else { + throw new IllegalStateException("More than one non-null schema in UNION schema"); + } + } + } + + //INT96 is deprecated. We convert to long as we do in the native parquet extractor. 
+ if (nonNullSchema.getName().equals(PrimitiveType.PrimitiveTypeName.INT96.name())) { + return ParquetNativeRecordExtractor.convertInt96ToLong((byte[]) value); + } + } + return value; + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java index 8caf384630..e1db085b8e 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java @@ -25,13 +25,11 @@ import javax.annotation.Nullable; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; -import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; import org.apache.pinot.spi.data.readers.RecordReaderUtils; - /** * Avro Record reader for Parquet file. This reader doesn't read parquet file with incompatible Avro schemas, * e.g. INT96, DECIMAL. 
Please use {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader} @@ -44,7 +42,7 @@ public class ParquetAvroRecordReader implements RecordReader { private static final String EXTENSION = "parquet"; private Path _dataFilePath; - private AvroRecordExtractor _recordExtractor; + private ParquetAvroRecordExtractor _recordExtractor; private ParquetReader<GenericRecord> _parquetReader; private GenericRecord _nextRecord; @@ -54,7 +52,7 @@ public class ParquetAvroRecordReader implements RecordReader { File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION); _dataFilePath = new Path(parquetFile.getAbsolutePath()); _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath); - _recordExtractor = new AvroRecordExtractor(); + _recordExtractor = new ParquetAvroRecordExtractor(); _recordExtractor.init(fieldsToRead, null); _nextRecord = _parquetReader.read(); } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java index d7a1dd0e22..8a67d63925 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java @@ -170,9 +170,7 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> { return from.getValueToString(fieldIndex, index); case INT96: Binary int96 = from.getInt96(fieldIndex, index); - ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN); - return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY - + buf.getLong(0) / NANOS_PER_MILLISECOND; + return 
convertInt96ToLong(int96.getBytes()); case BINARY: case FIXED_LEN_BYTE_ARRAY: if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { @@ -204,6 +202,12 @@ public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> { return null; } + public static long convertInt96ToLong(byte[] int96Bytes) { + ByteBuffer buf = ByteBuffer.wrap(int96Bytes).order(ByteOrder.LITTLE_ENDIAN); + return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY + + buf.getLong(0) / NANOS_PER_MILLISECOND; + } + public Object[] extractList(Group group) { int repFieldCount = group.getType().getFieldCount(); if (repFieldCount < 1) { diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java index a6fbb54247..f286042cd4 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java @@ -125,28 +125,20 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { @Test public void testComparison() throws IOException { - testComparison(_dataFile, SAMPLE_RECORDS_SIZE, false); - testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1, false); - testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667, - false); - testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870, - false); - testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889, - false); - 
testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889, - false); - testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000, - false); - testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443, - false); - testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492, - false); - testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000, - false); - testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1, true); + testComparison(_dataFile, SAMPLE_RECORDS_SIZE); + testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1); + testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667); + testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870); + testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889); + testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889); + testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000); + testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443); + testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492); + testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000); + testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1); } - private void testComparison(File dataFile, int 
totalRecords, boolean skipIndividualRecordComparison) + private void testComparison(File dataFile, int totalRecords) throws IOException { final ParquetRecordReader avroRecordReader = new ParquetRecordReader(); ParquetRecordReaderConfig avroRecordReaderConfig = new ParquetRecordReaderConfig(); @@ -159,14 +151,14 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader()); Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader()); - testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison); + testComparison(avroRecordReader, nativeRecordReader, totalRecords); avroRecordReader.rewind(); nativeRecordReader.rewind(); - testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison); + testComparison(avroRecordReader, nativeRecordReader, totalRecords); } private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordReader nativeRecordReader, - int totalRecords, boolean skipIndividualRecordComparison) + int totalRecords) throws IOException { GenericRow avroReuse = new GenericRow(); GenericRow nativeReuse = new GenericRow(); @@ -175,10 +167,8 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { Assert.assertTrue(nativeRecordReader.hasNext()); final GenericRow avroReaderRow = avroRecordReader.next(avroReuse); final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse); - if (!skipIndividualRecordComparison) { - Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString()); - Assert.assertTrue(avroReaderRow.equals(nativeReaderRow)); - } + Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString()); + Assert.assertTrue(avroReaderRow.equals(nativeReaderRow)); recordsRead++; } Assert.assertFalse(nativeRecordReader.hasNext()); --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org