This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 6e0adf6c41 Replace deprecated methods in ParquetNativeRecordReader (#9106)
6e0adf6c41 is described below

commit 6e0adf6c41a02b448400cbe5e457dc90fecd01f9
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Jul 26 15:25:59 2022 +0530

    Replace deprecated methods in ParquetNativeRecordReader (#9106)

    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../parquet/ParquetNativeRecordReader.java       | 28 ++++++++++++----------
 .../plugin/inputformat/parquet/ParquetUtils.java |  8 +++----
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 5448818d90..3f413b9a68 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -24,12 +24,13 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.MessageType;
@@ -45,32 +46,37 @@ public class ParquetNativeRecordReader implements RecordReader {
   private Path _dataFilePath;
   private ParquetNativeRecordExtractor _recordExtractor;
   private MessageType _schema;
-  private ParquetMetadata _parquetMetadata;
   private ParquetFileReader _parquetFileReader;
   private Group _nextRecord;
   private PageReadStore _pageReadStore;
   private MessageColumnIO _columnIO;
   private org.apache.parquet.io.RecordReader _parquetRecordReader;
   private int _currentPageIdx;
+  private Configuration _hadoopConf;
+  private ParquetReadOptions _parquetReadOptions;

   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     _dataFilePath = new Path(dataFile.getAbsolutePath());
-    Configuration conf = new Configuration();
-    _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
-    _schema = _parquetMetadata.getFileMetaData().getSchema();
-    _parquetFileReader =
-        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
-            _schema.getColumns());
+
+    _parquetReadOptions = ParquetReadOptions.builder()
+        .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
+        .build();
+
+    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
+        _parquetReadOptions);
+
+    _schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _columnIO = new ColumnIOFactory().getColumnIO(_schema);
     _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
     _currentPageIdx = 0;
   }

+
   @Override
   public boolean hasNext() {
     if (_pageReadStore == null) {
@@ -113,10 +119,8 @@ public class ParquetNativeRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parquetFileReader.close();
-    Configuration conf = new Configuration();
-    _parquetFileReader =
-        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
-            _schema.getColumns());
+    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
+        _parquetReadOptions);
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
     _currentPageIdx = 0;
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index 5f3dd81909..c3a6bd7a2d 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -49,7 +49,7 @@ public class ParquetUtils {
       throws IOException {
     //noinspection unchecked
     return AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get())
-        .withConf(getParquetAvroReaderConfiguration()).build();
+        .withConf(getParquetHadoopConfiguration()).build();
   }

   /**
@@ -58,7 +58,7 @@ public class ParquetUtils {
   public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
       throws IOException {
     return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
-        .withConf(getParquetAvroReaderConfiguration()).build();
+        .withConf(getParquetHadoopConfiguration()).build();
   }

   /**
@@ -67,7 +67,7 @@ public class ParquetUtils {
   public static Schema getParquetAvroSchema(Path path)
       throws IOException {
     ParquetMetadata footer =
-        ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
+        ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
     String schemaString = metaData.get("parquet.avro.schema");
     if (schemaString == null) {
@@ -82,7 +82,7 @@ public class ParquetUtils {
     }
   }

-  private static Configuration getParquetAvroReaderConfiguration() {
+  public static Configuration getParquetHadoopConfiguration() {
     // The file path used in ParquetRecordReader is a local file path without prefix 'file:///',
     // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
     // in case that user's hadoop conf overwrite this item
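For readers migrating similar code: the pattern this commit applies replaces the deprecated ParquetFileReader.readFooter(...) call and ParquetFileReader constructor with ParquetFileReader.open(InputFile, ParquetReadOptions), reading the footer back from the reader itself. The sketch below is a minimal standalone illustration, not Pinot code; the file path is hypothetical, and a locally built Configuration stands in for ParquetUtils.getParquetHadoopConfiguration().

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class ParquetOpenSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical local file; in Pinot this comes from RecordReader.init().
    Path dataFilePath = new Path("/tmp/example.parquet");

    // Mirrors what ParquetUtils.getParquetHadoopConfiguration() does:
    // local paths carry no scheme, so pin fs.defaultFS to the local filesystem.
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("fs.defaultFS", "file:///");

    // Old, deprecated pattern removed by this commit:
    //   ParquetMetadata md = ParquetFileReader.readFooter(conf, path, NO_FILTER);
    //   new ParquetFileReader(conf, md.getFileMetaData(), path, md.getBlocks(), columns);

    // New pattern: wrap the path in an InputFile, pass read options explicitly.
    ParquetReadOptions readOptions = ParquetReadOptions.builder()
        .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
        .build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(dataFilePath, hadoopConf), readOptions)) {
      // The footer (and hence the schema) is now obtained from the reader.
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      System.out.println("Schema: " + schema);
      System.out.println("Row count: " + reader.getRecordCount());
    }
  }
}

Caching the Configuration and ParquetReadOptions in fields, as the patch does, lets rewind() reopen the file with the same options instead of re-reading and re-threading the footer metadata by hand.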