This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e0adf6c41 Replace deprecated methods in ParquetNativeRecordReader (#9106)
6e0adf6c41 is described below

commit 6e0adf6c41a02b448400cbe5e457dc90fecd01f9
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Tue Jul 26 15:25:59 2022 +0530

    Replace deprecated methods in ParquetNativeRecordReader (#9106)
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../parquet/ParquetNativeRecordReader.java         | 28 ++++++++++++----------
 .../plugin/inputformat/parquet/ParquetUtils.java   |  8 +++----
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 5448818d90..3f413b9a68 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -24,12 +24,13 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.MessageType;
@@ -45,32 +46,37 @@ public class ParquetNativeRecordReader implements RecordReader {
   private Path _dataFilePath;
   private ParquetNativeRecordExtractor _recordExtractor;
   private MessageType _schema;
-  private ParquetMetadata _parquetMetadata;
   private ParquetFileReader _parquetFileReader;
   private Group _nextRecord;
   private PageReadStore _pageReadStore;
   private MessageColumnIO _columnIO;
   private org.apache.parquet.io.RecordReader _parquetRecordReader;
   private int _currentPageIdx;
+  private Configuration _hadoopConf;
+  private ParquetReadOptions _parquetReadOptions;
 
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     _dataFilePath = new Path(dataFile.getAbsolutePath());
-    Configuration conf = new Configuration();
-    _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath, ParquetMetadataConverter.NO_FILTER);
+    _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
-    _schema = _parquetMetadata.getFileMetaData().getSchema();
-    _parquetFileReader =
-        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
-            _schema.getColumns());
+
+    _parquetReadOptions = ParquetReadOptions.builder()
+        .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
+        .build();
+
+    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
+        _parquetReadOptions);
+    _schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _columnIO = new ColumnIOFactory().getColumnIO(_schema);
     _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
     _currentPageIdx = 0;
   }
 
+
   @Override
   public boolean hasNext() {
     if (_pageReadStore == null) {
@@ -113,10 +119,8 @@ public class ParquetNativeRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parquetFileReader.close();
-    Configuration conf = new Configuration();
-    _parquetFileReader =
-        new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(), _dataFilePath, _parquetMetadata.getBlocks(),
-            _schema.getColumns());
+    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
+        _parquetReadOptions);
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
     _currentPageIdx = 0;
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index 5f3dd81909..c3a6bd7a2d 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -49,7 +49,7 @@ public class ParquetUtils {
       throws IOException {
     //noinspection unchecked
     return AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get())
-        .withConf(getParquetAvroReaderConfiguration()).build();
+        .withConf(getParquetHadoopConfiguration()).build();
   }
 
   /**
@@ -58,7 +58,7 @@ public class ParquetUtils {
   public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
       throws IOException {
     return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
-        .withConf(getParquetAvroReaderConfiguration()).build();
+        .withConf(getParquetHadoopConfiguration()).build();
   }
 
   /**
@@ -67,7 +67,7 @@ public class ParquetUtils {
   public static Schema getParquetAvroSchema(Path path)
       throws IOException {
     ParquetMetadata footer =
-        ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
+        ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
     String schemaString = metaData.get("parquet.avro.schema");
     if (schemaString == null) {
@@ -82,7 +82,7 @@ public class ParquetUtils {
     }
   }
 
-  private static Configuration getParquetAvroReaderConfiguration() {
+  public static Configuration getParquetHadoopConfiguration() {
     // The file path used in ParquetRecordReader is a local file path without prefix 'file:///',
     // so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
     // in case that user's hadoop conf overwrite this item


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to