This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new d6fd42d Adding native parquet record reader support (#6525) d6fd42d is described below commit d6fd42d7883ceed4f6e207dfae7b7b809d2c2c5c Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Thu Feb 18 00:02:34 2021 -0800 Adding native parquet record reader support (#6525) * Adding native parquet record reader support * Adding more test data * Address comments --- ...ordReader.java => ParquetAvroRecordReader.java} | 13 +- .../parquet/ParquetNativeRecordExtractor.java | 263 +++++++++++++++++++++ .../parquet/ParquetNativeRecordReader.java | 129 ++++++++++ .../inputformat/parquet/ParquetRecordReader.java | 41 ++-- .../parquet/ParquetRecordReaderConfig.java | 52 ++++ .../plugin/inputformat/parquet/ParquetUtils.java | 24 +- .../parquet/ParquetRecordReaderTest.java | 77 +++++- .../src/test/resources/airlineStats.snappy.parquet | Bin 0 -> 1095802 bytes .../test/resources/baseballStats.snappy.parquet | Bin 0 -> 1993064 bytes .../src/test/resources/githubActivities.gz.parquet | Bin 0 -> 1610474 bytes .../src/test/resources/githubEvents.snappy.parquet | Bin 0 -> 4537684 bytes .../test/resources/starbucksStores.snappy.parquet | Bin 0 -> 451742 bytes .../src/test/resources/test-comparison.gz.parquet | Bin 0 -> 10617970 bytes .../test/resources/test-comparison.snappy.parquet | Bin 0 -> 18350 bytes ...test-file-with-int96-and-decimal.snappy.parquet | Bin 0 -> 19659 bytes .../pinot-parquet/src/test/resources/users.parquet | Bin 0 -> 4065 bytes .../apache/pinot/spi/data/readers/GenericRow.java | 57 ++++- .../spi/data/readers/AbstractRecordReaderTest.java | 5 +- 18 files changed, 622 insertions(+), 39 deletions(-) diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java similarity index 78% copy from pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java copy to pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java index 10ce618..9c494f0 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java @@ -32,9 +32,14 @@ import org.apache.pinot.spi.data.readers.RecordReaderConfig; /** - * Record reader for Parquet file. + * Avro Record reader for Parquet file. This reader doesn't read parquet file with incompatible Avro schemas, + * e.g. INT96, DECIMAL. Please use {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader} + * instead.<p><p> + * For More info on Avro to Parquet schema conversion: + * <a href="https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html"> + * https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a> */ -public class ParquetRecordReader implements RecordReader { +public class ParquetAvroRecordReader implements RecordReader { private Path _dataFilePath; private AvroRecordExtractor _recordExtractor; private ParquetReader<GenericRecord> _parquetReader; @@ -44,7 +49,7 @@ public class ParquetRecordReader implements RecordReader { public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { _dataFilePath = new Path(dataFile.getAbsolutePath()); - _parquetReader = ParquetUtils.getParquetReader(_dataFilePath); + _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath); _recordExtractor = new AvroRecordExtractor(); _recordExtractor.init(fieldsToRead, null); _nextRecord = _parquetReader.read(); @@ -73,7 +78,7 @@ public class ParquetRecordReader implements RecordReader { public void rewind() throws IOException { _parquetReader.close(); - _parquetReader = ParquetUtils.getParquetReader(_dataFilePath); + _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath); _nextRecord = _parquetReader.read(); } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java new file mode 100644 index 0000000..46b989e --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordExtractor.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.parquet; + +import com.google.common.collect.ImmutableSet; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; +import org.apache.pinot.spi.data.readers.BaseRecordExtractor; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordExtractorConfig; +import org.joda.time.DateTimeConstants; + +import static java.lang.Math.pow; + + +/** + * ParquetNativeRecordExtractor extract values from Parquet {@link Group}. + */ +public class ParquetNativeRecordExtractor extends BaseRecordExtractor<Group> { + + /** + * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970). + * The value of this constant is {@value}. + */ + public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588; + + public static final long NANOS_PER_MILLISECOND = 1000000; + + private Set<String> _fields; + private boolean _extractAll = false; + + public static BigDecimal binaryToDecimal(Binary value, int precision, int scale) { + /* + * Precision <= 18 checks for the max number of digits for an unscaled long, + * else treat with big integer conversion + */ + if (precision <= 18) { + ByteBuffer buffer = value.toByteBuffer(); + byte[] bytes = buffer.array(); + int start = buffer.arrayOffset() + buffer.position(); + int end = buffer.arrayOffset() + buffer.limit(); + long unscaled = 0L; + int i = start; + while (i < end) { + unscaled = (unscaled << 8 | bytes[i] & 0xff); + i++; + } + int bits = 8 * (end - start); + long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits); + if (unscaledNew <= -pow(10, 18) || unscaledNew >= pow(10, 18)) { + return new BigDecimal(unscaledNew); + } else { + return BigDecimal.valueOf(unscaledNew / pow(10, scale)); + } + } else { + return new BigDecimal(new BigInteger(value.getBytes()), scale); + } + } + + @Override + public void init(@Nullable Set<String> fields, RecordExtractorConfig recordExtractorConfig) { + if (fields == null || fields.isEmpty()) { + _extractAll = true; + _fields = Collections.emptySet(); + } else { + _fields = ImmutableSet.copyOf(fields); + } + } + + @Override + public GenericRow extract(Group from, GenericRow to) { + GroupType fromType = from.getType(); + if (_extractAll) { + List<Type> fields = fromType.getFields(); + for (Type field : fields) { + String fieldName = field.getName(); + Object value = extractValue(from, fromType.getFieldIndex(fieldName)); + if (value != null) { + value = convert(value); + } + to.putValue(fieldName, value); + } + } else { + for (String fieldName : _fields) { + Object value = extractValue(from, fromType.getFieldIndex(fieldName)); + if (value != null) { + value = convert(value); + } + to.putValue(fieldName, value); + } + } + return to; + } + + private Object extractValue(Group from, int fieldIndex) { + int valueCount = from.getFieldRepetitionCount(fieldIndex); + Type fieldType = from.getType().getType(fieldIndex); + if (valueCount == 0) { + return null; + } + if (valueCount == 1) { + return extractValue(from, fieldIndex, fieldType, 0); + } + // For multi-value (repeated field) + Object[] results = new Object[valueCount]; + for (int index = 0; index < valueCount; index++) { + results[index] = extractValue(from, fieldIndex, fieldType, index); + } + return results; + } + + private Object extractValue(Group from, int fieldIndex, Type fieldType, int index) { + OriginalType originalType = fieldType.getOriginalType(); + if (fieldType.isPrimitive()) { + switch (fieldType.asPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return from.getInteger(fieldIndex, index); + case INT64: + return from.getLong(fieldIndex, index); + case FLOAT: + return from.getFloat(fieldIndex, index); + case DOUBLE: + return from.getDouble(fieldIndex, index); + case BOOLEAN: + return from.getValueToString(fieldIndex, index); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + if (originalType == OriginalType.UTF8) { + return from.getValueToString(fieldIndex, index); + } + if (originalType == OriginalType.DECIMAL) { + DecimalMetadata decimalMetadata = fieldType.asPrimitiveType().getDecimalMetadata(); + return binaryToDecimal(from.getBinary(fieldIndex, index), decimalMetadata.getPrecision(), + decimalMetadata.getScale()); + } + return from.getBinary(fieldIndex, index).getBytes(); + case INT96: + Binary int96 = from.getInt96(fieldIndex, index); + ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN); + long dateTime = (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY + + buf.getLong(0) / NANOS_PER_MILLISECOND; + return dateTime; + } + } else if ((fieldType.isRepetition(Type.Repetition.OPTIONAL)) || (fieldType.isRepetition(Type.Repetition.REQUIRED)) + || (fieldType.isRepetition(Type.Repetition.REPEATED))) { + Group group = from.getGroup(fieldIndex, index); + if (originalType == OriginalType.LIST) { + return extractList(group); + } + return extractMap(group); + } + return null; + } + + public Object[] extractList(Group group) { + int repFieldCount = group.getType().getFieldCount(); + if (repFieldCount < 1) { + return null; + } + Object[] list = new Object[repFieldCount]; + for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) { + list[repFieldIdx] = extractValue(group, repFieldIdx); + } + if (repFieldCount == 1 && list[0] == null) { + return null; + } + if (repFieldCount == 1 && list[0].getClass().isArray()) { + return (Object[]) list[0]; + } + return list; + } + + public Map<String, Object> extractMap(Group group) { + final int repFieldCount = group.getType().getFieldCount(); + if (repFieldCount < 1) { + return null; + } + Map<String, Object> resultMap = new HashMap<>(); + for (int repFieldIdx = 0; repFieldIdx < repFieldCount; repFieldIdx++) { + Object value = extractValue(group, repFieldIdx); + resultMap.put(group.getType().getType(repFieldIdx).getName(), value); + } + return resultMap; + } + + @Override + public Object convertMap(Object value) { + Map<Object, Object> map = (Map) value; + if (map.isEmpty()) { + return null; + } + Map<Object, Object> convertedMap = new HashMap<>(); + for (Map.Entry<Object, Object> entry : map.entrySet()) { + Object mapKey = entry.getKey(); + Object mapValue = entry.getValue(); + if (mapKey != null) { + Object convertedMapValue = null; + if (mapValue != null) { + convertedMapValue = convert(mapValue); + } + convertedMap.put(convertSingleValue(entry.getKey()), convertedMapValue); + } + } + if (convertedMap.isEmpty()) { + return null; + } + return convertedMap; + } + + @Override + public boolean isMultiValue(Object value) { + if (super.isMultiValue(value)) { + return true; + } + if (value instanceof byte[]) { + return false; + } + return value.getClass().isArray(); + } + + @Nullable + @Override + protected Object convertMultiValue(Object value) { + if (value instanceof Collection) { + return super.convertMultiValue(value); + } + // value is Object[] + Object[] values = (Object[]) value; + return super.convertMultiValue(Arrays.asList(values)); + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java new file mode 100644 index 0000000..3c55137 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.parquet; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.schema.MessageType; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; + + +/** + * Record reader for Native Parquet file. + */ +public class ParquetNativeRecordReader implements RecordReader { + private Path _dataFilePath; + private ParquetNativeRecordExtractor _recordExtractor; + private MessageType _schema; + private ParquetMetadata _parquetMetadata; + private ParquetFileReader _parquetFileReader; + private Group _nextRecord; + private PageReadStore _pageReadStore; + private MessageColumnIO _columnIO; + private org.apache.parquet.io.RecordReader _parquetRecordReader; + private int _currentPageIdx; + + @Override + public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + _dataFilePath = new Path(dataFile.getAbsolutePath()); + Configuration conf = new Configuration(); + _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER); + _recordExtractor = new ParquetNativeRecordExtractor(); + _recordExtractor.init(fieldsToRead, null); + _schema = _parquetMetadata.getFileMetaData().getSchema(); + _parquetFileReader = + new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(), + _schema.getColumns()); + _pageReadStore = _parquetFileReader.readNextRowGroup(); + _columnIO = new ColumnIOFactory().getColumnIO(_schema); + _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema)); + _currentPageIdx = 0; + } + + @Override + public boolean hasNext() { + if (_pageReadStore == null) { + return false; + } + if (_pageReadStore.getRowCount() - _currentPageIdx >= 1) { + // System.out.println("_pageReadStore.getRowCount() = " + _pageReadStore.getRowCount() + ", _currentPageIdx = " + _currentPageIdx); + return true; + } + try { + _pageReadStore = _parquetFileReader.readNextRowGroup(); + _currentPageIdx = 0; + if (_pageReadStore == null) { + return false; + } + _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return hasNext(); + } + + @Override + public GenericRow next() + throws IOException { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { + _nextRecord = (Group) _parquetRecordReader.read(); + _recordExtractor.extract(_nextRecord, reuse); + _currentPageIdx++; + return reuse; + } + + @Override + public void rewind() + throws IOException { + _parquetFileReader.close(); + Configuration conf = new Configuration(); + _parquetFileReader = + new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(), + _schema.getColumns()); + _pageReadStore = _parquetFileReader.readNextRowGroup(); + _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema)); + _currentPageIdx = 0; + } + + @Override + public void close() + throws IOException { + _parquetFileReader.close(); + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java index 10ce618..790e97f 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java @@ -22,37 +22,34 @@ import java.io.File; import java.io.IOException; import java.util.Set; import javax.annotation.Nullable; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.hadoop.ParquetReader; -import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.data.readers.RecordReaderConfig; /** - * Record reader for Parquet file. + * Pinot Record reader for Parquet file.<p> + * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}. */ public class ParquetRecordReader implements RecordReader { - private Path _dataFilePath; - private AvroRecordExtractor _recordExtractor; - private ParquetReader<GenericRecord> _parquetReader; - private GenericRecord _nextRecord; + private RecordReader _internalParquetRecordReader; + private boolean _useAvroParquetRecordReader = true; @Override public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { - _dataFilePath = new Path(dataFile.getAbsolutePath()); - _parquetReader = ParquetUtils.getParquetReader(_dataFilePath); - _recordExtractor = new AvroRecordExtractor(); - _recordExtractor.init(fieldsToRead, null); - _nextRecord = _parquetReader.read(); + if (recordReaderConfig == null || ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) { + _internalParquetRecordReader = new ParquetAvroRecordReader(); + } else { + _useAvroParquetRecordReader = false; + _internalParquetRecordReader = new ParquetNativeRecordReader(); + } + _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig); } @Override public boolean hasNext() { - return _nextRecord != null; + return _internalParquetRecordReader.hasNext(); } @Override @@ -64,22 +61,22 @@ public class ParquetRecordReader implements RecordReader { @Override public GenericRow next(GenericRow reuse) throws IOException { - _recordExtractor.extract(_nextRecord, reuse); - _nextRecord = _parquetReader.read(); - return reuse; + return _internalParquetRecordReader.next(reuse); } @Override public void rewind() throws IOException { - _parquetReader.close(); - _parquetReader = ParquetUtils.getParquetReader(_dataFilePath); - _nextRecord = _parquetReader.read(); + _internalParquetRecordReader.rewind(); } @Override public void close() throws IOException { - _parquetReader.close(); + _internalParquetRecordReader.close(); + } + + public boolean useAvroParquetRecordReader() { + return _useAvroParquetRecordReader; } } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java new file mode 100644 index 0000000..d6bdce2 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderConfig.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.parquet; + +import org.apache.commons.configuration.Configuration; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; + + +/** + * Config for ParquetRecordReader + */ +public class ParquetRecordReaderConfig implements RecordReaderConfig { + private static final String USE_PARQUET_AVRO_RECORDER_READER = "useParquetAvroRecordReader"; + private boolean _useParquetAvroRecordReader = true; + private Configuration _conf; + + public ParquetRecordReaderConfig() { + } + + public ParquetRecordReaderConfig(Configuration conf) { + _conf = conf; + _useParquetAvroRecordReader = conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, true); + } + + public boolean useParquetAvroRecordReader() { + return _useParquetAvroRecordReader; + } + + public void setUseParquetAvroRecordReader(boolean useParquetAvroRecordReader) { + _useParquetAvroRecordReader = useParquetAvroRecordReader; + } + + public Configuration getConfig() { + return _conf; + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java index f2a49b1..5f3dd81 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java @@ -33,38 +33,41 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; public class ParquetUtils { + private static final String DEFAULT_FS = "file:///"; + private ParquetUtils() { } - private static final String DEFAULT_FS = "file:///"; - /** * Returns a ParquetReader with the given path. */ - public static ParquetReader<GenericRecord> getParquetReader(Path path) + public static ParquetReader<GenericRecord> getParquetAvroReader(Path path) throws IOException { //noinspection unchecked return AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get()) - .withConf(getConfiguration()).build(); + .withConf(getParquetAvroReaderConfiguration()).build(); } /** * Returns a ParquetWriter with the given path and schema. */ - public static ParquetWriter<GenericRecord> getParquetWriter(Path path, Schema schema) + public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema) throws IOException { - return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getConfiguration()).build(); + return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema) + .withConf(getParquetAvroReaderConfiguration()).build(); } /** * Returns the schema for the given Parquet file path. */ - public static Schema getParquetSchema(Path path) + public static Schema getParquetAvroSchema(Path path) throws IOException { - ParquetMetadata footer = ParquetFileReader.readFooter(getConfiguration(), path, ParquetMetadataConverter.NO_FILTER); + ParquetMetadata footer = + ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(), path, ParquetMetadataConverter.NO_FILTER); Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData(); String schemaString = metaData.get("parquet.avro.schema"); if (schemaString == null) { @@ -74,11 +77,12 @@ public class ParquetUtils { if (schemaString != null) { return new Schema.Parser().parse(schemaString); } else { - return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema()); + MessageType parquetSchema = footer.getFileMetaData().getSchema(); + return new AvroSchemaConverter().convert(parquetSchema); } } - private static Configuration getConfiguration() { + private static Configuration getParquetAvroReaderConfiguration() { // The file path used in ParquetRecordReader is a local file path without prefix 'file:///', // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///' // in case that user's hadoop conf overwrite this item diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java index ec9beed..f39df45 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java @@ -18,7 +18,9 @@ */ package org.apache.pinot.plugin.inputformat.parquet; +import com.google.common.collect.ImmutableSet; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -30,11 +32,16 @@ import org.apache.parquet.hadoop.ParquetWriter; import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; +import org.testng.Assert; +import org.testng.annotations.Test; public class ParquetRecordReaderTest extends AbstractRecordReaderTest { private final File _dataFile = new File(_tempDir, "data.parquet"); + private final File _testParquetFileWithInt96AndDecimal = + new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile()); @Override protected RecordReader createRecordReader() @@ -57,10 +64,78 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { records.add(record); } try (ParquetWriter<GenericRecord> writer = ParquetUtils - .getParquetWriter(new Path(_dataFile.getAbsolutePath()), schema)) { + .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) { for (GenericRecord record : records) { writer.write(record); } } } + + @Test + public void testParquetAvroRecordReader() + throws IOException { + ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader(); + avroRecordReader.init(_dataFile, null, new ParquetRecordReaderConfig()); + testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE); + } + + private void testReadParquetFile(RecordReader reader, int totalRecords) + throws IOException { + int numRecordsRead = 0; + while (reader.hasNext()) { + reader.next(); + numRecordsRead++; + } + Assert.assertEquals(numRecordsRead, totalRecords); + } + + @Test + public void testParquetNativeRecordReader() + throws IOException { + ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader(); + nativeRecordReader.init(_testParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig()); + testReadParquetFile(nativeRecordReader, 1965); + nativeRecordReader.init(_dataFile, ImmutableSet.of(), new ParquetRecordReaderConfig()); + testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE); + } + + @Test + public void testComparison() + throws IOException { + testComparison(_dataFile, SAMPLE_RECORDS_SIZE); + testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1); + testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667); + testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870); + testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889); + testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000); + testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443); + testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492); + testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000); + } + + private void testComparison(File dataFile, int totalRecords) + throws IOException { + final ParquetRecordReader avroRecordReader = new ParquetRecordReader(); + avroRecordReader.init(dataFile, null, null); + final ParquetRecordReader nativeRecordReader = new ParquetRecordReader(); + ParquetRecordReaderConfig parquetRecordReaderConfig = new ParquetRecordReaderConfig(); + parquetRecordReaderConfig.setUseParquetAvroRecordReader(false); + nativeRecordReader.init(dataFile, null, parquetRecordReaderConfig); + Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader()); + Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader()); + + GenericRow avroReuse = new GenericRow(); + GenericRow nativeReuse = new GenericRow(); + int recordsRead = 0; + while (avroRecordReader.hasNext()) { + Assert.assertTrue(nativeRecordReader.hasNext()); + final GenericRow avroReaderRow = avroRecordReader.next(avroReuse); + final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse); + Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString()); + Assert.assertTrue(avroReaderRow.equals(nativeReaderRow)); + recordsRead++; + } + Assert.assertEquals(recordsRead, totalRecords, + "Message read from ParquetRecordReader doesn't match the expected number."); + } } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet new file mode 100644 index 0000000..9d1954b Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/airlineStats.snappy.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet new file mode 100644 index 0000000..931f882 Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/baseballStats.snappy.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet new file mode 100644 index 0000000..1723e2f Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubActivities.gz.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet new file mode 100644 index 0000000..6bd49d0 Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/githubEvents.snappy.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet new file mode 100644 index 0000000..c3b3efc Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/starbucksStores.snappy.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet new file mode 100644 index 0000000..160c659 Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.gz.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet new file mode 100644 index 0000000..8aed9b4 Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-comparison.snappy.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet new file mode 100644 index 0000000..7818851 Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/test-file-with-int96-and-decimal.snappy.parquet differ diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet new file mode 100644 index 0000000..58ed9a8 Binary files /dev/null and b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/resources/users.parquet differ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index 72e5c52..b3a79ce 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -167,11 +168,65 @@ public class GenericRow implements Serializable { } if (obj instanceof GenericRow) { GenericRow that = (GenericRow) obj; - return _fieldToValueMap.equals(that._fieldToValueMap) && _nullValueFields.equals(that._nullValueFields); + if (!_nullValueFields.containsAll(that._nullValueFields) || !that._nullValueFields + .containsAll(_nullValueFields)) { + return false; + } + return compareMap(_fieldToValueMap, that._fieldToValueMap); } return false; } + private boolean compareMap(Map<String, Object> thisMap, Map<String, Object> thatMap) { + if (thisMap.size() == thatMap.size()) { + for (String key : thisMap.keySet()) { + Object fieldValue = thisMap.get(key); + Object thatFieldValue = thatMap.get(key); + if (fieldValue == null) { + if (thatFieldValue != null) { + return false; + } + } else if (!fieldValue.equals(thatFieldValue)) { + if (fieldValue instanceof Map && thatFieldValue instanceof Map) { + return compareMap((Map<String, Object>) fieldValue, (Map<String, Object>) thatFieldValue); + } + if ((fieldValue instanceof byte[]) && (thatFieldValue instanceof byte[])) { + return Arrays.equals((byte[]) fieldValue, (byte[]) thatFieldValue); + } + if (fieldValue.getClass().isArray() && thatFieldValue.getClass().isArray()) { + return compareArray((Object[]) fieldValue, (Object[]) thatFieldValue); + } + return false; + } + } + } + return true; + } + + private boolean compareArray(Object[] fieldValue, Object[] thatFieldValue) { + for (int i = 0; i < fieldValue.length; i++) { + if (fieldValue[i] instanceof Map) { + if (!(thatFieldValue[i] instanceof Map)) { + return false; + } + if (!compareMap((Map<String, Object>) fieldValue[i], (Map<String, Object>) thatFieldValue[i])) { + return false; + } + continue; + } + if (fieldValue[i].getClass().isArray()) { + if (!compareArray((Object[]) fieldValue[i], (Object[]) thatFieldValue[i])) { + return false; + } + continue; + } + if (!fieldValue[i].equals(thatFieldValue[i])) { + return false; + } + } + return true; + } + @Override public String toString() { try { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java index 91035cd..91efd6c 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java @@ -39,7 +39,7 @@ import org.testng.collections.Lists; public abstract class AbstractRecordReaderTest { private final static Random RANDOM = new Random(System.currentTimeMillis()); - private final static int SAMPLE_RECORDS_SIZE = 10000; + protected final static int SAMPLE_RECORDS_SIZE = 10000; protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest"); protected List<Map<String, Object>> _records; @@ -155,6 +155,9 @@ public abstract class AbstractRecordReaderTest { @BeforeClass public void setUp() throws Exception { + if (_tempDir.exists()) { + FileUtils.cleanDirectory(_tempDir); + } FileUtils.forceMkdir(_tempDir); // Generate Pinot schema _pinotSchema = getPinotSchema(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org