This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit cf9969af31f996d7a8e734697d08267e83d80a73
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Tue Jul 4 17:04:49 2023 +0800

    [opt](hudi) use spark bundle to read hudi data (#21260)
    
    Use spark-bundle instead of hive-bundle to read Hudi data.
    
    **Advantages** of using spark-bundle to read Hudi data:
    1. The performance of spark-bundle is more than twice that of hive-bundle.
    2. spark-bundle uses `UnsafeRow`, which reduces data copying and JVM GC time (a sketch of this layout follows at the end of this message).
    3. spark-bundle supports `Time Travel`, `Incremental Read`, and `Schema Change`; these features can be quickly ported to Doris.
    
    **Disadvantages** of using spark-bundle to read Hudi data:
    1. The additional dependencies make hudi-dependency.jar much heavier (from 138 MB to 300 MB).
    2. spark-bundle only provides an `RDD` interface and cannot be used directly.
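    
    For illustration, a minimal Java sketch (the helper class `UnsafeStringSlice` is hypothetical
    and not part of this patch) of the `UnsafeRow` layout that the zero-copy path relies on. For a
    string or binary column, the fixed-width slot stores a packed long whose upper 32 bits are the
    payload offset inside the row buffer and whose lower 32 bits are its length, so the value can
    be handed over as a (base object, offset, length) triple without copying; the same unpacking
    appears in `HudiColumnValue.getNativeValue` in the patch below.
    
        import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    
        /** Hypothetical helper: view a string/binary column of an UnsafeRow as a zero-copy slice. */
        final class UnsafeStringSlice {
            final Object baseObject; // backing byte[] (or off-heap base) of the row
            final int offset;        // byte offset of the payload, relative to the row's base offset
            final int length;        // payload length in bytes
    
            UnsafeStringSlice(Object baseObject, int offset, int length) {
                this.baseObject = baseObject;
                this.offset = offset;
                this.length = length;
            }
    
            static UnsafeStringSlice of(UnsafeRow row, int ordinal) {
                // Variable-length fields pack (offset << 32) | size into their fixed-width slot.
                long offsetAndSize = row.getLong(ordinal);
                int offset = (int) (offsetAndSize >> 32);
                int size = (int) offsetAndSize;
                return new UnsafeStringSlice(row.getBaseObject(), offset, size);
            }
        }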
---
 be/src/vec/exec/scan/hudi_jni_reader.cpp           |   6 +-
 be/src/vec/exec/scan/hudi_jni_reader.h             |   2 +-
 bin/start_be.sh                                    |  12 +-
 .../org/apache/doris/avro/AvroColumnValue.java     |   5 +
 .../java/org/apache/doris/avro/AvroJNIScanner.java |   1 +
 fe/be-java-extensions/hudi-scanner/pom.xml         | 202 +++++-
 .../org/apache/doris/hudi/HudiColumnValue.java     | 141 +++--
 .../java/org/apache/doris/hudi/HudiJniScanner.java | 194 ++----
 .../java/org/apache/doris/hudi/HudiScanParam.java  | 194 ------
 .../java/org/apache/doris/hudi/HudiScanUtils.java  |  62 --
 .../src/main/java/org/apache/doris/hudi/Utils.java |  21 +
 .../org/apache/doris/hudi/BaseSplitReader.scala    | 702 +++++++++++++++++++++
 .../apache/doris/hudi/HoodieRecordIterator.scala   | 142 +++++
 .../apache/doris/hudi/MORSnapshotSplitReader.scala | 183 ++++++
 .../org/apache/doris/common/jni/JniScanner.java    |   9 +-
 .../apache/doris/common/jni/MockJniScanner.java    |  11 +-
 .../apache/doris/common/jni/vec/ColumnValue.java   |   2 +
 .../doris/common/jni/vec/NativeColumnValue.java    |  48 ++
 .../apache/doris/common/jni/vec/ScanPredicate.java |   5 +
 .../apache/doris/common/jni/vec/VectorColumn.java  |  35 +-
 .../apache/doris/common/jni/vec/VectorTable.java   |   5 +
 .../doris/maxcompute/MaxComputeColumnValue.java    |  10 +-
 .../doris/maxcompute/MaxComputeJniScanner.java     |   7 -
 .../org/apache/doris/paimon/PaimonColumnValue.java |   7 +-
 .../org/apache/doris/paimon/PaimonJniScanner.java  |   7 -
 .../java/org/apache/doris/catalog/HudiUtils.java   |   3 +
 26 files changed, 1500 insertions(+), 516 deletions(-)

diff --git a/be/src/vec/exec/scan/hudi_jni_reader.cpp b/be/src/vec/exec/scan/hudi_jni_reader.cpp
index 1e02b8ee79..029135ac67 100644
--- a/be/src/vec/exec/scan/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/scan/hudi_jni_reader.cpp
@@ -21,6 +21,7 @@
 #include <ostream>
 
 #include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "vec/core/types.h"
 
@@ -35,7 +36,7 @@ class Block;
 
 namespace doris::vectorized {
 
-const std::string HudiJniReader::HADOOP_FS_PREFIX = "hadoop_fs.";
+const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf.";
 
 HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
                              const THudiFileDesc& hudi_params,
@@ -52,6 +53,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
     }
 
     std::map<String, String> params = {
+            {"query_id", print_id(_state->query_id())},
             {"base_path", _hudi_params.base_path},
             {"data_file_path", _hudi_params.data_file_path},
             {"data_file_length", 
std::to_string(_hudi_params.data_file_length)},
@@ -65,7 +67,7 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
 
     // Use compatible hadoop client to read data
     for (auto& kv : _scan_params.properties) {
-        params[HADOOP_FS_PREFIX + kv.first] = kv.second;
+        params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
     }
 
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
diff --git a/be/src/vec/exec/scan/hudi_jni_reader.h b/be/src/vec/exec/scan/hudi_jni_reader.h
index c3bd68056d..bf2dab943d 100644
--- a/be/src/vec/exec/scan/hudi_jni_reader.h
+++ b/be/src/vec/exec/scan/hudi_jni_reader.h
@@ -46,7 +46,7 @@ class HudiJniReader : public GenericReader {
     ENABLE_FACTORY_CREATOR(HudiJniReader);
 
 public:
-    static const std::string HADOOP_FS_PREFIX;
+    static const std::string HADOOP_CONF_PREFIX;
 
     HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params,
                   const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
diff --git a/bin/start_be.sh b/bin/start_be.sh
index abcaedf682..c603239a85 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -82,23 +82,23 @@ for f in "${DORIS_HOME}/lib/java_extensions"/*.jar; do
     if [[ -z "${DORIS_CLASSPATH}" ]]; then
         export DORIS_CLASSPATH="${f}"
     else
-        export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        export DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     fi
 done
 
 if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
     # add hadoop libs
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
     for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do
-        DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+        DORIS_CLASSPATH="${DORIS_CLASSPATH}:${f}"
     done
 fi
 
@@ -107,7 +107,7 @@ fi
 if command -v hadoop >/dev/null 2>&1; then
     HADOOP_SYSTEM_CLASSPATH="$(hadoop classpath --glob)"
 fi
-export CLASSPATH="${HADOOP_SYSTEM_CLASSPATH}:${DORIS_HOME}/conf/:${DORIS_CLASSPATH}"
+export CLASSPATH="${DORIS_CLASSPATH}:${HADOOP_SYSTEM_CLASSPATH}:${DORIS_HOME}/conf/"
 # DORIS_CLASSPATH is for self-managed jni
 export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}"
 
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java
index 3c796f1fc7..dd72c9aad5 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java
@@ -105,6 +105,11 @@ public class AvroColumnValue implements ColumnValue {
         return inspectObject().toString();
     }
 
+    @Override
+    public byte[] getStringAsBytes() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public LocalDate getDate() {
         // avro has no date type
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index 72b9e41416..d3e1cad579 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -192,6 +192,7 @@ public class AvroJNIScanner extends JniScanner {
         return numRows;
     }
 
+    @Override
     protected TableSchema parseTableSchema() throws UnsupportedOperationException {
         Schema schema = avroReader.getSchema();
         List<Field> schemaFields = schema.getFields();
diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml
index 098073504e..1b19da9887 100644
--- a/fe/be-java-extensions/hudi-scanner/pom.xml
+++ b/fe/be-java-extensions/hudi-scanner/pom.xml
@@ -30,83 +30,217 @@ under the License.
     <properties>
         <doris.home>${basedir}/../../</doris.home>
         <fe_ut_parallel>1</fe_ut_parallel>
+        <scala.version>2.12.15</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.2.0</spark.version>
+        <sparkbundle.version>3.2</sparkbundle.version>
+        <hudi.version>0.13.0</hudi.version>
+        <janino.version>3.0.16</janino.version>
+        <fasterxml.jackson.version>2.14.3</fasterxml.jackson.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>java-common</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark-client</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark3-common</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-spark3.2.x_${scala.binary.version}</artifactId>
+            <version>${hudi.version}</version>
             <exclusions>
                 <exclusion>
-                    <artifactId>fe-common</artifactId>
-                    <groupId>org.apache.doris</groupId>
+                    <artifactId>json4s-ast_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>json4s-core_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>json4s-jackson_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>json4s-scalap_2.11</artifactId>
+                    <groupId>org.json4s</groupId>
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-hadoop-mr-bundle</artifactId>
-            <version>${hudi.version}</version>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.10.1</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <exclusions>
                 <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>commons-lang</groupId>
-                    <artifactId>commons-lang</artifactId>
+                    <artifactId>jackson-module-scala_2.12</artifactId>
+                    <groupId>com.fasterxml.jackson.module</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hudi</groupId>
-                    <artifactId>hudi-common</artifactId>
+                    <artifactId>hadoop-client-api</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.parquet</groupId>
-                    <artifactId>parquet-avro</artifactId>
+                    <artifactId>hadoop-client-runtime</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
                 </exclusion>
+            </exclusions>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+            <exclusions>
                 <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>janino</artifactId>
                 </exclusion>
                 <exclusion>
-                    <artifactId>hudi-hadoop-mr</artifactId>
-                    <groupId>org.apache.hudi</groupId>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>commons-compiler</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>com.facebook.presto.hive</groupId>
-            <artifactId>hive-apache</artifactId>
-            <version>${presto.hive.version}</version>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-launcher_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <!-- the janino version that Spark depends on is wrong -->
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>${janino.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>commons-compiler</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>commons-compiler</artifactId>
+            <version>${janino.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
+            <!-- the jackson-module-scala version that Spark depends on is wrong -->
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
+            <version>${fasterxml.jackson.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
         </dependency>
     </dependencies>
     <build>
         <finalName>hudi-scanner</finalName>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+
         <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>4.7.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default-compile</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>default-testCompile</id>
+                        <phase>none</phase>
+                    </execution>
+                    <execution>
+                        <id>java-compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
index 4a7ea36e44..7d402e8429 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
@@ -18,51 +18,56 @@
 package org.apache.doris.hudi;
 
 
+import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValue;
+import org.apache.doris.common.jni.vec.NativeColumnValue;
 
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.List;
 
-public class HudiColumnValue implements ColumnValue {
-    public enum TimeUnit {
-        MILLIS, MICROS
-    }
+public class HudiColumnValue implements ColumnValue, NativeColumnValue {
+    private boolean isUnsafe;
+    private InternalRow internalRow;
+    private int ordinal;
+    private int precision;
+    private int scale;
 
-    private final Object fieldData;
-    private final ObjectInspector fieldInspector;
-    private final TimeUnit timeUnit;
+    HudiColumnValue() {
+    }
 
-    HudiColumnValue(ObjectInspector fieldInspector, Object fieldData) {
-        this(fieldInspector, fieldData, TimeUnit.MICROS);
+    HudiColumnValue(InternalRow internalRow, int ordinal, int precision, int scale) {
+        this.isUnsafe = internalRow instanceof UnsafeRow;
+        this.internalRow = internalRow;
+        this.ordinal = ordinal;
+        this.precision = precision;
+        this.scale = scale;
     }
 
-    HudiColumnValue(ObjectInspector fieldInspector, Object fieldData, int timePrecision) {
-        this(fieldInspector, fieldData, timePrecision == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS);
+    public void reset(InternalRow internalRow, int ordinal, int precision, int scale) {
+        this.isUnsafe = internalRow instanceof UnsafeRow;
+        this.internalRow = internalRow;
+        this.ordinal = ordinal;
+        this.precision = precision;
+        this.scale = scale;
     }
 
-    HudiColumnValue(ObjectInspector fieldInspector, Object fieldData, TimeUnit timeUnit) {
-        this.fieldInspector = fieldInspector;
-        this.fieldData = fieldData;
-        this.timeUnit = timeUnit;
+    public void reset(int ordinal, int precision, int scale) {
+        this.ordinal = ordinal;
+        this.precision = precision;
+        this.scale = scale;
     }
 
-    private Object inspectObject() {
-        return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
+    public void reset(InternalRow internalRow) {
+        this.isUnsafe = internalRow instanceof UnsafeRow;
+        this.internalRow = internalRow;
     }
 
     @Override
@@ -72,96 +77,89 @@ public class HudiColumnValue implements ColumnValue {
 
     @Override
     public boolean isNull() {
-        return false;
+        return internalRow.isNullAt(ordinal);
     }
 
     @Override
     public boolean getBoolean() {
-        return (boolean) inspectObject();
+        return internalRow.getBoolean(ordinal);
     }
 
     @Override
     public byte getByte() {
-        return 0;
+        return internalRow.getByte(ordinal);
     }
 
     @Override
     public short getShort() {
-        return (short) inspectObject();
+        return internalRow.getShort(ordinal);
     }
 
     @Override
     public int getInt() {
-        return (int) inspectObject();
+        return internalRow.getInt(ordinal);
     }
 
     @Override
     public float getFloat() {
-        return (float) inspectObject();
+        return internalRow.getFloat(ordinal);
     }
 
     @Override
     public long getLong() {
-        return (long) inspectObject();
+        return internalRow.getLong(ordinal);
     }
 
     @Override
     public double getDouble() {
-        return (double) inspectObject();
+        return internalRow.getDouble(ordinal);
     }
 
     @Override
     public BigInteger getBigInteger() {
-        throw new UnsupportedOperationException("Hudi type does not support 
largeint");
+        throw new UnsupportedOperationException("Hoodie type does not support 
largeint");
     }
 
     @Override
     public BigDecimal getDecimal() {
-        return ((HiveDecimalObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData).bigDecimalValue();
+        return internalRow.getDecimal(ordinal, precision, scale).toJavaBigDecimal();
     }
 
     @Override
     public String getString() {
-        return inspectObject().toString();
+        return internalRow.getUTF8String(ordinal).toString();
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        return internalRow.getUTF8String(ordinal).getBytes();
     }
 
     @Override
     public LocalDate getDate() {
-        return ((DateObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData).toLocalDate();
+        return LocalDate.ofEpochDay(internalRow.getInt(ordinal));
     }
 
     @Override
     public LocalDateTime getDateTime() {
-        if (fieldData instanceof LongWritable) {
-            long datetime = ((LongWritable) fieldData).get();
-            long seconds;
-            long nanoseconds;
-            if (timeUnit == TimeUnit.MILLIS) {
-                seconds = datetime / 1000;
-                nanoseconds = (datetime % 1000) * 1000000;
-            } else {
-                seconds = datetime / 1000000;
-                nanoseconds = (datetime % 1000000) * 1000;
-            }
-            return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), ZoneId.systemDefault());
+        long datetime = internalRow.getLong(ordinal);
+        long seconds;
+        long nanoseconds;
+        if (precision == 3) {
+            seconds = datetime / 1000;
+            nanoseconds = (datetime % 1000) * 1000000;
+        } else if (precision == 6) {
+            seconds = datetime / 1000000;
+            nanoseconds = (datetime % 1000000) * 1000;
         } else {
-            return ((TimestampObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData).toLocalDateTime();
+            throw new RuntimeException("Hoodie timestamp only support milliseconds and microseconds");
         }
+        return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoseconds), ZoneId.systemDefault());
     }
 
     @Override
     public byte[] getBytes() {
-        // Get bytes directly if fieldData is BytesWritable or Text to avoid decoding&encoding
-        if (fieldData instanceof BytesWritable) {
-            return ((BytesWritable) fieldData).getBytes();
-        }
-        if (fieldData instanceof Text) {
-            return ((Text) fieldData).getBytes();
-        }
-        if (fieldData instanceof String) {
-            return ((String) fieldData).getBytes(StandardCharsets.UTF_8);
-        }
-        return (byte[]) inspectObject();
+        return internalRow.getBinary(ordinal);
     }
 
     @Override
@@ -179,4 +177,23 @@ public class HudiColumnValue implements ColumnValue {
 
     }
 
+    @Override
+    public NativeValue getNativeValue(ColumnType.Type type) {
+        if (isUnsafe) {
+            UnsafeRow unsafeRow = (UnsafeRow) internalRow;
+            switch (type) {
+                case CHAR:
+                case VARCHAR:
+                case BINARY:
+                case STRING:
+                    long offsetAndSize = unsafeRow.getLong(ordinal);
+                    int offset = (int) (offsetAndSize >> 32);
+                    int size = (int) offsetAndSize;
+                    return new NativeValue(unsafeRow.getBaseObject(), offset, size);
+                default:
+                    return null;
+            }
+        }
+        return null;
+    }
 }
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index bdc4960ec1..d067493a79 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -19,33 +19,21 @@ package org.apache.doris.hudi;
 
 
 import org.apache.doris.common.jni.JniScanner;
-import org.apache.doris.common.jni.vec.ColumnValue;
-import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ScanPredicate;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
 import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.Filter;
+import scala.collection.Iterator;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -57,118 +45,44 @@ import java.util.stream.Collectors;
  */
 public class HudiJniScanner extends JniScanner {
     private static final Logger LOG = Logger.getLogger(HudiJniScanner.class);
-    private final HudiScanParam hudiScanParam;
 
-    UserGroupInformation ugi = null;
-    private RecordReader<NullWritable, ArrayWritable> reader;
-    private StructObjectInspector rowInspector;
-    private Deserializer deserializer;
+    private final int fetchSize;
+    private final HoodieSplit split;
+    private final ScanPredicate[] predicates;
     private final ClassLoader classLoader;
+    private final UserGroupInformation ugi;
 
     private long getRecordReaderTimeNs = 0;
+    private Iterator<InternalRow> recordIterator;
 
     public HudiJniScanner(int fetchSize, Map<String, String> params) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv 
-> kv.getKey() + "=" + kv.getValue())
                     .collect(Collectors.joining("\n")));
         }
-        this.hudiScanParam = new HudiScanParam(fetchSize, params);
         this.classLoader = this.getClass().getClassLoader();
-    }
-
-
-    @Override
-    public void open() throws IOException {
-        try {
-            initTableInfo(hudiScanParam.getRequiredTypes(), hudiScanParam.getRequiredFields(), null,
-                    hudiScanParam.getFetchSize());
-            Properties properties = hudiScanParam.createProperties();
-            JobConf jobConf = HudiScanUtils.createJobConf(properties);
-            ugi = Utils.getUserGroupInformation(jobConf);
-            init(jobConf, properties);
-        } catch (Exception e) {
-            close();
-            throw new IOException("Failed to open the hudi reader.", e);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            if (reader != null) {
-                reader.close();
-            }
-        } catch (IOException e) {
-            LOG.error("Failed to close the hudi reader.", e);
-            throw e;
-        }
-    }
-
-    @Override
-    public int getNext() throws IOException {
-        try {
-            NullWritable key = reader.createKey();
-            ArrayWritable value = reader.createValue();
-
-            int readRowNumbers = 0;
-            while (readRowNumbers < getBatchSize()) {
-                if (!reader.next(key, value)) {
-                    break;
-                }
-                Object rowData = deserializer.deserialize(value);
-
-                for (int i = 0; i < hudiScanParam.getRequiredFields().length; i++) {
-                    Object fieldData = rowInspector.getStructFieldData(rowData, hudiScanParam.getStructFields()[i]);
-                    if (fieldData == null) {
-                        appendData(i, null);
-                    } else {
-                        ColumnValue fieldValue = new HudiColumnValue(hudiScanParam.getFieldInspectors()[i], fieldData,
-                                hudiScanParam.getRequiredTypes()[i].getPrecision());
-                        appendData(i, fieldValue);
-                    }
-                }
-                readRowNumbers++;
+        String predicatesAddressString = params.remove("push_down_predicates");
+        this.fetchSize = fetchSize;
+        this.split = new HoodieSplit(params);
+        if (predicatesAddressString == null) {
+            predicates = new ScanPredicate[0];
+        } else {
+            long predicatesAddress = Long.parseLong(predicatesAddressString);
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
+                LOG.info("HudiJniScanner gets pushed-down predicates:  " + ScanPredicate.dump(predicates));
+            } else {
+                predicates = new ScanPredicate[0];
             }
-            return readRowNumbers;
-        } catch (Exception e) {
-            close();
-            throw new IOException("Failed to get the next batch of hudi.", e);
         }
+        ugi = Utils.getUserGroupInformation(split.hadoopConf());
     }
 
     @Override
-    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
-        // do nothing
-        return null;
-    }
-
-    private void init(JobConf jobConf, Properties properties) throws Exception {
-        String basePath = hudiScanParam.getBasePath();
-        String dataFilePath = hudiScanParam.getDataFilePath();
-        long dataFileLength = hudiScanParam.getDataFileLength();
-        String[] deltaFilePaths = hudiScanParam.getDeltaFilePaths();
-        String[] requiredFields = hudiScanParam.getRequiredFields();
-
-        String realtimePath = dataFilePath.isEmpty() ? deltaFilePaths[0] : dataFilePath;
-        long realtimeLength = dataFileLength > 0 ? dataFileLength : 0;
-
-        Path path = new Path(realtimePath);
-
-        FileSplit fileSplit = new FileSplit(path, 0, realtimeLength, (String[]) null);
-        List<HoodieLogFile> logFiles = Arrays.stream(deltaFilePaths).map(HoodieLogFile::new)
-                .collect(Collectors.toList());
-
-        FileSplit hudiSplit =
-                new HoodieRealtimeFileSplit(fileSplit, basePath, logFiles, hudiScanParam.getInstantTime(), false,
-                        Option.empty());
-
-        InputFormat<?, ?> inputFormatClass = HudiScanUtils.createInputFormat(jobConf, hudiScanParam.getInputFormat());
-
-        // org.apache.hudi.common.util.SerializationUtils$KryoInstantiator.newKryo
-        // throws error like `java.lang.IllegalArgumentException: classLoader cannot be null`.
-        // Set the default class loader
+    public void open() throws IOException {
         Thread.currentThread().setContextClassLoader(classLoader);
-
+        initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
+        long startTime = System.nanoTime();
         // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
         // so use another process to kill this stuck process.
         // TODO(gaoxin): better way to solve the stuck process?
@@ -190,31 +104,49 @@ public class HudiJniScanner extends JniScanner {
                 }
             }
         }, 100, 1000, TimeUnit.MILLISECONDS);
-
-        long startTime = System.nanoTime();
         if (ugi != null) {
-            reader = ugi.doAs((PrivilegedExceptionAction<RecordReader<NullWritable, ArrayWritable>>) () -> {
-                RecordReader<NullWritable, ArrayWritable> ugiReader
-                        = (RecordReader<NullWritable, ArrayWritable>) inputFormatClass.getRecordReader(hudiSplit,
-                        jobConf, Reporter.NULL);
-                return ugiReader;
-            });
+            try {
+                recordIterator = ugi.doAs(
+                        (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
+                                split).buildScanIterator(split.requiredFields(), new Filter[0]));
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
         } else {
-            reader = (RecordReader<NullWritable, ArrayWritable>) inputFormatClass
-                    .getRecordReader(hudiSplit, jobConf, Reporter.NULL);
+            recordIterator = new MORSnapshotSplitReader(split)
+                    .buildScanIterator(split.requiredFields(), new Filter[0]);
         }
-        getRecordReaderTimeNs += System.nanoTime() - startTime;
         isKilled.set(true);
         executorService.shutdownNow();
+        getRecordReaderTimeNs += System.nanoTime() - startTime;
+    }
 
-        deserializer = HudiScanUtils.getDeserializer(jobConf, properties, hudiScanParam.getSerde());
-
-        rowInspector = (StructObjectInspector) deserializer.getObjectInspector();
+    @Override
+    public void close() throws IOException {
+        if (recordIterator instanceof Closeable) {
+            ((Closeable) recordIterator).close();
+        }
+    }
 
-        for (int i = 0; i < requiredFields.length; i++) {
-            StructField field = rowInspector.getStructFieldRef(requiredFields[i]);
-            hudiScanParam.getStructFields()[i] = field;
-            hudiScanParam.getFieldInspectors()[i] = field.getFieldObjectInspector();
+    @Override
+    public int getNext() throws IOException {
+        try {
+            int readRowNumbers = 0;
+            HudiColumnValue columnValue = new HudiColumnValue();
+            int numFields = split.requiredFields().length;
+            ColumnType[] columnTypes = split.requiredTypes();
+            while (readRowNumbers < fetchSize && recordIterator.hasNext()) {
+                columnValue.reset(recordIterator.next());
+                for (int i = 0; i < numFields; i++) {
+                    columnValue.reset(i, columnTypes[i].getPrecision(), columnTypes[i].getScale());
+                    appendData(i, columnValue);
+                }
+                readRowNumbers++;
+            }
+            return readRowNumbers;
+        } catch (Exception e) {
+            close();
+            throw new IOException("Failed to get the next batch of hudi.", e);
         }
     }
 
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java
deleted file mode 100644
index 4343dd3f49..0000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanParam.java
+++ /dev/null
@@ -1,194 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-
-import org.apache.doris.common.jni.vec.ColumnType;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-/**
- * The hudi scan param
- */
-public class HudiScanParam {
-    public static String HADOOP_FS_PREFIX = "hadoop_fs.";
-
-    private final int fetchSize;
-    private final String basePath;
-    private final String dataFilePath;
-    private final long dataFileLength;
-    private final String[] deltaFilePaths;
-    private final String hudiColumnNames;
-    private final String[] hudiColumnTypes;
-    private final String[] requiredFields;
-    private int[] requiredColumnIds;
-    private ColumnType[] requiredTypes;
-    private final String[] nestedFields;
-    private final String instantTime;
-    private final String serde;
-    private final String inputFormat;
-    private final ObjectInspector[] fieldInspectors;
-    private final StructField[] structFields;
-    private final Map<String, String> hadoopConf;
-
-    public HudiScanParam(int fetchSize, Map<String, String> params) {
-        this.fetchSize = fetchSize;
-        this.basePath = params.get("base_path");
-        this.dataFilePath = params.get("data_file_path");
-        this.dataFileLength = Long.parseLong(params.get("data_file_length"));
-        String deltaFilePaths = params.get("delta_file_paths");
-
-        if (StringUtils.isEmpty(deltaFilePaths)) {
-            this.deltaFilePaths = new String[0];
-        } else {
-            this.deltaFilePaths = deltaFilePaths.split(",");
-        }
-
-        this.hudiColumnNames = params.get("hudi_column_names");
-        this.hudiColumnTypes = params.get("hudi_column_types").split("#");
-        this.requiredFields = params.get("required_fields").split(",");
-        this.nestedFields = params.getOrDefault("nested_fields", "").split(",");
-        this.instantTime = params.get("instant_time");
-        this.serde = params.get("serde");
-        this.inputFormat = params.get("input_format");
-        this.fieldInspectors = new ObjectInspector[requiredFields.length];
-        this.structFields = new StructField[requiredFields.length];
-
-        hadoopConf = new HashMap<>();
-        for (Map.Entry<String, String> kv : params.entrySet()) {
-            if (kv.getKey().startsWith(HADOOP_FS_PREFIX)) {
-                hadoopConf.put(kv.getKey().substring(HADOOP_FS_PREFIX.length()), kv.getValue());
-            }
-        }
-
-        parseRequiredColumns();
-    }
-
-    private void parseRequiredColumns() {
-        String[] hiveColumnNames = this.hudiColumnNames.split(",");
-        Map<String, Integer> hiveColumnNameToIndex = new HashMap<>();
-        Map<String, String> hiveColumnNameToType = new HashMap<>();
-        for (int i = 0; i < hiveColumnNames.length; i++) {
-            hiveColumnNameToIndex.put(hiveColumnNames[i], i);
-            hiveColumnNameToType.put(hiveColumnNames[i], this.hudiColumnTypes[i]);
-        }
-        requiredTypes = new ColumnType[requiredFields.length];
-        requiredColumnIds = new int[requiredFields.length];
-        for (int i = 0; i < requiredFields.length; i++) {
-            requiredColumnIds[i] = hiveColumnNameToIndex.get(requiredFields[i]);
-            String type = hiveColumnNameToType.get(requiredFields[i]);
-            requiredTypes[i] = ColumnType.parseType(requiredFields[i], type);
-        }
-    }
-
-    public Properties createProperties() {
-        Properties properties = new Properties();
-
-        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
-                Arrays.stream(requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(",")));
-        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", requiredFields));
-        properties.setProperty(HudiScanUtils.COLUMNS, hudiColumnNames);
-        // recover INT64 based timestamp mark to hive type, timestamp(3)/timestamp(6) => timestamp
-        properties.setProperty(HudiScanUtils.COLUMNS_TYPES,
-                Arrays.stream(hudiColumnTypes).map(type -> type.startsWith("timestamp(") ? "timestamp" : type).collect(
-                        Collectors.joining(",")));
-        properties.setProperty(serdeConstants.SERIALIZATION_LIB, serde);
-
-        for (Map.Entry<String, String> kv : hadoopConf.entrySet()) {
-            properties.setProperty(kv.getKey(), kv.getValue());
-        }
-
-        return properties;
-    }
-
-
-    public int getFetchSize() {
-        return fetchSize;
-    }
-
-    public String getBasePath() {
-        return basePath;
-    }
-
-    public String getDataFilePath() {
-        return dataFilePath;
-    }
-
-    public long getDataFileLength() {
-        return dataFileLength;
-    }
-
-    public String[] getDeltaFilePaths() {
-        return deltaFilePaths;
-    }
-
-    public String getHudiColumnNames() {
-        return hudiColumnNames;
-    }
-
-    public String[] getHudiColumnTypes() {
-        return hudiColumnTypes;
-    }
-
-    public String[] getRequiredFields() {
-        return requiredFields;
-    }
-
-    public int[] getRequiredColumnIds() {
-        return requiredColumnIds;
-    }
-
-    public ColumnType[] getRequiredTypes() {
-        return requiredTypes;
-    }
-
-    public String[] getNestedFields() {
-        return nestedFields;
-    }
-
-    public String getInstantTime() {
-        return instantTime;
-    }
-
-    public String getSerde() {
-        return serde;
-    }
-
-    public String getInputFormat() {
-        return inputFormat;
-    }
-
-    public ObjectInspector[] getFieldInspectors() {
-        return fieldInspectors;
-    }
-
-    public StructField[] getStructFields() {
-        return structFields;
-    }
-
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java
deleted file mode 100644
index 78fff52293..0000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiScanUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import java.util.Properties;
-
-
-/**
- * The hudi scan utils
- */
-public class HudiScanUtils {
-
-    public static final String COLUMNS = "columns";
-    public static final String COLUMNS_TYPES = "columns.types";
-
-    public static JobConf createJobConf(Properties properties) {
-        JobConf jobConf = new JobConf(new Configuration());
-        jobConf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-        properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name)));
-        return jobConf;
-    }
-
-    public static InputFormat<?, ?> createInputFormat(Configuration conf, String inputFormat) throws Exception {
-        Class<?> clazz = conf.getClassByName(inputFormat);
-        Class<? extends InputFormat<?, ?>> cls =
-                (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
-        return ReflectionUtils.newInstance(cls, conf);
-    }
-
-    public static Deserializer getDeserializer(Configuration configuration, Properties properties, String name)
-            throws Exception {
-        Class<? extends Deserializer> deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader())
-                .asSubclass(Deserializer.class);
-        Deserializer deserializer = deserializerClass.getConstructor().newInstance();
-        deserializer.initialize(configuration, properties);
-        return deserializer;
-    }
-
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
index c7f3ecf9da..9dcfacebb8 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
@@ -20,6 +20,7 @@ package org.apache.doris.hudi;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import sun.management.VMManagement;
 
 import java.io.BufferedReader;
@@ -30,6 +31,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -110,4 +112,23 @@ public class Utils {
             throw new RuntimeException("Couldn't kill process PID " + pid, e);
         }
     }
+
+    public static HoodieTableMetaClient getMetaClient(Configuration conf, String basePath) {
+        UserGroupInformation ugi = getUserGroupInformation(conf);
+        HoodieTableMetaClient metaClient;
+        if (ugi != null) {
+            try {
+                metaClient = ugi.doAs(
+                        (PrivilegedExceptionAction<HoodieTableMetaClient>) () -> HoodieTableMetaClient.builder()
+                                .setConf(conf).setBasePath(basePath).build());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Cannot get hudi client.", e);
+            }
+        } else {
+            metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
+        }
+        return metaClient;
+    }
 }
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
new file mode 100644
index 0000000000..92dd663847
--- /dev/null
+++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -0,0 +1,702 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi
+
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
+import org.apache.doris.common.jni.vec.ColumnType
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.io.hfile.CacheConfig
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, SerializableConfiguration, TypedProperties}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
+import org.apache.hudi.io.storage.HoodieAvroHFileReader
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, HoodieTableState}
+import org.apache.log4j.Logger
+import org.apache.spark.sql.adapter.Spark3_2Adapter
+import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{SQLContext, SparkSession, SparkSessionExtensions}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{SparkConf, SparkContext}
+
+import java.lang.reflect.Constructor
+import java.net.URI
+import java.util.Objects
+import java.util.concurrent.TimeUnit
+import java.{util => jutil}
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
+
+class DorisSparkAdapter extends Spark3_2Adapter {
+  override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters
+}
+
+class HoodieSplit(private val params: jutil.Map[String, String]) {
+  val queryId: String = params.remove("query_id")
+  val basePath: String = params.remove("base_path")
+  val dataFilePath: String = params.remove("data_file_path")
+  val dataFileLength: Long = params.remove("data_file_length").toLong
+  val deltaFilePaths: Array[String] = {
+    val deltas = params.remove("delta_file_paths")
+    if (StringUtils.isNullOrEmpty(deltas)) new Array[String](0) else deltas.split(",")
+  }
+
+  val hudiColumnNames: Array[String] = params.remove("hudi_column_names").split(",")
+  val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip(
+    params.remove("hudi_column_types").split("#")).toMap
+
+  val requiredFields: Array[String] = params.remove("required_fields").split(",")
+  val requiredTypes: Array[ColumnType] = requiredFields.map(
+    field => ColumnType.parseType(field, hudiColumnTypes(field)))
+
+  val nestedFields: Array[String] = {
+    val fields = params.remove("nested_fields")
+    if (StringUtils.isNullOrEmpty(fields)) new Array[String](0) else fields.split(",")
+  }
+  val instantTime: String = params.remove("instant_time")
+  val serde: String = params.remove("serde")
+  val inputFormat: String = params.remove("input_format")
+
+  val hadoopProperties: Map[String, String] = {
+    val properties = new jutil.HashMap[String, String]
+    val iterator = params.entrySet().iterator()
+    while (iterator.hasNext) {
+      val kv = iterator.next()
+      if (kv.getKey.startsWith(BaseSplitReader.HADOOP_CONF_PREFIX)) {
+        
properties.put(kv.getKey.substring(BaseSplitReader.HADOOP_CONF_PREFIX.length), 
kv.getValue)
+        iterator.remove()
+      }
+    }
+    properties.asScala.toMap
+  }
+
+  lazy val hadoopConf: Configuration = {
+    val conf = new Configuration
+    hadoopProperties.foreach(kv => conf.set(kv._1, kv._2))
+    conf
+  }
+
+  // NOTE: In cases when Hive Metastore is used as the catalog and the table is partitioned, the schema in the
+  //       HMS might contain Hive-specific partitioning columns created specifically for HMS to handle partitioning
+  //       appropriately. In that case we opt not to provide the catalog's schema, and instead force Hudi relations
+  //       to fetch the schema from the table itself
+  val schemaSpec: Option[StructType] = None
+
+  val optParams: Map[String, String] = params.asScala.toMap
+
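+  // Splits issued by the same query against the same table share the cached HoodieTableInformation,
+  // so equality (and hashing) is keyed on (queryId, basePath) only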
+  override def equals(obj: Any): Boolean = {
+    if (obj == null) {
+      return false
+    }
+    obj match {
+      case split: HoodieSplit =>
+        hashCode() == split.hashCode()
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    Objects.hash(queryId, basePath)
+  }
+}
+
+case class HoodieTableInformation(sparkSession: SparkSession,
+                                  metaClient: HoodieTableMetaClient,
+                                  timeline: HoodieTimeline,
+                                  tableConfig: HoodieTableConfig,
+                                  tableAvroSchema: Schema,
+                                  internalSchemaOpt: Option[InternalSchema])
+
+/**
+ * Reference to Apache Hudi
+ * see <a 
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala";>HoodieBaseRelation</a>
+ */
+abstract class BaseSplitReader(val split: HoodieSplit) {
+
+  import BaseSplitReader._
+
+  protected val optParams: Map[String, String] = split.optParams
+
+  protected val tableInformation: HoodieTableInformation = cache.get(split)
+
+  protected val sparkSession: SparkSession = tableInformation.sparkSession
+  protected val sqlContext: SQLContext = sparkSession.sqlContext
+  imbueConfigs(sqlContext)
+
+  protected val tableConfig: HoodieTableConfig = tableInformation.tableConfig
+
+  // NOTE: The record key-field is assumed to be singular here because of either of the following:
+  //          - In case Hudi's meta fields are enabled: the record key will be pre-materialized (stored) as part
+  //          of the record's payload (as part of Hudi's metadata)
+  //          - In case Hudi's meta fields are disabled (virtual keys): the record has to bear a _single field_
+  //          identified as its (unique) primary key w/in its payload (this is a limitation of [[SimpleKeyGenerator]],
+  //          which is the only [[KeyGenerator]] permitted for virtual-keys payloads)
+  protected lazy val recordKeyField: String =
+  if (tableConfig.populateMetaFields()) {
+    HoodieRecord.RECORD_KEY_METADATA_FIELD
+  } else {
+    val keyFields = tableConfig.getRecordKeyFields.get()
+    checkState(keyFields.length == 1)
+    keyFields.head
+  }
+
+  protected lazy val preCombineFieldOpt: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) 
match {
+      // NOTE: This is required to compensate for cases when empty string is 
used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  /**
+   * Columns that the relation has to read from storage to properly execute its semantics: for ex,
+   * for Merge-on-Read tables the key fields as well as the pre-combine field comprise the mandatory set of
+   * columns, meaning that they will be fetched regardless of whether these columns are requested by the
+   * query, so that the relation is able to combine records properly (if necessary)
+   *
+   * @VisibleInTests
+   */
+  val mandatoryFields: Seq[String]
+
+  /**
+   * NOTE: Initialization of the following members is coupled on purpose to minimize the amount of I/O
+   * required to fetch the table's Avro and Internal schemas
+   */
+  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
+    (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
+  }
+
+  protected lazy val tableStructSchema: StructType = 
convertAvroSchemaToStructType(tableAvroSchema)
+
+  protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
+
+  protected lazy val specifiedQueryTimestamp: Option[String] =
+    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+      .map(HoodieSqlCommonUtils.formatQueryInstant)
+
+  private def queryTimestamp: Option[String] =
+    
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
+
+  lazy val tableState: HoodieTableState = {
+    val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
+    val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
+      
Option(tableInformation.metaClient.getTableConfig.getRecordMergerStrategy))
+    val configProperties = getConfigProperties(sparkSession, optParams)
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(configProperties)
+      .enable(configProperties.getBoolean(
+        HoodieMetadataConfig.ENABLE.key(), 
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
+        && 
HoodieTableMetadataUtil.isFilesPartitionAvailable(tableInformation.metaClient))
+      .build()
+
+    // Subset of the state of table's configuration as of at the time of the 
query
+    HoodieTableState(
+      tablePath = split.basePath,
+      latestCommitTimestamp = queryTimestamp,
+      recordKeyField = recordKeyField,
+      preCombineFieldOpt = preCombineFieldOpt,
+      usesVirtualKeys = !tableConfig.populateMetaFields(),
+      recordPayloadClassName = tableConfig.getPayloadClass,
+      metadataConfig = metadataConfig,
+      recordMergerImpls = recordMergerImpls,
+      recordMergerStrategy = recordMergerStrategy
+    )
+  }
+
+  private def getConfigValue(config: ConfigProperty[String],
+                             defaultValueOption: Option[String] = 
Option.empty): String = {
+    optParams.getOrElse(config.key(),
+      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue())))
+  }
+
+  def imbueConfigs(sqlContext: SQLContext): Unit = {
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown",
 "true")
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled",
 "true")
+    // TODO(HUDI-3639) vectorized reader has to be disabled to make sure 
MORIncrementalRelation is working properly
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "false")
+  }
+
+  def buildScanIterator(requiredColumns: Array[String], filters: 
Array[Filter]): Iterator[InternalRow] = {
+    // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
+    //       *Appending* additional columns to the ones requested by the 
caller is not a problem, as those
+    //       will be eliminated by the caller's projection;
+    //   (!) Please note, however, that it's critical to avoid _reordering_ of 
the requested columns as this
+    //       will break the upstream projection
+    val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+    // NOTE: We explicitly fall back to the default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro
+    //       schema conversion, which is lossy in nature (for ex, it doesn't 
preserve original Avro type-names) and
+    //       could have an effect on subsequent de-/serializing records in 
some exotic scenarios (when Avro unions
+    //       w/ more than 2 types are involved)
+    val sourceSchema = tableAvroSchema
+    val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
+      projectSchema(Either.cond(internalSchemaOpt.isDefined, 
internalSchemaOpt.get, sourceSchema), targetColumns)
+
+    val tableAvroSchemaStr = tableAvroSchema.toString
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, 
internalSchemaOpt)
+    val requiredSchema = HoodieTableSchema(
+      requiredStructSchema, requiredAvroSchema.toString, 
Some(requiredInternalSchema))
+
+    composeIterator(tableSchema, requiredSchema, targetColumns, filters)
+  }
+
+  /**
+   * Composes an iterator for the provided file split, given the table and partition schemas and the
+   * data filters to be applied
+   *
+   * @param tableSchema      target table's schema
+   * @param requiredSchema   projected schema required by the reader
+   * @param requestedColumns columns requested by the query
+   * @param filters          data filters to be applied
+   * @return iterator over [[InternalRow]]s
+   */
+  protected def composeIterator(tableSchema: HoodieTableSchema,
+                                requiredSchema: HoodieTableSchema,
+                                requestedColumns: Array[String],
+                                filters: Array[Filter]): Iterator[InternalRow]
+
+  private final def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
+    // For a nested field in the mandatory columns, we should first get the root-level field, and then
+    // check for any missing column, as the requestedColumns should only contain root-level fields.
+    // Likewise, we should only append root-level fields
+    val missing = mandatoryFields.map(col => 
HoodieAvroUtils.getRootLevelFieldName(col))
+      .filter(rootField => !requestedColumns.contains(rootField))
+    requestedColumns ++ missing
+  }
+
+  /**
+   * Projects provided schema by picking only required (projected) top-level 
columns from it
+   *
+   * @param tableSchema     schema to project (either of [[InternalSchema]] or 
Avro's [[Schema]])
+   * @param requiredColumns required top-level columns to be projected
+   */
+  def projectSchema(tableSchema: Either[Schema, InternalSchema],
+                    requiredColumns: Array[String]): (Schema, StructType, 
InternalSchema) = {
+    tableSchema match {
+      case Right(internalSchema) =>
+        checkState(!internalSchema.isEmptySchema)
+        val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(
+          internalSchema, requiredColumns.toList.asJava)
+        val requiredAvroSchema = 
AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
+        val requiredStructSchema = 
convertAvroSchemaToStructType(requiredAvroSchema)
+
+        (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
+
+      case Left(avroSchema) =>
+        val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
+        val requiredFields = requiredColumns.map { col =>
+          val f = fieldMap(col)
+          // We have to create a new [[Schema.Field]] since Avro schemas can't 
share field
+          // instances (and will throw "org.apache.avro.AvroRuntimeException: 
Field already used")
+          new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order())
+        }.toList
+        val requiredAvroSchema = Schema.createRecord(avroSchema.getName, 
avroSchema.getDoc,
+          avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava)
+        val requiredStructSchema = 
convertAvroSchemaToStructType(requiredAvroSchema)
+
+        (requiredAvroSchema, requiredStructSchema, 
InternalSchema.getEmptyInternalSchema)
+    }
+  }
+
+  /**
+   * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
+   */
+  protected def convertAvroSchemaToStructType(avroSchema: Schema): StructType 
= {
+    val schemaConverters = sparkAdapter.getAvroSchemaConverters
+    schemaConverters.toSqlType(avroSchema) match {
+      case (dataType, _) => dataType.asInstanceOf[StructType]
+    }
+  }
+
+  protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
+                                         requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+    // Since schema requested by the caller might contain partition columns, 
we might need to
+    // prune it, removing all partition columns from it in case these columns 
are not persisted
+    // in the data files
+    //
+    // NOTE: This partition schema is only relevant to file reader to be able 
to embed
+    //       values of partition columns (hereafter referred to as partition 
values) encoded into
+    //       the partition path, and omitted from the data file, back into 
fetched rows;
+    //       Note that, by default, partition columns are not omitted 
therefore specifying
+    //       partition schema for reader is not required
+    if (shouldExtractPartitionValuesFromPartitionPath) {
+      val partitionSchema = StructType(partitionColumns.map(StructField(_, 
StringType)))
+      val prunedDataStructSchema = 
prunePartitionColumns(tableSchema.structTypeSchema)
+      val prunedRequiredSchema = 
prunePartitionColumns(requiredSchema.structTypeSchema)
+
+      (partitionSchema,
+        HoodieTableSchema(prunedDataStructSchema, 
convertToAvroSchema(prunedDataStructSchema).toString),
+        HoodieTableSchema(prunedRequiredSchema, 
convertToAvroSchema(prunedRequiredSchema).toString))
+    } else {
+      (StructType(Nil), tableSchema, requiredSchema)
+    }
+  }
+
+  /**
+   * Controls whether partition values (ie values of partition columns) should 
be
+   * <ol>
+   * <li>Extracted from partition path and appended to individual rows read 
from the data file (we
+   * delegate this to Spark's [[ParquetFileFormat]])</li>
+   * <li>Read from the data-file as is (by default Hudi persists all columns 
including partition ones)</li>
+   * </ol>
+   *
+   * This flag is only relevant in conjunction with the usage of the [["hoodie.datasource.write.drop.partition.columns"]]
+   * config, when Hudi will NOT be persisting partition columns in the data file, and therefore values for
+   * such partition columns (ie "partition values") will have to be parsed from the partition path, and appended
+   * to every row only in the fetched dataset.
+   *
+   * NOTE: Partition values extracted from the partition path might deviate from the values of the original
+   * partition columns: for ex, if the original partition column was a column [[ts]] bearing an epoch
+   * timestamp, which was used by [[TimestampBasedKeyGenerator]] to generate a partition path of the format
+   * [["yyyy/mm/dd"]], the appended partition value would bear that format verbatim as it was used in the
+   * partition path, meaning that the string value "2022/01/01" will be appended, and not its original
+   * representation
+   */
+  protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
+    // Controls whether partition columns (which are the source for the 
partition path values) should
+    // be omitted from persistence in the data files. On the read path it 
affects whether partition values (values
+    // of partition columns) will be read from the data file or extracted from 
partition path
+
+    val shouldOmitPartitionColumns = 
tableInformation.tableConfig.shouldDropPartitionColumns && 
partitionColumns.nonEmpty
+    val shouldExtractPartitionValueFromPath =
+      
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
+        
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
+  }
+
+  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
+    StructType(dataStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+
+  /**
+   * When hoodie.datasource.write.drop.partition.columns is enabled, we need to create an InternalRow from the
+   * partition values and pass it to the parquet reader, so that the partition columns can still be queried.
+   */
+  protected def getPartitionColumnsAsInternalRow(): InternalRow = {
+    try {
+      if (shouldExtractPartitionValuesFromPartitionPath) {
+        val filePath = new Path(split.dataFilePath)
+        val relativePath = new URI(split.basePath).relativize(new 
URI(filePath.getParent.toString)).toString
+        val hiveStylePartitioningEnabled = 
tableConfig.getHiveStylePartitioningEnable.toBoolean
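+        // Hive-style paths (e.g. "dt=2022-01-01/hr=01") are parsed into a column -> value spec;
+        // otherwise partition values are taken positionally from the relative path segments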
+        if (hiveStylePartitioningEnabled) {
+          val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
+          
InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString))
+        } else {
+          if (partitionColumns.length == 1) {
+            InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath)))
+          } else {
+            val parts = relativePath.split("/")
+            assert(parts.size == partitionColumns.length)
+            InternalRow.fromSeq(parts.map(UTF8String.fromString))
+          }
+        }
+      } else {
+        InternalRow.empty
+      }
+    } catch {
+      case NonFatal(e) =>
+        LOG.warn(s"Failed to get the right partition InternalRow for file: 
${split.dataFilePath}", e)
+        InternalRow.empty
+    }
+  }
+
+  /**
+   * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] 
handling [[ColumnarBatch]],
+   * when Parquet's Vectorized Reader is used
+   *
+   * TODO move to HoodieBaseRelation, make private
+   */
+  private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession,
+                                             dataSchema: StructType,
+                                             partitionSchema: StructType,
+                                             requiredSchema: StructType,
+                                             filters: Seq[Filter],
+                                             options: Map[String, String],
+                                             hadoopConf: Configuration,
+                                             appendPartitionValues: Boolean = 
false): PartitionedFile => Iterator[InternalRow] = {
+    val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
+    val readParquetFile: PartitionedFile => Iterator[Any] = 
parquetFileFormat.buildReaderWithPartitionValues(
+      sparkSession = sparkSession,
+      dataSchema = dataSchema,
+      partitionSchema = partitionSchema,
+      requiredSchema = requiredSchema,
+      filters = filters,
+      options = options,
+      hadoopConf = hadoopConf
+    )
+
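+    // Flatten vectorized ColumnarBatch output into a plain row iterator so that callers always see
+    // InternalRows regardless of whether the vectorized Parquet reader was used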
+    file: PartitionedFile => {
+      val iter = readParquetFile(file)
+      iter.flatMap {
+        case r: InternalRow => Seq(r)
+        case b: ColumnarBatch => b.rowIterator().asScala
+      }
+    }
+  }
+
+  private def createHFileReader(spark: SparkSession,
+                                dataSchema: HoodieTableSchema,
+                                requiredDataSchema: HoodieTableSchema,
+                                filters: Seq[Filter],
+                                options: Map[String, String],
+                                hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
+    partitionedFile => {
+      val reader = new HoodieAvroHFileReader(
+        hadoopConf, new Path(partitionedFile.filePath), new 
CacheConfig(hadoopConf))
+
+      val requiredRowSchema = requiredDataSchema.structTypeSchema
+      // NOTE: The schema has to be parsed at this point, since Avro's [[Schema]] isn't serializable
+      //       to be passed from the driver to the executor
+      val requiredAvroSchema = new 
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
+      val avroToRowConverter = 
AvroConversionUtils.createAvroToInternalRowConverter(
+        requiredAvroSchema, requiredRowSchema)
+
+      reader.getRecordIterator(requiredAvroSchema).asScala
+        .map(record => {
+          
avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
+        })
+    }
+  }
+
+  /**
+   * Returns file-reader routine accepting [[PartitionedFile]] and returning 
an [[Iterator]]
+   * over [[InternalRow]]
+   */
+  protected def createBaseFileReader(spark: SparkSession,
+                                     partitionSchema: StructType,
+                                     dataSchema: HoodieTableSchema,
+                                     requiredDataSchema: HoodieTableSchema,
+                                     filters: Seq[Filter],
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): 
BaseFileReader = {
+    val tableBaseFileFormat = tableConfig.getBaseFileFormat
+
+    // NOTE: PLEASE READ CAREFULLY
+    //       Lambda returned from this method is going to be invoked on the 
executor, and therefore
+    //       we have to eagerly initialize all of the readers even though only 
one specific to the type
+    //       of the file being read will be used. This is required to avoid 
serialization of the whole
+    //       relation (containing file-index for ex) and passing it to the 
executor
+    val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) 
=
+    tableBaseFileFormat match {
+      case HoodieFileFormat.PARQUET =>
+        val parquetReader = buildHoodieParquetReader(
+          sparkSession = spark,
+          dataSchema = dataSchema.structTypeSchema,
+          partitionSchema = partitionSchema,
+          requiredSchema = requiredDataSchema.structTypeSchema,
+          filters = filters,
+          options = options,
+          hadoopConf = hadoopConf,
+          // We're delegating to Spark to append partition values to every row 
only in cases
+          // when these corresponding partition-values are not persisted w/in 
the data file itself
+          appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+        )
+        // Since partition values by default are omitted, and not persisted 
w/in data-files by Spark,
+        // data-file readers (such as [[ParquetFileFormat]]) have to inject 
partition values while reading
+        // the data. As such, actual full schema produced by such reader is 
composed of
+        //    a) Data-file schema (projected or not)
+        //    b) Appended partition column values
+        val readerSchema = 
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
+
+        (parquetReader, readerSchema)
+
+      case HoodieFileFormat.HFILE =>
+        val hfileReader = createHFileReader(
+          spark = spark,
+          dataSchema = dataSchema,
+          requiredDataSchema = requiredDataSchema,
+          filters = filters,
+          options = options,
+          hadoopConf = hadoopConf
+        )
+
+        (hfileReader, requiredDataSchema.structTypeSchema)
+
+      case _ => throw new UnsupportedOperationException(s"Base file format is 
not currently supported ($tableBaseFileFormat)")
+    }
+
+    BaseFileReader(
+      read = partitionedFile => {
+        val extension = FSUtils.getFileExtension(partitionedFile.filePath)
+        if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+          read(partitionedFile)
+        } else {
+          throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($tableBaseFileFormat)")
+        }
+      },
+      schema = schema
+    )
+  }
+
+  protected val timeline: HoodieTimeline = tableInformation.timeline
+
+  protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: 
Option[InternalSchema]): Configuration = {
+    val internalSchema = 
internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
+    val querySchemaString = SerDeHelper.toJson(internalSchema)
+    if (!StringUtils.isNullOrEmpty(querySchemaString)) {
+      val validCommits = 
timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",")
+      LOG.warn(s"Table valid commits: $validCommits")
+
+      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
SerDeHelper.toJson(internalSchema))
+      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, split.basePath)
+      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, 
validCommits)
+    }
+    conf
+  }
+}
+
+object SparkMockHelper {
+  private lazy val mockSparkContext = {
+    val conf = new SparkConf().setMaster("local").setAppName("mock_sc")
+      .set("spark.ui.enabled", "false")
+    val sc = new SparkContext(conf)
+    sc.setLogLevel("WARN")
+    sc
+  }
+
+  implicit class MockSparkSession(builder: SparkSession.Builder) {
+    def createMockSession(split: HoodieSplit): SparkSession = {
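+      // Instantiate the session directly through its private 5-argument constructor (bypassing getOrCreate),
+      // then copy the split's Hadoop properties into the session conf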
+      val sparkSessionClass = classOf[SparkSession]
+      val constructor: Constructor[SparkSession] = 
sparkSessionClass.getDeclaredConstructors
+        .find(_.getParameterCount == 
5).get.asInstanceOf[Constructor[SparkSession]]
+      constructor.setAccessible(true)
+      val ss = constructor.newInstance(mockSparkContext, None, None, new 
SparkSessionExtensions, Map.empty)
+      split.hadoopProperties.foreach(kv => 
ss.sessionState.conf.setConfString(kv._1, kv._2))
+      ss
+    }
+  }
+}
+
+object BaseSplitReader {
+
+  import SparkMockHelper.MockSparkSession
+
+  private val LOG = Logger.getLogger(BaseSplitReader.getClass)
+  val HADOOP_CONF_PREFIX = "hadoop_conf."
+
+  // Use [[SparkAdapterSupport]] instead ?
+  private lazy val sparkAdapter = new DorisSparkAdapter
+
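+  // Cache table-level information (meta client, timeline, resolved Avro/internal schemas) per split key
+  // so that repeated scans of the same table within a query avoid redundant metadata I/O
+  // (entries expire 10 minutes after last access, capped at 4096 entries)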
+  private lazy val cache: LoadingCache[HoodieSplit, HoodieTableInformation] = {
+    val loader = new CacheLoader[HoodieSplit, HoodieTableInformation] {
+      override def load(split: HoodieSplit): HoodieTableInformation = {
+        // create mock spark session
+        val sparkSession = SparkSession.builder().createMockSession(split)
+        val metaClient = Utils.getMetaClient(split.hadoopConf, split.basePath)
+        // NOTE: We're including compaction here since it's not considered a "commit" operation
+        val timeline = 
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
+
+        val specifiedQueryTimestamp: Option[String] =
+          
split.optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
+            .map(HoodieSqlCommonUtils.formatQueryInstant)
+        val schemaResolver = new TableSchemaResolver(metaClient)
+        val internalSchemaOpt = if 
(!isSchemaEvolutionEnabledOnRead(split.optParams, sparkSession)) {
+          None
+        } else {
+          Try {
+            
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+              
.getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+          } match {
+            case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
+            case Failure(_) =>
+              None
+          }
+        }
+        val avroSchema: Schema = internalSchemaOpt.map { is =>
+          AvroInternalSchemaConverter.convert(is, "schema")
+        } orElse {
+          specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+        } orElse {
+          split.schemaSpec.map(convertToAvroSchema)
+        } getOrElse {
+          Try(schemaResolver.getTableAvroSchema) match {
+            case Success(schema) => schema
+            case Failure(e) =>
+              throw new HoodieSchemaException("Failed to fetch schema from the 
table", e)
+          }
+        }
+
+        HoodieTableInformation(sparkSession,
+          metaClient,
+          timeline,
+          metaClient.getTableConfig,
+          avroSchema,
+          internalSchemaOpt)
+      }
+    }
+    CacheBuilder.newBuilder()
+      .expireAfterAccess(10, TimeUnit.MINUTES)
+      .maximumSize(4096)
+      .build(loader)
+  }
+
+  private def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], 
sparkSession: SparkSession): Boolean = {
+    // NOTE: Schema evolution could be configured both through the optional parameters as well as
+    //       through the Spark Session configuration (for ex, for Spark SQL)
+    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean 
||
+      
sparkSession.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+        
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+  }
+
+  private def getConfigProperties(spark: SparkSession, options: Map[String, 
String]) = {
+    val sqlConf: SQLConf = spark.sessionState.conf
+    val properties = new TypedProperties()
+    properties.putAll(options.filter(p => p._2 != null).asJava)
+
+    // TODO(HUDI-5361) clean up properties carry-over
+
+    // To support metadata listing via Spark SQL we allow users to pass the 
config via SQL Conf in spark session. Users
+    // would be able to run SET hoodie.metadata.enable=true in the spark sql 
session to enable metadata listing.
+    val isMetadataTableEnabled = HoodieSparkConfUtils.getConfigValue(options, 
sqlConf, HoodieMetadataConfig.ENABLE.key, null)
+    if (isMetadataTableEnabled != null) {
+      properties.setProperty(HoodieMetadataConfig.ENABLE.key(), 
String.valueOf(isMetadataTableEnabled))
+    }
+
+    val listingModeOverride = HoodieSparkConfUtils.getConfigValue(options, 
sqlConf,
+      DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, null)
+    if (listingModeOverride != null) {
+      
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
+    }
+
+    properties
+  }
+}
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
new file mode 100644
index 0000000000..c564565535
--- /dev/null
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
+import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
+import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, 
HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator}
+import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.catalyst.InternalRow
+
+import java.io.Closeable
+
+/**
+ * Class holding base-file readers for 3 different use-cases:
+ *
+ * <ol>
+ * <li>Full-schema reader: used when the whole row has to be read to perform merging correctly.
+ * This could occur when no optimizations could be applied and we have to fall back to reading the whole row from
+ * the base file and the corresponding delta-log file to merge them correctly</li>
+ *
+ * <li>Required-schema reader: used when it's fine to only read the row's projected columns.
+ * This could occur when the row could be merged with the corresponding delta-log record while leveraging only
+ * the projected columns</li>
+ *
+ * <li>Required-schema reader (skip-merging): used when no merging will be performed (skip-merged).
+ * This could occur when the file-group has no delta-log files</li>
+ * </ol>
+ */
+private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: 
BaseFileReader,
+                                                          
requiredSchemaReader: BaseFileReader,
+                                                          
requiredSchemaReaderSkipMerging: BaseFileReader)
+
+/**
+ * Provided w/ an instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
+ * the Base file as well as all of the Delta Log files, simply returning the concatenation of these streams, while not
+ * performing any combination/merging of the records w/ the same primary keys (ie potentially producing duplicates)
+ */
+private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                baseFileReader: BaseFileReader,
+                                dataSchema: HoodieTableSchema,
+                                requiredSchema: HoodieTableSchema,
+                                tableState: HoodieTableState,
+                                config: Configuration)
+  extends LogFileIterator(split, dataSchema, requiredSchema, tableState, 
config) {
+
+  private val requiredSchemaProjection = 
generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
+
+  private val baseFileIterator = baseFileReader(split.dataFile.get)
+
+  override def doHasNext: Boolean = {
+    if (baseFileIterator.hasNext) {
+      // No merge is required, simply load current row and project into 
required schema
+      nextRecord = requiredSchemaProjection(baseFileIterator.next())
+      true
+    } else {
+      super[LogFileIterator].doHasNext
+    }
+  }
+}
+
+/**
+ * Reference to Apache Hudi
+ * see <a 
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala";>HoodieMergeOnReadRDD</a>
+ */
+class HoodieMORRecordIterator(config: Configuration,
+                              fileReaders: HoodieMergeOnReadBaseFileReaders,
+                              tableSchema: HoodieTableSchema,
+                              requiredSchema: HoodieTableSchema,
+                              tableState: HoodieTableState,
+                              mergeType: String,
+                              fileSplit: HoodieMergeOnReadFileSplit) extends 
Iterator[InternalRow] with Closeable {
+  protected val maxCompactionMemoryInBytes: Long = config.getLongBytes(
+    "hoodie.compaction.memory", 512 * 1024 * 1024)
+
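+  // Pick the iterator by split shape: base-file-only splits need no merging, log-only splits replay the
+  // delta logs, and splits with both choose between skip-merge and payload-combine based on mergeType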
+  protected val recordIterator: Iterator[InternalRow] = fileSplit match {
+    case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
+      val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, 
requiredSchema.structTypeSchema)
+      projectedReader(dataFileOnlySplit.dataFile.get)
+
+    case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
+      new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, 
tableState, config)
+
+    case split => mergeType match {
+      case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
+        val reader = fileReaders.requiredSchemaReaderSkipMerging
+        new SkipMergeIterator(split, reader, tableSchema, requiredSchema, 
tableState, config)
+
+      case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
+        val reader = pickBaseFileReader()
+        new RecordMergingFileIterator(split, reader, tableSchema, 
requiredSchema, tableState, config)
+
+      case _ => throw new UnsupportedOperationException(s"Not supported merge 
type ($mergeType)")
+    }
+  }
+
+  private def pickBaseFileReader(): BaseFileReader = {
+    // NOTE: This is an optimization making sure that even for MOR tables we fetch the absolute minimum
+    //       of the stored data possible, while still properly executing the corresponding relation's semantics
+    //       and meeting the query's requirements.
+    //
+    //       Here we assume that iff the queried table uses one of the standard (and whitelisted)
+    //       Record Payload classes, then we can avoid reading and parsing the records w/ the _full_ schema,
+    //       and instead rely only on the projected one, while nevertheless being able to perform merging correctly
+    if (isProjectionCompatible(tableState)) {
+      fileReaders.requiredSchemaReader
+    } else {
+      fileReaders.fullSchemaReader
+    }
+  }
+
+  override def hasNext: Boolean = {
+    recordIterator.hasNext
+  }
+
+  override def next(): InternalRow = {
+    recordIterator.next()
+  }
+
+  override def close(): Unit = {
+    recordIterator match {
+      case closeable: Closeable =>
+        closeable.close()
+      case _ =>
+    }
+  }
+}
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
new file mode 100644
index 0000000000..8ea9e1fcef
--- /dev/null
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi
+
+import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, 
HoodieTableSchema}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Reference to Apache Hudi
+ * see <a 
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala";>MergeOnReadSnapshotRelation</a>
+ */
+class MORSnapshotSplitReader(override val split: HoodieSplit) extends 
BaseSplitReader(split) {
+  /**
+   * NOTE: These are the fields that are required to properly fulfil the Merge-on-Read (MOR)
+   * semantics:
+   *
+   * <ol>
+   * <li>The primary key is required to make sure we're able to correlate records from the base
+   * file with the updated records from the delta-log file</li>
+   * <li>The pre-combine key is required to properly perform the combining (or merging) of the
+   * existing and updated records</li>
+   * </ol>
+   *
+   * However, in cases when merging is NOT performed (for ex, if the file-group only contains base
+   * files but no delta-log files, or if the query-type is equal to [["skip_merge"]]), neither
+   * the primary key nor the pre-combine key is required to be fetched from storage (unless requested
+   * by the query), therefore saving on throughput
+   */
+  protected lazy val mandatoryFieldsForMerging: Seq[String] =
+    Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+
+  override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
+
+  protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
+    DataSourceReadOptions.REALTIME_MERGE.defaultValue)
+
+  override protected def composeIterator(tableSchema: HoodieTableSchema,
+                                         requiredSchema: HoodieTableSchema,
+                                         requestedColumns: Array[String],
+                                         filters: Array[Filter]): 
Iterator[InternalRow] = {
+    // todo: push down predicates about key field
+    val requiredFilters = Seq.empty
+    val optionalFilters = filters
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
+
+    new HoodieMORRecordIterator(split.hadoopConf,
+      readers,
+      tableSchema,
+      requiredSchema,
+      tableState,
+      mergeType,
+      getFileSplit())
+  }
+
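+  // Assemble the MOR file split: delta logs sorted in Hudi's log-file order, plus the base file (if any)
+  // wrapped as a PartitionedFile carrying the reconstructed partition values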
+  private def getFileSplit(): HoodieMergeOnReadFileSplit = {
+    val logFiles = split.deltaFilePaths.map(new HoodieLogFile(_))
+      
.sorted(Ordering.comparatorToOrdering(HoodieLogFile.getLogFileComparator)).toList
+    val partitionedBaseFile = if (split.dataFilePath.isEmpty) {
+      None
+    } else {
+      Some(PartitionedFile(getPartitionColumnsAsInternalRow(), 
split.dataFilePath, 0, split.dataFileLength))
+    }
+    HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
+  }
+
+  override def imbueConfigs(sqlContext: SQLContext): Unit = {
+    super.imbueConfigs(sqlContext)
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "true")
+    // there's no thread-local TaskContext, so the parquet reader will still use on-heap memory even when this is set to true
+    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.columnVector.offheap.enabled",
 "true")
+  }
+
+  protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
+                                      requiredSchema: HoodieTableSchema,
+                                      requestedColumns: Array[String],
+                                      requiredFilters: Seq[Filter],
+                                      optionalFilters: Seq[Filter] = 
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    val fullSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = dataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(split.hadoopConf, internalSchemaOpt)
+    )
+
+    val requiredSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = requiredDataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(split.hadoopConf, 
requiredDataSchema.internalSchema)
+    )
+
+    // Check whether fields required for merging were also requested to be 
fetched
+    // by the query:
+    //    - In case they were, there's no optimization we could apply here (we 
will have
+    //    to fetch such fields)
+    //    - In case they were not, we will provide 2 separate file-readers
+    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
+    //        b) One which would be applied to file-groups w/ no delta-logs or
+    //           in case query-mode is skipping merging
+    val mandatoryColumns = 
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
+    if (mandatoryColumns.forall(requestedColumns.contains)) {
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReader
+      )
+    } else {
+      val prunedRequiredSchema = {
+        val unusedMandatoryColumnNames = 
mandatoryColumns.filterNot(requestedColumns.contains)
+        val prunedStructSchema =
+          StructType(requiredDataSchema.structTypeSchema.fields
+            .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
+
+        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema).toString)
+      }
+
+      val requiredSchemaReaderSkipMerging = createBaseFileReader(
+        spark = sqlContext.sparkSession,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
+        requiredDataSchema = prunedRequiredSchema,
+        // This file-reader is only used in cases when no merging is 
performed, therefore it's safe to push
+        // down these filters to the base file readers
+        filters = requiredFilters ++ optionalFilters,
+        options = optParams,
+        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+        //       to configure Parquet reader appropriately
+        hadoopConf = embedInternalSchema(split.hadoopConf, 
requiredDataSchema.internalSchema)
+      )
+
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
+      )
+    }
+  }
+}
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index c45b2ac8e5..5031a01826 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.jni;
 
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValue;
+import org.apache.doris.common.jni.vec.NativeColumnValue;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.TableSchema;
 import org.apache.doris.common.jni.vec.VectorTable;
@@ -45,7 +46,9 @@ public abstract class JniScanner {
     protected abstract int getNext() throws IOException;
 
     // parse table schema
-    protected abstract TableSchema parseTableSchema() throws 
UnsupportedOperationException;
+    protected TableSchema parseTableSchema() throws 
UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
 
     protected void initTableInfo(ColumnType[] requiredTypes, String[] 
requiredFields, ScanPredicate[] predicates,
             int batchSize) {
@@ -55,6 +58,10 @@ public abstract class JniScanner {
         this.batchSize = batchSize;
     }
 
+    protected void appendNativeData(int index, NativeColumnValue value) {
+        vectorTable.appendNativeData(index, value);
+    }
+
     protected void appendData(int index, ColumnValue value) {
         vectorTable.appendData(index, value);
     }
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
index fc2928f8ed..3557b3b903 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
@@ -21,7 +21,6 @@ package org.apache.doris.common.jni;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ColumnValue;
 import org.apache.doris.common.jni.vec.ScanPredicate;
-import org.apache.doris.common.jni.vec.TableSchema;
 
 import org.apache.log4j.Logger;
 
@@ -111,6 +110,11 @@ public class MockJniScanner extends JniScanner {
             return "row-" + i + "-column-" + j;
         }
 
+        @Override
+        public byte[] getStringAsBytes() {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public LocalDate getDate() {
             return LocalDate.now();
@@ -196,9 +200,4 @@ public class MockJniScanner extends JniScanner {
         readRows += rows;
         return rows;
     }
-
-    @Override
-    protected TableSchema parseTableSchema() throws 
UnsupportedOperationException {
-        return null;
-    }
 }
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
index fa2e268366..0d1c522f9c 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java
@@ -55,6 +55,8 @@ public interface ColumnValue {
 
     String getString();
 
+    byte[] getStringAsBytes();
+
     LocalDate getDate();
 
     LocalDateTime getDateTime();
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java
new file mode 100644
index 0000000000..8a0b4d2244
--- /dev/null
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/NativeColumnValue.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jni.vec;
+
+/**
+ * Native types of data that can be directly copied.
+ */
+public interface NativeColumnValue {
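+    /**
+     * Locates a raw value to copy: {@code baseObject} plus {@code offset} identify the source memory region,
+     * and {@code length} is -1 for fixed-width types (the copy size is then derived from the column type),
+     * otherwise the number of bytes to copy.
+     */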
+    public static class NativeValue {
+        public final Object baseObject;
+        public final long offset;
+        public final int length;
+
+        public NativeValue(Object baseObject, long offset) {
+            this.baseObject = baseObject;
+            this.offset = offset;
+            this.length = -1;
+        }
+
+        public NativeValue(Object baseObject, long offset, int length) {
+            this.baseObject = baseObject;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
+
+    boolean isNull();
+
+    /**
+     * Return null if the type can't be copied directly
+     */
+    NativeValue getNativeValue(ColumnType.Type type);
+}
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
index 4553d29e18..e82f05c7d0 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java
@@ -183,6 +183,11 @@ public class ScanPredicate {
             return toString();
         }
 
+        @Override
+        public byte[] getStringAsBytes() {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public LocalDate getDate() {
             return LocalDate.now();
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
index f65ea8fae7..3998a1a327 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
@@ -21,6 +21,7 @@ package org.apache.doris.common.jni.vec;
 import org.apache.doris.common.jni.utils.OffHeap;
 import org.apache.doris.common.jni.utils.TypeNativeBytes;
 import org.apache.doris.common.jni.vec.ColumnType.Type;
+import org.apache.doris.common.jni.vec.NativeColumnValue.NativeValue;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -550,6 +551,38 @@ public class VectorColumn {
         }
     }
 
+    public void appendNativeValue(NativeColumnValue o) {
+        ColumnType.Type typeValue = columnType.getType();
+        if (o == null || o.isNull()) {
+            appendNull(typeValue);
+            return;
+        }
+        NativeValue nativeValue = o.getNativeValue(typeValue);
+        if (nativeValue == null) {
+            // can't get native value, fall back to materialized value
+            appendValue((ColumnValue) o);
+            return;
+        }
+        if (nativeValue.length == -1) {
+            // fixed-width type: copy the value directly into this column's data buffer
+            long typeSize = typeValue.size;
+            reserve(appendIndex + 1);
+            OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset,
+                    null, data + typeSize * appendIndex, typeSize);
+            appendIndex++;
+        } else {
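+            // variable-length value: append the raw bytes to the child byte column and record the new end offset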
+            int byteLength = nativeValue.length;
+            VectorColumn bytesColumn = childColumns[0];
+            int startOffset = bytesColumn.appendIndex;
+            bytesColumn.reserve(startOffset + byteLength);
+            OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset,
+                    null, bytesColumn.data + startOffset, byteLength);
+            bytesColumn.appendIndex += byteLength;
+            OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + 
byteLength);
+            appendIndex++;
+        }
+    }
+
     public void appendValue(ColumnValue o) {
         ColumnType.Type typeValue = columnType.getType();
         if (o == null || o.isNull()) {
@@ -598,7 +631,7 @@ public class VectorColumn {
             case VARCHAR:
             case STRING:
                 if (o.canGetStringAsBytes()) {
-                    appendBytesAndOffset(o.getBytes());
+                    appendBytesAndOffset(o.getStringAsBytes());
                 } else {
                     appendStringAndOffset(o.getString());
                 }
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java
index e70d8f683f..63b6f1ac2a 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorTable.java
@@ -68,6 +68,11 @@ public class VectorTable {
         this.isRestoreTable = true;
     }
 
+    public void appendNativeData(int fieldId, NativeColumnValue o) {
+        assert (!isRestoreTable);
+        columns[fieldId].appendNativeValue(o);
+    }
+
     public void appendData(int fieldId, ColumnValue o) {
         assert (!isRestoreTable);
         columns[fieldId].appendValue(o);
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index 9fa9eea9f6..5dfd5a0bcf 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -60,7 +60,7 @@ public class MaxComputeColumnValue implements ColumnValue {
 
     @Override
     public boolean canGetStringAsBytes() {
-        return false;
+        return true;
     }
 
     @Override
@@ -152,6 +152,14 @@ public class MaxComputeColumnValue implements ColumnValue {
         return v == null ? new String(new byte[0]) : v;
     }
 
+    @Override
+    public byte[] getStringAsBytes() {
+        skippedIfNull();
+        VarCharVector varcharCol = (VarCharVector) column;
+        byte[] v = varcharCol.getObject(idx++).getBytes();
+        return v == null ? new byte[0] : v;
+    }
+
     @Override
     public LocalDate getDate() {
         skippedIfNull();
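
The canGetStringAsBytes()/getStringAsBytes() pair added above lets the vector
writer consume the already-encoded UTF-8 bytes instead of decoding them into
a java.lang.String and re-encoding them when filling VARCHAR/STRING columns.
A self-contained illustration of the two paths follows; the class and method
names are made up for the example and are not part of the scanner API.

    import java.nio.charset.StandardCharsets;

    public class StringAsBytesDemo {
        // Path taken when canGetStringAsBytes() is false: decode, then re-encode.
        static byte[] viaString(byte[] utf8) {
            String s = new String(utf8, StandardCharsets.UTF_8); // copy + decode
            return s.getBytes(StandardCharsets.UTF_8);           // copy + encode
        }

        // Path taken when canGetStringAsBytes() is true: hand the bytes over as-is.
        static byte[] viaBytes(byte[] utf8) {
            return utf8;
        }

        public static void main(String[] args) {
            byte[] utf8 = "doris".getBytes(StandardCharsets.UTF_8);
            System.out.println(new String(viaString(utf8), StandardCharsets.UTF_8)); // doris
            System.out.println(new String(viaBytes(utf8), StandardCharsets.UTF_8));  // doris
        }
    }
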
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 6a3d519670..8f9b903afd 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -20,7 +20,6 @@ package org.apache.doris.maxcompute;
 import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
-import org.apache.doris.common.jni.vec.TableSchema;
 
 import com.aliyun.odps.Column;
 import com.aliyun.odps.OdpsType;
@@ -239,12 +238,6 @@ public class MaxComputeJniScanner extends JniScanner {
         return realRows;
     }
 
-    @Override
-    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
-        // do nothing
-        return null;
-    }
-
     private int readVectors(int expectedRows) throws IOException {
         VectorSchemaRoot batch;
         int curReadRows = 0;
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
index e6ab3e76ac..5a037c2857 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
@@ -49,7 +49,7 @@ public class PaimonColumnValue implements ColumnValue {
 
     @Override
     public boolean canGetStringAsBytes() {
-        return false;
+        return true;
     }
 
     @Override
@@ -102,6 +102,11 @@ public class PaimonColumnValue implements ColumnValue {
         return record.getString(idx).toString();
     }
 
+    @Override
+    public byte[] getStringAsBytes() {
+        return record.getString(idx).toBytes();
+    }
+
     @Override
     public LocalDate getDate() {
         return Instant.ofEpochMilli(record.getTimestamp(idx, 3)
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index f6cdc436b7..71965344a4 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -21,7 +21,6 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.utils.OffHeap;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
-import org.apache.doris.common.jni.vec.TableSchema;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.Logger;
@@ -141,12 +140,6 @@ public class PaimonJniScanner extends JniScanner {
         return rows;
     }
 
-    @Override
-    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
-        // do nothing
-        return null;
-    }
-
     private Catalog create(CatalogContext context) throws IOException {
         Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
         FileIO fileIO;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java
index 1b57641efe..0799f7b137 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java
@@ -65,6 +65,9 @@ public class HudiUtils {
                     int scale = ((LogicalTypes.Decimal) logicalType).getScale();
                     return String.format("decimal(%s,%s)", precision, scale);
                 } else {
+                    if (columnType == Schema.Type.BYTES) {
+                        return "binary";
+                    }
                     return "string";
                 }
             case ARRAY:
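
The extra branch above maps a plain Avro BYTES column (one without a decimal
logical type) to the Doris type "binary" instead of "string". Below is a
standalone sketch of that mapping built only on the Avro schema API; the
helper name toDorisType is illustrative and is not the actual HudiUtils
method.

    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class AvroBytesMappingDemo {
        // Simplified BYTES handling: decimal logical types map to decimal(p,s),
        // any other BYTES column now maps to "binary" rather than "string".
        static String toDorisType(Schema avroSchema) {
            if (avroSchema.getType() == Schema.Type.BYTES) {
                LogicalType logicalType = avroSchema.getLogicalType();
                if (logicalType instanceof LogicalTypes.Decimal) {
                    LogicalTypes.Decimal dec = (LogicalTypes.Decimal) logicalType;
                    return String.format("decimal(%s,%s)", dec.getPrecision(), dec.getScale());
                }
                return "binary";
            }
            return "string"; // other Avro types are elided in this sketch
        }

        public static void main(String[] args) {
            Schema plainBytes = Schema.create(Schema.Type.BYTES);
            Schema decimalBytes = LogicalTypes.decimal(10, 2)
                    .addToSchema(Schema.create(Schema.Type.BYTES));
            System.out.println(toDorisType(plainBytes));   // binary
            System.out.println(toDorisType(decimalBytes)); // decimal(10,2)
        }
    }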


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
