From Preetham Poluparthi <[email protected]>:

Preetham Poluparthi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20885?usp=email )


Change subject: WIP: update parquet reader
......................................................................

WIP: update parquet reader

Change-Id: I212cd38479610afe211f2a37d6f527fb62324d2c
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
2 files changed, 75 insertions(+), 140 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/85/20885/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
index 7b10e09..dbada39 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/MapredParquetInputFormat.java
@@ -18,22 +18,12 @@
  */
 package org.apache.asterix.external.input.record.reader.hdfs.parquet;

-import static java.util.Arrays.asList;
-
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;

 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.parquet.hadoop.Footer;
-import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetInputSplit;

 /**
@@ -45,49 +35,23 @@
  * The newer API (@see org.apache.hadoop.mapreduce) is not yet supported.
  * Beware before upgrading Apache Parquet version.
  */
-public class MapredParquetInputFormat extends 
org.apache.hadoop.mapred.FileInputFormat<Void, VoidPointable> {
+public class MapredParquetInputFormat
+        extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat<Void, 
VoidPointable> {

-    private final ParquetInputFormat<ArrayBackedValueStorage> realInputFormat 
= new ParquetInputFormat<>();
     private IExternalFilterValueEmbedder valueEmbedder;

     @Override
-    public RecordReader<Void, VoidPointable> getRecordReader(InputSplit split, 
JobConf job, Reporter reporter)
-            throws IOException {
-        return new ParquetRecordReaderWrapper(split, job, reporter, 
valueEmbedder);
-    }
-
-    @Override
-    public InputSplit[] getSplits(JobConf job, int numSplits) throws 
IOException {
-        if (isTaskSideMetaData(job)) {
-            return super.getSplits(job, numSplits);
-        }
-
-        List<Footer> footers = getFooters(job);
-        List<ParquetInputSplit> splits = realInputFormat.getSplits(job, 
footers);
-        if (splits == null) {
-            return null; //NOSONAR
-        }
-        InputSplit[] resultSplits = new InputSplit[splits.size()];
-        int i = 0;
-        for (ParquetInputSplit split : splits) {
-            resultSplits[i++] = new ParquetInputSplitWrapper(split);
-        }
-        return resultSplits;
-    }
-
-    public List<Footer> getFooters(JobConf job) throws IOException {
-        return realInputFormat.getFooters(job, asList(super.listStatus(job)));
+    public org.apache.hadoop.mapreduce.RecordReader<Void, VoidPointable> 
createRecordReader(
+            org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext 
context)
+            throws IOException, InterruptedException {
+        return new ParquetRecordReaderWrapper(split, context, valueEmbedder);
     }

     public void setValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
         this.valueEmbedder = valueEmbedder;
     }

-    public static boolean isTaskSideMetaData(JobConf job) {
-        return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
-    }
-
-    static class ParquetInputSplitWrapper implements InputSplit {
+    static class ParquetInputSplitWrapper extends InputSplit {

         ParquetInputSplit realSplit;

@@ -95,10 +59,6 @@
         public ParquetInputSplitWrapper() {
         }

-        public ParquetInputSplitWrapper(ParquetInputSplit realSplit) {
-            this.realSplit = realSplit;
-        }
-
         @Override
         public long getLength() throws IOException {
             return realSplit.getLength();
@@ -110,19 +70,9 @@
         }

         @Override
-        public void readFields(DataInput in) throws IOException {
-            realSplit = new ParquetInputSplit();
-            realSplit.readFields(in);
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            realSplit.write(out);
-        }
-
-        @Override
         public String toString() {
             return realSplit.toString();
         }
     }
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
index a293ebb..ebfa1d0 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetRecordReaderWrapper.java
@@ -24,11 +24,9 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.util.LogRedactionUtil;
@@ -36,7 +34,7 @@
 import org.apache.parquet.hadoop.ParquetRecordReader;
 import org.apache.parquet.hadoop.api.ReadSupport;

-public class ParquetRecordReaderWrapper implements RecordReader<Void, 
VoidPointable> {
+public class ParquetRecordReaderWrapper extends RecordReader<Void, 
VoidPointable> {
     private final ParquetRecordReader<IValueReference> realReader;
     private final long splitLen; // for getPos()

@@ -45,51 +43,39 @@
     private boolean firstRecord;
     private boolean eof;

-    public ParquetRecordReaderWrapper(InputSplit oldSplit, JobConf oldJobConf, 
Reporter reporter,
-            IExternalFilterValueEmbedder valueEmbedder) throws IOException {
+    public ParquetRecordReaderWrapper(InputSplit oldSplit, TaskAttemptContext 
context,
+            IExternalFilterValueEmbedder valueEmbedder) throws IOException, 
InterruptedException {
         splitLen = oldSplit.getLength();

-        try {
-            ReadSupport<IValueReference> readSupport = 
ParquetInputFormat.getReadSupportInstance(oldJobConf);
-            ParquetReadSupport parquetReadSupport = (ParquetReadSupport) 
readSupport;
-            parquetReadSupport.setValueEmbedder(valueEmbedder);
-            realReader = new ParquetRecordReader<>(readSupport, 
ParquetInputFormat.getFilter(oldJobConf));
+        ReadSupport<IValueReference> readSupport =
+                
ParquetInputFormat.getReadSupportInstance(context.getConfiguration());
+        ParquetReadSupport parquetReadSupport = (ParquetReadSupport) 
readSupport;
+        parquetReadSupport.setValueEmbedder(valueEmbedder);
+        realReader = new ParquetRecordReader<>(readSupport, 
ParquetInputFormat.getFilter(context.getConfiguration()));

-            if (oldSplit instanceof 
MapredParquetInputFormat.ParquetInputSplitWrapper) {
-                
realReader.initialize(((MapredParquetInputFormat.ParquetInputSplitWrapper) 
oldSplit).realSplit,
-                        oldJobConf, reporter);
-            } else if (oldSplit instanceof FileSplit) {
-                realReader.initialize((FileSplit) oldSplit, oldJobConf, 
reporter);
-            } else {
-                throw 
RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
-                        LogRedactionUtil.userData(oldSplit.toString()), 
"invalid file split");
-            }
-
-            // Set the path for value embedder
-            valueEmbedder.setPath(getPath(oldSplit));
-
-            valueContainer = new VoidPointable();
-            firstRecord = false;
-            eof = false;
-            // read once to gain access to key and value objects
-            if (realReader.nextKeyValue()) {
-                firstRecord = true;
-                valueContainer.set(realReader.getCurrentValue());
-            } else {
-                eof = true;
-            }
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        } catch (HyracksDataException | AsterixParquetRuntimeException e) {
-            throw e;
-        } catch (Exception e) {
-            if (e.getMessage() != null && e.getMessage().contains("not a 
Parquet file")) {
-                throw 
RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
-                        LogRedactionUtil.userData(getPath(oldSplit)), "not a 
Parquet file");
-            }
-
-            throw RuntimeDataException.create(e);
+        if (oldSplit instanceof 
MapredParquetInputFormat.ParquetInputSplitWrapper) {
+            
realReader.initialize(((MapredParquetInputFormat.ParquetInputSplitWrapper) 
oldSplit).realSplit, context);
+        } else if (oldSplit instanceof FileSplit) {
+            realReader.initialize((FileSplit) oldSplit, context);
+        } else {
+            throw RuntimeDataException.create(ErrorCode.INVALID_PARQUET_FILE,
+                    LogRedactionUtil.userData(oldSplit.toString()), "invalid 
file split");
         }
+
+        // Set the path for value embedder
+        valueEmbedder.setPath(getPath(oldSplit));
+
+        valueContainer = new VoidPointable();
+        firstRecord = false;
+        eof = false;
+        // read once to gain access to key and value objects
+        if (realReader.nextKeyValue()) {
+            firstRecord = true;
+            valueContainer.set(realReader.getCurrentValue());
+        } else {
+            eof = true;
+        }
+
     }

     private String getPath(InputSplit split) {
@@ -108,21 +94,46 @@
     }

     @Override
-    public Void createKey() {
+    public void initialize(org.apache.hadoop.mapreduce.InputSplit split, 
TaskAttemptContext context)
+            throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException {
+        if (eof) {
+            return false;
+        }
+
+        if (firstRecord) {
+            firstRecord = false;
+            return true;
+        }
+
+        try {
+            if (realReader.nextKeyValue()) {
+                valueContainer.set(realReader.getCurrentValue());
+                return true;
+            }
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+
+        eof = true;
+        return false;
+    }
+
+    @Override
+    public Void getCurrentKey() {
         return null;
     }

     @Override
-    public VoidPointable createValue() {
+    public VoidPointable getCurrentValue() {
         return valueContainer;
     }

     @Override
-    public long getPos() throws IOException {
-        return (long) (splitLen * getProgress());
-    }
-
-    @Override
     public float getProgress() throws IOException {
         try {
             return realReader.getProgress();
@@ -131,30 +142,4 @@
         }
     }

-    @Override
-    public boolean next(Void key, VoidPointable value) throws IOException {
-        if (eof) {
-            return false;
-        }
-
-        if (firstRecord) { // key & value are already read.
-            firstRecord = false;
-            value.set(valueContainer);
-            return true;
-        }
-
-        try {
-            if (realReader.nextKeyValue()) {
-                if (value != null) {
-                    value.set(realReader.getCurrentValue());
-                }
-                return true;
-            }
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-
-        eof = true; // strictly not required, just for consistency
-        return false;
-    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20885?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I212cd38479610afe211f2a37d6f527fb62324d2c
Gerrit-Change-Number: 20885
Gerrit-PatchSet: 1
Gerrit-Owner: Preetham Poluparthi <[email protected]>

Reply via email to