This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 625961ee5ff branch-3.1: [fix](hudi) Remove spark hudi jni scanner #50394 (#52222)
625961ee5ff is described below

commit 625961ee5ffa2f46b82bcff0a8f39841ef6b1b7d
Author: Socrates <[email protected]>
AuthorDate: Tue Jun 24 23:05:02 2025 +0800

    branch-3.1: [fix](hudi) Remove spark hudi jni scanner #50394 (#52222)
    
    bp: #50394
---
 be/src/vec/exec/format/table/hudi_jni_reader.cpp   |  11 +-
 build.sh                                           |   6 +-
 fe/be-java-extensions/hudi-scanner/pom.xml         | 275 --------
 .../org/apache/doris/hudi/HudiColumnValue.java     | 187 ------
 .../java/org/apache/doris/hudi/HudiJniScanner.java | 229 -------
 .../src/main/java/org/apache/doris/hudi/Utils.java |  89 ---
 .../hudi-scanner/src/main/resources/package.xml    |  41 --
 .../org/apache/doris/hudi/BaseSplitReader.scala    | 734 ---------------------
 .../apache/doris/hudi/HoodieRecordIterator.scala   | 178 -----
 .../doris/hudi/MORIncrementalSplitReader.scala     |  86 ---
 .../apache/doris/hudi/MORSnapshotSplitReader.scala | 184 ------
 .../org/apache/doris/hudi/HudiJniScannerTest.java  |  31 -
 fe/be-java-extensions/pom.xml                      |   1 -
 .../doris/datasource/hudi/source/HudiScanNode.java |   2 -
 .../doris/datasource/hudi/source/HudiSplit.java    |   1 -
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 -
 gensrc/thrift/PlanNodes.thrift                     |   2 +-
 .../hudi/test_hudi_incremental.groovy              |   1 -
 .../hudi/test_hudi_schema_evolution.groovy         |   1 -
 .../hudi/test_hudi_snapshot.groovy                 |   1 -
 .../hudi/test_hudi_timetravel.groovy               |   1 -
 21 files changed, 5 insertions(+), 2070 deletions(-)

diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index eb88dda9512..f73f9a29225 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -72,15 +72,8 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
         }
     }
 
-    if (_hudi_params.hudi_jni_scanner == "spark") {
-        _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner",
-                                                        params, required_fields);
-    } else {
-        // _hudi_params.hudi_jni_scanner == "hadoop"
-        // and default use hadoop hudi jni scanner
-        _jni_connector = std::make_unique<JniConnector>(
-                "org/apache/doris/hudi/HadoopHudiJniScanner", params, required_fields);
-    }
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HadoopHudiJniScanner",
+                                                    params, required_fields);
 }
 
 Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
diff --git a/build.sh b/build.sh
index 2cf95d87326..594e990821f 100755
--- a/build.sh
+++ b/build.sh
@@ -66,7 +66,7 @@ Usage: $0 <options>
   Environment variables:
     USE_AVX2                    If the CPU does not support AVX2 instruction set, please set USE_AVX2=0. Default is ON.
     STRIP_DEBUG_INFO            If set STRIP_DEBUG_INFO=ON, the debug information in the compiled binaries will be stored separately in the 'be/lib/debug_info' directory. Default is OFF.
-    DISABLE_BE_JAVA_EXTENSIONS  If set DISABLE_BE_JAVA_EXTENSIONS=ON, we will do not build binary with java-udf,hudi-scanner,jdbc-scanner and so on Default is OFF.
+    DISABLE_BE_JAVA_EXTENSIONS  If set DISABLE_BE_JAVA_EXTENSIONS=ON, we will do not build binary with java-udf,hadoop-hudi-scanner,jdbc-scanner and so on Default is OFF.
     DISABLE_JAVA_CHECK_STYLE    If set DISABLE_JAVA_CHECK_STYLE=ON, it will skip style check of java code in FE.
     DISABLE_BUILD_AZURE         If set DISABLE_BUILD_AZURE=ON, it will not build azure into BE.
   Eg.
@@ -83,7 +83,7 @@ Usage: $0 <options>
     $0 --be --fe                            build Backend, Frontend, Spark Dpp application and Java UDF library
     $0 --be --coverage                      build Backend with coverage enabled
     $0 --be --output PATH                   build Backend, the result will be output to PATH(relative paths are available)
-    $0 --be-extension-ignore avro-scanner   build be-java-extensions, choose which modules to ignore. Multiple modules separated by commas, like --be-extension-ignore avro-scanner,hudi-scanner
+    $0 --be-extension-ignore avro-scanner   build be-java-extensions, choose which modules to ignore. Multiple modules separated by commas, like --be-extension-ignore avro-scanner,hadoop-hudi-scanner
 
     USE_AVX2=0 $0 --be                      build Backend and not using AVX2 instruction.
     USE_AVX2=0 STRIP_DEBUG_INFO=ON $0       build all and not using AVX2 instruction, and strip the debug info for Backend
@@ -539,7 +539,6 @@ if [[ "${BUILD_HIVE_UDF}" -eq 1 ]]; then
 fi
 if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
     modules+=("fe-common")
-    modules+=("be-java-extensions/hudi-scanner")
     modules+=("be-java-extensions/hadoop-hudi-scanner")
     modules+=("be-java-extensions/java-common")
     modules+=("be-java-extensions/java-udf")
@@ -847,7 +846,6 @@ EOF
 
     extensions_modules=("java-udf")
     extensions_modules+=("jdbc-scanner")
-    extensions_modules+=("hudi-scanner")
     extensions_modules+=("hadoop-hudi-scanner")
     extensions_modules+=("paimon-scanner")
     extensions_modules+=("trino-connector-scanner")
diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml
deleted file mode 100644
index c8f56e55a83..00000000000
--- a/fe/be-java-extensions/hudi-scanner/pom.xml
+++ /dev/null
@@ -1,275 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-    <parent>
-        <artifactId>be-java-extensions</artifactId>
-        <groupId>org.apache.doris</groupId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>hudi-scanner</artifactId>
-
-    <properties>
-        <doris.home>${basedir}/../../</doris.home>
-        <fe_ut_parallel>1</fe_ut_parallel>
-        <scala.version>2.12.15</scala.version>
-        <scala.binary.version>2.12</scala.binary.version>
-        <avro.version>1.11.3</avro.version>
-    </properties>
-    
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>org.apache.avro</groupId>
-                <artifactId>avro</artifactId>
-                <version>${avro.version}</version>
-                <scope>provided</scope>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.apache.avro</groupId>
-                        <artifactId>avro-tools</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-spark-client</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-spark-common_${scala.binary.version}</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-spark3-common</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hudi</groupId>
-            <artifactId>${hudi-spark.version}_${scala.binary.version}</artifactId>
-            <version>${hudi.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>json4s-ast_2.11</artifactId>
-                    <groupId>org.json4s</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>json4s-core_2.11</artifactId>
-                    <groupId>org.json4s</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>json4s-jackson_2.11</artifactId>
-                    <groupId>org.json4s</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>json4s-scalap_2.11</artifactId>
-                    <groupId>org.json4s</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.parquet</groupId>
-            <artifactId>parquet-avro</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.antlr</groupId>
-            <artifactId>antlr4-runtime</artifactId>
-            <version>${antlr4.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_${scala.binary.version}</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>jackson-module-scala_2.12</artifactId>
-                    <groupId>com.fasterxml.jackson.module</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-client-api</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-client-runtime</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-            </exclusions>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-launcher_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <!-- version of spark's jackson module is error -->
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-            <version>${jackson.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>java-common</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.thrift</groupId>
-                    <artifactId>libthrift</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
-    <build>
-        <finalName>hudi-scanner</finalName>
-        <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
-        <directory>${project.basedir}/target/</directory>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-            </testResource>
-        </testResources>
-
-        <plugins>
-            <plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <version>4.7.2</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <scalaVersion>${scala.version}</scalaVersion>
-                    <args>
-                        <arg>-unchecked</arg>
-                        <arg>-deprecation</arg>
-                        <arg>-feature</arg>
-                    </args>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>default-compile</id>
-                        <phase>none</phase>
-                    </execution>
-                    <execution>
-                        <id>default-testCompile</id>
-                        <phase>none</phase>
-                    </execution>
-                    <execution>
-                        <id>java-compile</id>
-                        <goals>
-                            <goal>compile</goal>
-                            <goal>testCompile</goal>
-                        </goals>
-                        <phase>compile</phase>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <descriptors>
-                        <descriptor>src/main/resources/package.xml</descriptor>
-                    </descriptors>
-                    <archive>
-                        <manifest>
-                            <mainClass></mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
deleted file mode 100644
index 1c489affe16..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ /dev/null
@@ -1,187 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ColumnValue;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.List;
-
-public class HudiColumnValue implements ColumnValue {
-    private SpecializedGetters data;
-    private int ordinal;
-    private ColumnType columnType;
-
-    HudiColumnValue() {
-    }
-
-    HudiColumnValue(SpecializedGetters data, int ordinal, ColumnType 
columnType) {
-        this.data = data;
-        this.ordinal = ordinal;
-        this.columnType = columnType;
-    }
-
-    public void reset(SpecializedGetters data, int ordinal, ColumnType 
columnType) {
-        this.data = data;
-        this.ordinal = ordinal;
-        this.columnType = columnType;
-    }
-
-    public void reset(int ordinal, ColumnType columnType) {
-        this.ordinal = ordinal;
-        this.columnType = columnType;
-    }
-
-    public void reset(SpecializedGetters data) {
-        this.data = data;
-    }
-
-    @Override
-    public boolean canGetStringAsBytes() {
-        return true;
-    }
-
-    @Override
-    public boolean isNull() {
-        return data.isNullAt(ordinal);
-    }
-
-    @Override
-    public boolean getBoolean() {
-        return data.getBoolean(ordinal);
-    }
-
-    @Override
-    public byte getByte() {
-        return data.getByte(ordinal);
-    }
-
-    @Override
-    public short getShort() {
-        return data.getShort(ordinal);
-    }
-
-    @Override
-    public int getInt() {
-        return data.getInt(ordinal);
-    }
-
-    @Override
-    public float getFloat() {
-        return data.getFloat(ordinal);
-    }
-
-    @Override
-    public long getLong() {
-        return data.getLong(ordinal);
-    }
-
-    @Override
-    public double getDouble() {
-        return data.getDouble(ordinal);
-    }
-
-    @Override
-    public BigInteger getBigInteger() {
-        throw new UnsupportedOperationException("Hoodie type does not support 
largeint");
-    }
-
-    @Override
-    public BigDecimal getDecimal() {
-        return data.getDecimal(ordinal, columnType.getPrecision(), 
columnType.getScale()).toJavaBigDecimal();
-    }
-
-    @Override
-    public String getString() {
-        return data.getUTF8String(ordinal).toString();
-    }
-
-    @Override
-    public byte[] getStringAsBytes() {
-        return data.getUTF8String(ordinal).getBytes();
-    }
-
-    @Override
-    public LocalDate getDate() {
-        return LocalDate.ofEpochDay(data.getInt(ordinal));
-    }
-
-    @Override
-    public LocalDateTime getDateTime() {
-        long datetime = data.getLong(ordinal);
-        long seconds;
-        long nanoseconds;
-        if (columnType.getPrecision() == 3) {
-            seconds = datetime / 1000;
-            nanoseconds = (datetime % 1000) * 1000000;
-        } else if (columnType.getPrecision() == 6) {
-            seconds = datetime / 1000000;
-            nanoseconds = (datetime % 1000000) * 1000;
-        } else {
-            throw new RuntimeException("Hoodie timestamp only support 
milliseconds and microseconds, wrong precision = "
-                    + columnType.getPrecision());
-        }
-        return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, 
nanoseconds), ZoneId.systemDefault());
-    }
-
-    @Override
-    public byte[] getBytes() {
-        return data.getBinary(ordinal);
-    }
-
-    @Override
-    public void unpackArray(List<ColumnValue> values) {
-        ArrayData array = data.getArray(ordinal);
-        for (int i = 0; i < array.numElements(); ++i) {
-            values.add(new HudiColumnValue(array, i, 
columnType.getChildTypes().get(0)));
-        }
-    }
-
-    @Override
-    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
-        MapData map = data.getMap(ordinal);
-        ArrayData key = map.keyArray();
-        for (int i = 0; i < key.numElements(); ++i) {
-            keys.add(new HudiColumnValue(key, i, 
columnType.getChildTypes().get(0)));
-        }
-        ArrayData value = map.valueArray();
-        for (int i = 0; i < value.numElements(); ++i) {
-            values.add(new HudiColumnValue(value, i, 
columnType.getChildTypes().get(1)));
-        }
-    }
-
-    @Override
-    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> 
values) {
-        // todo: support pruned struct fields
-        InternalRow struct = data.getStruct(ordinal, structFieldIndex.size());
-        for (int i : structFieldIndex) {
-            values.add(new HudiColumnValue(struct, i, 
columnType.getChildTypes().get(i)));
-        }
-    }
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
deleted file mode 100644
index bc082e56732..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ /dev/null
@@ -1,229 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.doris.common.jni.JniScanner;
-import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.util.WeakIdentityHashMap;
-import org.apache.log4j.Logger;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import scala.collection.Iterator;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-/**
- * The hudi JniScanner
- */
-public class HudiJniScanner extends JniScanner {
-    private static final Logger LOG = Logger.getLogger(HudiJniScanner.class);
-
-    private final int fetchSize;
-    private final String debugString;
-    private final HoodieSplit split;
-    private final ClassLoader classLoader;
-
-    private long getRecordReaderTimeNs = 0;
-    private Iterator<InternalRow> recordIterator;
-
-    /**
-     * `GenericDatumReader` of avro is a thread local map, that stores 
`WeakIdentityHashMap`.
-     * `WeakIdentityHashMap` has cached the avro resolving decoder, and the 
cached resolver can only be cleaned when
-     * its avro schema is recycled and become a week reference. However, the 
behavior of the week reference queue
-     * of `WeakIdentityHashMap` is unpredictable. Secondly, the decoder is 
very memory intensive, the number of threads
-     * to call the thread local map cannot be too many.
-     * Two solutions:
-     * 1. Reduce the number of threads reading avro logs and keep the readers 
in a fixed thread pool.
-     * 2. Regularly cleaning the cached resolvers in the thread local map by 
reflection.
-     */
-    private static final AtomicLong lastUpdateTime = new 
AtomicLong(System.currentTimeMillis());
-    private static final long RESOLVER_TIME_OUT = 60000;
-    private static final ExecutorService avroReadPool;
-    private static ThreadLocal<WeakIdentityHashMap<?, ?>> AVRO_RESOLVER_CACHE;
-    private static final Map<Long, WeakIdentityHashMap<?, ?>> cachedResolvers 
= new ConcurrentHashMap<>();
-    private static final ReadWriteLock cleanResolverLock = new 
ReentrantReadWriteLock();
-    private static final ScheduledExecutorService cleanResolverService = 
Executors.newScheduledThreadPool(1);
-
-    static {
-        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 
2, 4);
-        if (numThreads > 48) {
-            numThreads = Runtime.getRuntime().availableProcessors();
-        }
-        avroReadPool = Executors.newFixedThreadPool(numThreads,
-                new 
ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
-        LOG.info("Create " + numThreads + " daemon threads to load avro logs");
-
-        Class<?> avroReader = GenericDatumReader.class;
-        try {
-            Field field = avroReader.getDeclaredField("RESOLVER_CACHE");
-            field.setAccessible(true);
-            AVRO_RESOLVER_CACHE = (ThreadLocal<WeakIdentityHashMap<?, ?>>) 
field.get(null);
-            LOG.info("Get the resolved cache for avro reader");
-        } catch (Exception e) {
-            AVRO_RESOLVER_CACHE = null;
-            LOG.warn("Failed to get the resolved cache for avro reader");
-        }
-
-        cleanResolverService.scheduleAtFixedRate(() -> {
-            cleanResolverLock.writeLock().lock();
-            try {
-                if (System.currentTimeMillis() - lastUpdateTime.get() > 
RESOLVER_TIME_OUT) {
-                    for (WeakIdentityHashMap<?, ?> solver : 
cachedResolvers.values()) {
-                        solver.clear();
-                    }
-                    lastUpdateTime.set(System.currentTimeMillis());
-                }
-            } finally {
-                cleanResolverLock.writeLock().unlock();
-            }
-        }, RESOLVER_TIME_OUT, RESOLVER_TIME_OUT, TimeUnit.MILLISECONDS);
-    }
-
-    public HudiJniScanner(int fetchSize, Map<String, String> params) {
-        debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + 
kv.getValue())
-                .collect(Collectors.joining("\n"));
-        try {
-            this.classLoader = this.getClass().getClassLoader();
-            this.fetchSize = fetchSize;
-            this.split = new HoodieSplit(params);
-        } catch (Exception e) {
-            LOG.error("Failed to initialize hudi scanner, split params:\n" + 
debugString, e);
-            throw e;
-        }
-    }
-
-    @Override
-    public void open() throws IOException {
-        Future<?> avroFuture = avroReadPool.submit(() -> {
-            Thread.currentThread().setContextClassLoader(classLoader);
-            initTableInfo(split.requiredTypes(), split.requiredFields(), 
fetchSize);
-            long startTime = System.nanoTime();
-            // RecordReader will use ProcessBuilder to start a hotspot 
process, which may be stuck,
-            // so use another process to kill this stuck process.
-            // TODO(gaoxin): better way to solve the stuck process?
-            AtomicBoolean isKilled = new AtomicBoolean(false);
-            ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
-            executorService.scheduleAtFixedRate(() -> {
-                if (!isKilled.get()) {
-                    synchronized (HudiJniScanner.class) {
-                        List<Long> pids = Utils.getChildProcessIds(
-                                Utils.getCurrentProcId());
-                        for (long pid : pids) {
-                            String cmd = Utils.getCommandLine(pid);
-                            if (cmd != null && 
cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
-                                Utils.killProcess(pid);
-                                isKilled.set(true);
-                                LOG.info("Kill hotspot debugger process " + 
pid);
-                            }
-                        }
-                    }
-                }
-            }, 100, 1000, TimeUnit.MILLISECONDS);
-
-            cleanResolverLock.readLock().lock();
-            try {
-                lastUpdateTime.set(System.currentTimeMillis());
-                AuthenticationConfig authenticationConfig = 
AuthenticationConfig.getKerberosConfig(split.hadoopConf());
-                HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
-                        .getHadoopAuthenticator(authenticationConfig);
-                if (split.incrementalRead()) {
-                    recordIterator = hadoopAuthenticator.doAs(() -> new 
MORIncrementalSplitReader(split)
-                            .buildScanIterator(new Filter[0]));
-                } else {
-                    recordIterator = hadoopAuthenticator.doAs(() -> new 
MORSnapshotSplitReader(split)
-                            .buildScanIterator(new Filter[0]));
-                }
-                if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() 
!= null) {
-                    
cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
-                            threadId -> AVRO_RESOLVER_CACHE.get());
-                    AVRO_RESOLVER_CACHE.get().clear();
-                }
-            } catch (Exception e) {
-                LOG.error("Failed to open hudi scanner, split params:\n" + 
debugString, e);
-                throw new RuntimeException(e.getMessage(), e);
-            } finally {
-                cleanResolverLock.readLock().unlock();
-            }
-            isKilled.set(true);
-            executorService.shutdownNow();
-            getRecordReaderTimeNs += System.nanoTime() - startTime;
-        });
-        try {
-            avroFuture.get();
-        } catch (Exception e) {
-            throw new IOException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (recordIterator instanceof Closeable) {
-            ((Closeable) recordIterator).close();
-        }
-        recordIterator = null;
-    }
-
-    @Override
-    public int getNext() throws IOException {
-        try {
-            int readRowNumbers = 0;
-            HudiColumnValue columnValue = new HudiColumnValue();
-            int numFields = split.requiredFields().length;
-            ColumnType[] columnTypes = split.requiredTypes();
-            while (readRowNumbers < fetchSize && recordIterator.hasNext()) {
-                columnValue.reset(recordIterator.next());
-                for (int i = 0; i < numFields; i++) {
-                    columnValue.reset(i, columnTypes[i]);
-                    appendData(i, columnValue);
-                }
-                readRowNumbers++;
-            }
-            return readRowNumbers;
-        } catch (Exception e) {
-            close();
-            LOG.error("Failed to get the next batch of hudi, split params:\n" 
+ debugString, e);
-            throw new IOException("Failed to get the next batch of hudi.", e);
-        }
-    }
-
-    @Override
-    public Map<String, String> getStatistics() {
-        return Collections.singletonMap("timer:GetRecordReaderTime", 
String.valueOf(getRecordReaderTimeNs));
-    }
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
deleted file mode 100644
index c0fbec633e8..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.management.ManagementFactory;
-import java.util.LinkedList;
-import java.util.List;
-
-public class Utils {
-    public static long getCurrentProcId() {
-        try {
-            return ManagementFactory.getRuntimeMXBean().getPid();
-        } catch (Exception e) {
-            throw new RuntimeException("Couldn't find PID of current JVM 
process.", e);
-        }
-    }
-
-    public static List<Long> getChildProcessIds(long pid) {
-        try {
-            Process pgrep = (new ProcessBuilder("pgrep", "-P", 
String.valueOf(pid))).start();
-            BufferedReader reader = new BufferedReader(new 
InputStreamReader(pgrep.getInputStream()));
-            List<Long> result = new LinkedList<>();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                result.add(Long.valueOf(line.trim()));
-            }
-            pgrep.waitFor();
-            return result;
-        } catch (Exception e) {
-            throw new RuntimeException("Couldn't get child processes of PID " 
+ pid, e);
-        }
-    }
-
-    public static String getCommandLine(long pid) {
-        try {
-            return FileUtils.readFileToString(new 
File(String.format("/proc/%d/cmdline", pid))).trim();
-        } catch (IOException e) {
-            return null;
-        }
-    }
-
-    public static void killProcess(long pid) {
-        try {
-            Process kill = (new ProcessBuilder("kill", "-9", 
String.valueOf(pid))).start();
-            kill.waitFor();
-        } catch (Exception e) {
-            throw new RuntimeException("Couldn't kill process PID " + pid, e);
-        }
-    }
-
-    public static HoodieTableMetaClient getMetaClient(Configuration conf, 
String basePath) {
-        HadoopStorageConfiguration hadoopStorageConfiguration = new 
HadoopStorageConfiguration(conf);
-        AuthenticationConfig authenticationConfig = 
AuthenticationConfig.getKerberosConfig(conf);
-        HadoopAuthenticator hadoopAuthenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
-        try {
-            return hadoopAuthenticator.doAs(() -> 
HoodieTableMetaClient.builder()
-                    
.setConf(hadoopStorageConfiguration).setBasePath(basePath).build());
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to get HoodieTableMetaClient", 
e);
-        }
-    }
-}
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/resources/package.xml b/fe/be-java-extensions/hudi-scanner/src/main/resources/package.xml
deleted file mode 100644
index 4bbb2610603..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/resources/package.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
-    <id>jar-with-dependencies</id>
-    <formats>
-        <format>jar</format>
-    </formats>
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <dependencySets>
-        <dependencySet>
-            <outputDirectory>/</outputDirectory>
-            <useProjectArtifact>true</useProjectArtifact>
-            <unpack>true</unpack>
-            <scope>runtime</scope>
-            <unpackOptions>
-                <excludes>
-                    <exclude>**/Log4j2Plugins.dat</exclude>
-                </excludes>
-            </unpackOptions>
-        </dependencySet>
-    </dependencySets>
-</assembly>
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
deleted file mode 100644
index fc8d74f9713..00000000000
--- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ /dev/null
@@ -1,734 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.doris.common.jni.vec.ColumnType
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hbase.io.hfile.CacheConfig
-import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
-import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.client.utils.SparkInternalSchemaConverter
-import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, 
TypedProperties}
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.hadoop.fs.CachingPath
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
-import org.apache.hudi.io.hadoop.HoodieHBaseAvroHFileReader
-import org.apache.hudi.metadata.HoodieTableMetadataUtil
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieSparkConfUtils, HoodieTableSchema, 
HoodieTableState}
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-import org.apache.log4j.Logger
-import org.apache.spark.sql.adapter.Spark3_4Adapter
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
PartitioningUtils}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.{SQLContext, SparkSession, SparkSessionExtensions}
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.{SparkConf, SparkContext}
-
-import java.lang.reflect.Constructor
-import java.net.URI
-import java.util.Objects
-import java.util.concurrent.TimeUnit
-import java.{util => jutil}
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-import scala.util.{Failure, Success, Try}
-
-class DorisSparkAdapter extends Spark3_4Adapter {
-  override def getAvroSchemaConverters: HoodieAvroSchemaConverters = 
HoodieSparkAvroSchemaConverters
-}
-
-class HoodieSplit(private val params: jutil.Map[String, String]) {
-  val queryId: String = params.remove("query_id")
-  val basePath: String = params.remove("base_path")
-  val dataFilePath: String = params.remove("data_file_path")
-  val dataFileLength: Long = params.remove("data_file_length").toLong
-  val deltaFilePaths: Array[String] = {
-    val deltas = params.remove("delta_file_paths")
-    if (StringUtils.isNullOrEmpty(deltas)) new Array[String](0) else 
deltas.split(",")
-  }
-
-  val hudiColumnNames: Array[String] = 
params.remove("hudi_column_names").split(",")
-  val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip(
-    params.remove("hudi_column_types").split("#")).toMap
-
-  val requiredFields: Array[String] = {
-    val readFields = 
params.remove("required_fields").split(",").filter(_.nonEmpty)
-    if (readFields.isEmpty) {
-      // If only read the partition columns, the JniConnector will produce 
empty required fields.
-      // Read the "_hoodie_record_key" field at least to know how many rows in 
current hoodie split
-      // Even if the JniConnector doesn't read this field, the call of 
releaseTable will reclaim the resource
-      Array(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-    } else {
-      readFields
-    }
-  }
-  val requiredTypes: Array[ColumnType] = requiredFields.map(
-    field => ColumnType.parseType(field, hudiColumnTypes(field)))
-
-  val nestedFields: Array[String] = {
-    val fields = params.remove("nested_fields")
-    if (StringUtils.isNullOrEmpty(fields)) new Array[String](0) else 
fields.split(",")
-  }
-  val instantTime: String = params.remove("instant_time")
-  val serde: String = params.remove("serde")
-  val inputFormat: String = params.remove("input_format")
-
-  val hadoopProperties: Map[String, String] = {
-    val properties = new jutil.HashMap[String, String]
-    val iterator = params.entrySet().iterator()
-    while (iterator.hasNext) {
-      val kv = iterator.next()
-      if (kv.getKey.startsWith(BaseSplitReader.HADOOP_CONF_PREFIX)) {
-        
properties.put(kv.getKey.substring(BaseSplitReader.HADOOP_CONF_PREFIX.length), 
kv.getValue)
-        iterator.remove()
-      }
-    }
-    properties.asScala.toMap
-  }
-
-  lazy val hadoopConf: Configuration = {
-    val conf = new Configuration
-    hadoopProperties.foreach(kv => conf.set(kv._1, kv._2))
-    conf
-  }
-
-  def incrementalRead: Boolean = {
-    
"true".equalsIgnoreCase(optParams.getOrElse("hoodie.datasource.read.incr.operation",
 "false"))
-  }
-
-  // NOTE: In cases when Hive Metastore is used as catalog and the table is 
partitioned, schema in the HMS might contain
-  //       Hive-specific partitioning columns created specifically for HMS to 
handle partitioning appropriately. In that
-  //       case  we opt in to not be providing catalog's schema, and instead 
force Hudi relations to fetch the schema
-  //       from the table itself
-  val schemaSpec: Option[StructType] = None
-
-  val optParams: Map[String, String] = params.asScala.toMap
-
-  override def equals(obj: Any): Boolean = {
-    if (obj == null) {
-      return false
-    }
-    obj match {
-      case split: HoodieSplit =>
-        hashCode() == split.hashCode()
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int = {
-    Objects.hash(queryId, basePath)
-  }
-}
-
-case class HoodieTableInformation(sparkSession: SparkSession,
-                                  metaClient: HoodieTableMetaClient,
-                                  timeline: HoodieTimeline,
-                                  tableConfig: HoodieTableConfig,
-                                  resolvedTargetFields: Array[String],
-                                  tableAvroSchema: Schema,
-                                  internalSchemaOpt: Option[InternalSchema])
-
-/**
- * Reference to Apache Hudi
- * see <a href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala">HoodieBaseRelation</a>
- */
-abstract class BaseSplitReader(val split: HoodieSplit) {
-
-  import BaseSplitReader._
-
-  protected val optParams: Map[String, String] = split.optParams
-
-  protected val tableInformation: HoodieTableInformation = cache.get(split)
-
-  protected val timeline: HoodieTimeline = tableInformation.timeline
-
-  protected val sparkSession: SparkSession = tableInformation.sparkSession
-  protected val sqlContext: SQLContext = sparkSession.sqlContext
-  imbueConfigs(sqlContext)
-
-  protected val tableConfig: HoodieTableConfig = tableInformation.tableConfig
-  protected val tableName: String = tableConfig.getTableName
-
-  // NOTE: Record key-field is assumed singular here due to the either of
-  //          - In case Hudi's meta fields are enabled: record key will be 
pre-materialized (stored) as part
-  //          of the record's payload (as part of the Hudi's metadata)
-  //          - In case Hudi's meta fields are disabled (virtual keys): in 
that case record has to bear _single field_
-  //          identified as its (unique) primary key w/in its payload (this is 
a limitation of [[SimpleKeyGenerator]],
-  //          which is the only [[KeyGenerator]] permitted for virtual-keys 
payloads)
-  protected lazy val recordKeyField: String =
-  if (tableConfig.populateMetaFields()) {
-    HoodieRecord.RECORD_KEY_METADATA_FIELD
-  } else {
-    val keyFields = tableConfig.getRecordKeyFields.get()
-    checkState(keyFields.length == 1)
-    keyFields.head
-  }
-
-  protected lazy val preCombineFieldOpt: Option[String] =
-    Option(tableConfig.getPreCombineField)
-      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) 
match {
-      // NOTE: This is required to compensate for cases when empty string is 
used to stub
-      //       property value to avoid it being set with the default value
-      // TODO(HUDI-3456) cleanup
-      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
-      case _ => None
-    }
-
-  /**
-   * Columns that relation has to read from the storage to properly execute on 
its semantic: for ex,
-   * for Merge-on-Read tables key fields as well and pre-combine field 
comprise mandatory set of columns,
-   * meaning that regardless of whether this columns are being requested by 
the query they will be fetched
-   * regardless so that relation is able to combine records properly (if 
necessary)
-   *
-   * @VisibleInTests
-   */
-  val mandatoryFields: Seq[String]
-
-  /**
-   * NOTE: Initialization of teh following members is coupled on purpose to 
minimize amount of I/O
-   * required to fetch table's Avro and Internal schemas
-   */
-  protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
-    (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
-  }
-
-  protected lazy val tableStructSchema: StructType = 
convertAvroSchemaToStructType(tableAvroSchema)
-
-  protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
-
-  protected lazy val specifiedQueryTimestamp: Option[String] = 
Some(split.instantTime)
-
-  private def queryTimestamp: Option[String] =
-    
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
-
-  lazy val tableState: HoodieTableState = {
-    val recordMergerImpls = 
ConfigUtils.split2List(getConfigValue(HoodieWriteConfig.RECORD_MERGER_IMPLS)).asScala.toList
-    val recordMergerStrategy = 
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
-      
Option(tableInformation.metaClient.getTableConfig.getRecordMergerStrategy))
-    val configProperties = getConfigProperties(sparkSession, optParams)
-    val metadataConfig = HoodieMetadataConfig.newBuilder()
-      .fromProperties(configProperties)
-      .enable(configProperties.getBoolean(
-        HoodieMetadataConfig.ENABLE.key(), 
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
-        && 
HoodieTableMetadataUtil.isFilesPartitionAvailable(tableInformation.metaClient))
-      .build()
-
-    // Subset of the state of table's configuration as of at the time of the 
query
-    HoodieTableState(
-      tablePath = split.basePath,
-      latestCommitTimestamp = queryTimestamp,
-      recordKeyField = recordKeyField,
-      preCombineFieldOpt = preCombineFieldOpt,
-      usesVirtualKeys = !tableConfig.populateMetaFields(),
-      recordPayloadClassName = tableConfig.getPayloadClass,
-      metadataConfig = metadataConfig,
-      recordMergerImpls = recordMergerImpls,
-      recordMergerStrategy = recordMergerStrategy
-    )
-  }
-
-  private def getConfigValue(config: ConfigProperty[String],
-                             defaultValueOption: Option[String] = 
Option.empty): String = {
-    optParams.getOrElse(config.key(),
-      sqlContext.getConf(config.key(), 
defaultValueOption.getOrElse(config.defaultValue())))
-  }
-
-  def imbueConfigs(sqlContext: SQLContext): Unit = {
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown",
 "true")
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled",
 "true")
-    // TODO(HUDI-3639) vectorized reader has to be disabled to make sure 
MORIncrementalRelation is working properly
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "false")
-  }
-
-  def buildScanIterator(filters: Array[Filter]): Iterator[InternalRow] = {
-    // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
-    //       *Appending* additional columns to the ones requested by the 
caller is not a problem, as those
-    //       will be eliminated by the caller's projection;
-    //   (!) Please note, however, that it's critical to avoid _reordering_ of 
the requested columns as this
-    //       will break the upstream projection
-    val targetColumns: Array[String] = 
appendMandatoryColumns(tableInformation.resolvedTargetFields)
-    // NOTE: We explicitly fallback to default table's Avro schema to make 
sure we avoid unnecessary Catalyst > Avro
-    //       schema conversion, which is lossy in nature (for ex, it doesn't 
preserve original Avro type-names) and
-    //       could have an effect on subsequent de-/serializing records in 
some exotic scenarios (when Avro unions
-    //       w/ more than 2 types are involved)
-    val sourceSchema = tableAvroSchema
-    val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
-      projectSchema(Either.cond(internalSchemaOpt.isDefined, 
internalSchemaOpt.get, sourceSchema), targetColumns)
-
-    val tableAvroSchemaStr = tableAvroSchema.toString
-    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, 
internalSchemaOpt)
-    val requiredSchema = HoodieTableSchema(
-      requiredStructSchema, requiredAvroSchema.toString, 
Some(requiredInternalSchema))
-
-    composeIterator(tableSchema, requiredSchema, targetColumns, filters)
-  }
-
-  /**
-   * Composes iterator provided file split to read from, table and partition 
schemas, data filters to be applied
-   *
-   * @param tableSchema      target table's schema
-   * @param requiredSchema   projected schema required by the reader
-   * @param requestedColumns columns requested by the query
-   * @param filters          data filters to be applied
-   * @return instance of RDD (holding [[InternalRow]]s)
-   */
-  protected def composeIterator(tableSchema: HoodieTableSchema,
-                                requiredSchema: HoodieTableSchema,
-                                requestedColumns: Array[String],
-                                filters: Array[Filter]): Iterator[InternalRow]
-
-  private final def appendMandatoryColumns(requestedColumns: Array[String]): 
Array[String] = {
-    // For a nested field in mandatory columns, we should first get the 
root-level field, and then
-    // check for any missing column, as the requestedColumns should only 
contain root-level fields
-    // We should only append root-level field as well
-    val missing = mandatoryFields.map(col => 
HoodieAvroUtils.getRootLevelFieldName(col))
-      .filter(rootField => !requestedColumns.contains(rootField))
-    requestedColumns ++ missing
-  }
-
-  /**
-   * Projects provided schema by picking only required (projected) top-level 
columns from it
-   *
-   * @param tableSchema     schema to project (either of [[InternalSchema]] or 
Avro's [[Schema]])
-   * @param requiredColumns required top-level columns to be projected
-   */
-  def projectSchema(tableSchema: Either[Schema, InternalSchema],
-                    requiredColumns: Array[String]): (Schema, StructType, 
InternalSchema) = {
-    tableSchema match {
-      case Right(internalSchema) =>
-        checkState(!internalSchema.isEmptySchema)
-        val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(
-          internalSchema, requiredColumns.toList.asJava)
-        val requiredAvroSchema = 
AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
-        val requiredStructSchema = 
convertAvroSchemaToStructType(requiredAvroSchema)
-
-        (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
-
-      case Left(avroSchema) =>
-        val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
-        val requiredFields = requiredColumns.map { col =>
-          val f = fieldMap(col)
-          // We have to create a new [[Schema.Field]] since Avro schemas can't 
share field
-          // instances (and will throw "org.apache.avro.AvroRuntimeException: 
Field already used")
-          new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order())
-        }.toList
-        val requiredAvroSchema = Schema.createRecord(avroSchema.getName, 
avroSchema.getDoc,
-          avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava)
-        val requiredStructSchema = 
convertAvroSchemaToStructType(requiredAvroSchema)
-
-        (requiredAvroSchema, requiredStructSchema, 
InternalSchema.getEmptyInternalSchema)
-    }
-  }
-
-  /**
-   * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
-   */
-  protected def convertAvroSchemaToStructType(avroSchema: Schema): StructType 
= {
-    val schemaConverters = sparkAdapter.getAvroSchemaConverters
-    schemaConverters.toSqlType(avroSchema) match {
-      case (dataType, _) => dataType.asInstanceOf[StructType]
-    }
-  }
-
-  protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
-                                         requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = {
-    // Since schema requested by the caller might contain partition columns, 
we might need to
-    // prune it, removing all partition columns from it in case these columns 
are not persisted
-    // in the data files
-    //
-    // NOTE: This partition schema is only relevant to file reader to be able 
to embed
-    //       values of partition columns (hereafter referred to as partition 
values) encoded into
-    //       the partition path, and omitted from the data file, back into 
fetched rows;
-    //       Note that, by default, partition columns are not omitted 
therefore specifying
-    //       partition schema for reader is not required
-    if (shouldExtractPartitionValuesFromPartitionPath) {
-      val partitionSchema = StructType(partitionColumns.map(StructField(_, 
StringType)))
-      val prunedDataStructSchema = 
prunePartitionColumns(tableSchema.structTypeSchema)
-      val prunedRequiredSchema = 
prunePartitionColumns(requiredSchema.structTypeSchema)
-
-      (partitionSchema,
-        HoodieTableSchema(prunedDataStructSchema, 
convertToAvroSchema(prunedDataStructSchema, tableName).toString),
-        HoodieTableSchema(prunedRequiredSchema, 
convertToAvroSchema(prunedRequiredSchema, tableName).toString))
-    } else {
-      (StructType(Nil), tableSchema, requiredSchema)
-    }
-  }
-
-  /**
-   * Controls whether partition values (ie values of partition columns) should 
be
-   * <ol>
-   * <li>Extracted from partition path and appended to individual rows read 
from the data file (we
-   * delegate this to Spark's [[ParquetFileFormat]])</li>
-   * <li>Read from the data-file as is (by default Hudi persists all columns 
including partition ones)</li>
-   * </ol>
-   *
-   * This flag is only relevant in conjunction with the usage of [["hoodie.datasource.write.drop.partition.columns"]]
-   * config, when Hudi will NOT be persisting partition columns in the data 
file, and therefore values for
-   * such partition columns (ie "partition values") will have to be parsed 
from the partition path, and appended
-   * to every row only in the fetched dataset.
-   *
-   * NOTE: Partition values extracted from partition path might be deviating 
from the values of the original
-   * partition columns: for example, if the original partition column was [[ts]] bearing an epoch
-   * timestamp, which was used by [[TimestampBasedKeyGenerator]] to generate 
partition path of the format
-   * [["yyyy/mm/dd"]], appended partition value would bear the format verbatim 
as it was used in the
-   * partition path, meaning that string value of "2022/01/01" will be 
appended, and not its original
-   * representation
-   */
-  protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
-    // Controls whether partition columns (which are the source for the 
partition path values) should
-    // be omitted from persistence in the data files. On the read path it 
affects whether partition values (values
-    // of partition columns) will be read from the data file or extracted from 
partition path
-
-    val shouldOmitPartitionColumns = 
tableInformation.tableConfig.shouldDropPartitionColumns && 
partitionColumns.nonEmpty
-    val shouldExtractPartitionValueFromPath =
-      
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
-        
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
-    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
-  }
-
-  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
-    StructType(dataStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
-
-  /**
-   * When hoodie.datasource.write.drop.partition.columns is enabled, an InternalRow holding the partition values
-   * must be created and passed to the parquet reader, so that the partition columns can still be queried.
-   */
-  protected def getPartitionColumnsAsInternalRow(): InternalRow = {
-    try {
-      if (shouldExtractPartitionValuesFromPartitionPath) {
-        val filePath = new Path(split.dataFilePath)
-        val tablePathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(new 
Path(tableInformation.metaClient.getBasePathV2.toUri))
-        val partitionPathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(filePath.getParent)
-        val relativePath = new 
URI(tablePathWithoutScheme.toString).relativize(new 
URI(partitionPathWithoutScheme.toString)).toString
-        val hiveStylePartitioningEnabled = 
tableConfig.getHiveStylePartitioningEnable.toBoolean
-        if (hiveStylePartitioningEnabled) {
-          val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
-          
InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString))
-        } else {
-          if (partitionColumns.length == 1) {
-            InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath)))
-          } else {
-            val parts = relativePath.split("/")
-            assert(parts.size == partitionColumns.length)
-            InternalRow.fromSeq(parts.map(UTF8String.fromString))
-          }
-        }
-      } else {
-        InternalRow.empty
-      }
-    } catch {
-      case NonFatal(e) =>
-        LOG.warn(s"Failed to get the right partition InternalRow for file: 
${split.dataFilePath}", e)
-        InternalRow.empty
-    }
-  }
-
-  /**
-   * Wrapper for `buildReaderWithPartitionValues` of [[ParquetFileFormat]] 
handling [[ColumnarBatch]],
-   * when Parquet's Vectorized Reader is used
-   *
-   * TODO move to HoodieBaseRelation, make private
-   */
-  private[hudi] def buildHoodieParquetReader(sparkSession: SparkSession,
-                                             dataSchema: StructType,
-                                             partitionSchema: StructType,
-                                             requiredSchema: StructType,
-                                             filters: Seq[Filter],
-                                             options: Map[String, String],
-                                             hadoopConf: Configuration,
-                                             appendPartitionValues: Boolean = 
false): PartitionedFile => Iterator[InternalRow] = {
-    val parquetFileFormat: ParquetFileFormat = 
sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
-    val readParquetFile: PartitionedFile => Iterator[Any] = 
parquetFileFormat.buildReaderWithPartitionValues(
-      sparkSession = sparkSession,
-      dataSchema = dataSchema,
-      partitionSchema = partitionSchema,
-      requiredSchema = requiredSchema,
-      filters = filters,
-      options = options,
-      hadoopConf = hadoopConf
-    )
-
-    file: PartitionedFile => {
-      val iter = readParquetFile(file)
-      iter.flatMap {
-        case r: InternalRow => Seq(r)
-        case b: ColumnarBatch => b.rowIterator().asScala
-      }
-    }
-  }
-
-  private def createHFileReader(spark: SparkSession,
-                                dataSchema: HoodieTableSchema,
-                                requiredDataSchema: HoodieTableSchema,
-                                filters: Seq[Filter],
-                                options: Map[String, String],
-                                hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
-    partitionedFile => {
-      var hadoopStorageConfiguration = new 
HadoopStorageConfiguration(hadoopConf);
-      var storagePath = new StoragePath(partitionedFile.toPath.toUri.getPath);
-      var emptySchema = 
org.apache.hudi.common.util.Option.empty[org.apache.avro.Schema]()
-      val reader = new HoodieHBaseAvroHFileReader(
-        hadoopStorageConfiguration, storagePath, emptySchema)
-
-      val requiredRowSchema = requiredDataSchema.structTypeSchema
-      // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] 
aren't serializable
-      //       to be passed from driver to executor
-      val requiredAvroSchema = new 
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
-      val avroToRowConverter = 
AvroConversionUtils.createAvroToInternalRowConverter(
-        requiredAvroSchema, requiredRowSchema)
-
-      reader.getRecordIterator(requiredAvroSchema).asScala
-        .map(record => {
-          
avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
-        })
-    }
-  }
-
-  /**
-   * Returns file-reader routine accepting [[PartitionedFile]] and returning 
an [[Iterator]]
-   * over [[InternalRow]]
-   */
-  protected def createBaseFileReader(spark: SparkSession,
-                                     partitionSchema: StructType,
-                                     dataSchema: HoodieTableSchema,
-                                     requiredDataSchema: HoodieTableSchema,
-                                     filters: Seq[Filter],
-                                     options: Map[String, String],
-                                     hadoopConf: Configuration): 
BaseFileReader = {
-    val tableBaseFileFormat = tableConfig.getBaseFileFormat
-
-    // NOTE: PLEASE READ CAREFULLY
-    //       Lambda returned from this method is going to be invoked on the 
executor, and therefore
-    //       we have to eagerly initialize all of the readers even though only 
one specific to the type
-    //       of the file being read will be used. This is required to avoid 
serialization of the whole
-    //       relation (containing file-index for ex) and passing it to the 
executor
-    val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) 
=
-    tableBaseFileFormat match {
-      case HoodieFileFormat.PARQUET =>
-        val parquetReader = buildHoodieParquetReader(
-          sparkSession = spark,
-          dataSchema = dataSchema.structTypeSchema,
-          partitionSchema = partitionSchema,
-          requiredSchema = requiredDataSchema.structTypeSchema,
-          filters = filters,
-          options = options,
-          hadoopConf = hadoopConf,
-          // We're delegating to Spark to append partition values to every row 
only in cases
-          // when these corresponding partition-values are not persisted w/in 
the data file itself
-          appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
-        )
-        // Since partition values by default are omitted, and not persisted 
w/in data-files by Spark,
-        // data-file readers (such as [[ParquetFileFormat]]) have to inject 
partition values while reading
-        // the data. As such, actual full schema produced by such reader is 
composed of
-        //    a) Data-file schema (projected or not)
-        //    b) Appended partition column values
-        val readerSchema = 
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
-
-        (parquetReader, readerSchema)
-
-      case HoodieFileFormat.HFILE =>
-        val hfileReader = createHFileReader(
-          spark = spark,
-          dataSchema = dataSchema,
-          requiredDataSchema = requiredDataSchema,
-          filters = filters,
-          options = options,
-          hadoopConf = hadoopConf
-        )
-
-        (hfileReader, requiredDataSchema.structTypeSchema)
-
-      case _ => throw new UnsupportedOperationException(s"Base file format is 
not currently supported ($tableBaseFileFormat)")
-    }
-
-    BaseFileReader(
-      read = partitionedFile => {
-        val extension = 
FSUtils.getFileExtension(partitionedFile.filePath.toString())
-        if (tableBaseFileFormat.getFileExtension.equals(extension)) {
-          read(partitionedFile)
-        } else {
-          throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($tableBaseFileFormat)")
-        }
-      },
-      schema = schema
-    )
-  }
-
-  protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: 
Option[InternalSchema]): Configuration = {
-    val internalSchema = 
internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
-    val querySchemaString = SerDeHelper.toJson(internalSchema)
-    if (!StringUtils.isNullOrEmpty(querySchemaString)) {
-      val validCommits = 
timeline.getInstants.iterator.asScala.map(_.getFileName).mkString(",")
-      LOG.warn(s"Table valid commits: $validCommits")
-
-      conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, 
SerDeHelper.toJson(internalSchema))
-      conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, split.basePath)
-      conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, 
validCommits)
-    }
-    conf
-  }
-}
-
-object SparkMockHelper {
-  private lazy val mockSparkContext = {
-    val conf = new SparkConf().setMaster("local").setAppName("mock_sc")
-      .set("spark.ui.enabled", "false")
-    val sc = new SparkContext(conf)
-    sc.setLogLevel("WARN")
-    sc
-  }
-
-  implicit class MockSparkSession(builder: SparkSession.Builder) {
-    def createMockSession(split: HoodieSplit): SparkSession = {
-      val sparkSessionClass = classOf[SparkSession]
-      val constructor: Constructor[SparkSession] = 
sparkSessionClass.getDeclaredConstructors
-        .find(_.getParameterCount == 
5).get.asInstanceOf[Constructor[SparkSession]]
-      constructor.setAccessible(true)
-      val ss = constructor.newInstance(mockSparkContext, None, None, new 
SparkSessionExtensions, Map.empty)
-      split.hadoopProperties.foreach(kv => 
ss.sessionState.conf.setConfString(kv._1, kv._2))
-      ss
-    }
-  }
-}
-
-object BaseSplitReader {
-
-  import SparkMockHelper.MockSparkSession
-
-  private val LOG = Logger.getLogger(BaseSplitReader.getClass)
-  val HADOOP_CONF_PREFIX = "hadoop_conf."
-
-  // Use [[SparkAdapterSupport]] instead ?
-  private lazy val sparkAdapter = new DorisSparkAdapter
-
-  private lazy val cache: LoadingCache[HoodieSplit, HoodieTableInformation] = {
-    val loader = new CacheLoader[HoodieSplit, HoodieTableInformation] {
-      override def load(split: HoodieSplit): HoodieTableInformation = {
-        // create mock spark session
-        val sparkSession = SparkSession.builder().createMockSession(split)
-        val metaClient = Utils.getMetaClient(split.hadoopConf, split.basePath)
-        // NOTE: We're including compaction here since it's not considered a "commit" operation
-        val timeline = 
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
-
-        val specifiedQueryTimestamp: Option[String] = Some(split.instantTime)
-        val schemaResolver = new TableSchemaResolver(metaClient)
-        val internalSchemaOpt = if 
(!isSchemaEvolutionEnabledOnRead(split.optParams, sparkSession)) {
-          None
-        } else {
-          Try {
-            
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
-              
.getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
-          } match {
-            case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
-            case Failure(_) =>
-              None
-          }
-        }
-        val tableName = metaClient.getTableConfig.getTableName
-        val (name, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
-        val avroSchema: Schema = internalSchemaOpt.map { is =>
-          AvroInternalSchemaConverter.convert(is, namespace + "." + name)
-        } orElse {
-          specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
-        } orElse {
-          split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
-        } getOrElse {
-          Try(schemaResolver.getTableAvroSchema) match {
-            case Success(schema) => schema
-            case Failure(e) =>
-              throw new HoodieSchemaException("Failed to fetch schema from the 
table", e)
-          }
-        }
-
-        // match column name in lower case
-        val colNames = internalSchemaOpt.map { internalSchema =>
-          internalSchema.getAllColsFullName.asScala.map(f => f.toLowerCase -> 
f).toMap
-        } getOrElse {
-          avroSchema.getFields.asScala.map(f => f.name().toLowerCase -> 
f.name()).toMap
-        }
-        val resolvedTargetFields = split.requiredFields.map(field => 
colNames.getOrElse(field.toLowerCase, field))
-
-        HoodieTableInformation(sparkSession,
-          metaClient,
-          timeline,
-          metaClient.getTableConfig,
-          resolvedTargetFields,
-          avroSchema,
-          internalSchemaOpt)
-      }
-    }
-    CacheBuilder.newBuilder()
-      .expireAfterAccess(10, TimeUnit.MINUTES)
-      .maximumSize(4096)
-      .build(loader)
-  }
-
-  private def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], 
sparkSession: SparkSession): Boolean = {
-    // NOTE: Schema evolution could be configured both through the optional parameters as well as
-    //       through the Spark Session configuration (for example, for Spark SQL)
-    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-      
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean 
||
-      
sparkSession.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-        
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
-  }
-
-  private def getConfigProperties(spark: SparkSession, options: Map[String, 
String]) = {
-    val sqlConf: SQLConf = spark.sessionState.conf
-    val properties = new TypedProperties()
-    // Ambiguous reference when invoking Properties.putAll() in Java 11
-    // Reference https://github.com/scala/bug/issues/10418
-    options.filter(p => p._2 != null).foreach(p => 
properties.setProperty(p._1, p._2))
-
-    // TODO(HUDI-5361) clean up properties carry-over
-
-    // To support metadata listing via Spark SQL we allow users to pass the 
config via SQL Conf in spark session. Users
-    // would be able to run SET hoodie.metadata.enable=true in the spark sql 
session to enable metadata listing.
-    val isMetadataTableEnabled = HoodieSparkConfUtils.getConfigValue(options, 
sqlConf, HoodieMetadataConfig.ENABLE.key, null)
-    if (isMetadataTableEnabled != null) {
-      properties.setProperty(HoodieMetadataConfig.ENABLE.key(), 
String.valueOf(isMetadataTableEnabled))
-    }
-
-    val listingModeOverride = HoodieSparkConfUtils.getConfigValue(options, 
sqlConf,
-      DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, null)
-    if (listingModeOverride != null) {
-      
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
-    }
-
-    properties
-  }
-}
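
For readers tracing the removal: the projectSchema logic deleted above (its Avro branch) boils down to rebuilding a record schema from the requested top-level fields, since Avro Schema.Field instances cannot be shared between schemas. A minimal standalone sketch of that idea, assuming Avro 1.9+ and Scala's JavaConverters on the classpath; the object and method names are illustrative, not part of the original code:

    import org.apache.avro.Schema
    import scala.collection.JavaConverters._

    object SchemaPruneSketch {
      // Rebuild a record schema that keeps only the requested top-level columns.
      // New Schema.Field instances are created because Avro forbids reusing a
      // field in two schemas ("Field already used").
      def pruneTopLevel(source: Schema, requiredColumns: Seq[String]): Schema = {
        val byName = source.getFields.asScala.map(f => f.name() -> f).toMap
        val fields = requiredColumns.map { col =>
          val f = byName(col)
          new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
        }
        Schema.createRecord(source.getName, source.getDoc, source.getNamespace,
          source.isError, fields.asJava)
      }
    }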
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
deleted file mode 100644
index f393e9e1246..00000000000
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
-import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.StringUtils
-import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, 
HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator}
-import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
-import org.apache.spark.sql.catalyst.InternalRow
-
-import java.io.Closeable
-import java.util.function.Predicate
-
-/**
- * Class holding base-file readers for 3 different use-cases:
- *
- * <ol>
- * <li>Full-schema reader: is used when whole row has to be read to perform 
merging correctly.
- * This could occur, when no optimizations could be applied and we have to 
fallback to read the whole row from
- * the base file and the corresponding delta-log file to merge them 
correctly</li>
- *
- * <li>Required-schema reader: is used when it's fine to only read row's 
projected columns.
- * This could occur, when row could be merged with corresponding delta-log 
record while leveraging only
- * projected columns</li>
- *
- * <li>Required-schema reader (skip-merging): is used when no merging will be performed (skip-merged).
- * This could occur, when file-group has no delta-log files</li>
- * </ol>
- */
-private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: 
BaseFileReader,
-                                                          
requiredSchemaReader: BaseFileReader,
-                                                          
requiredSchemaReaderSkipMerging: BaseFileReader)
-
-/**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
- * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
- * performing any combination/merging of the records w/ the same primary keys 
(ie producing duplicates potentially)
- */
-private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
-                                baseFileReader: BaseFileReader,
-                                dataSchema: HoodieTableSchema,
-                                requiredSchema: HoodieTableSchema,
-                                tableState: HoodieTableState,
-                                config: Configuration)
-  extends LogFileIterator(split, dataSchema, requiredSchema, tableState, 
config) {
-
-  private val requiredSchemaProjection = 
generateUnsafeProjection(baseFileReader.schema, structTypeSchema)
-
-  private val baseFileIterator = baseFileReader(split.dataFile.get)
-
-  override def doHasNext: Boolean = {
-    if (baseFileIterator.hasNext) {
-      // No merge is required, simply load current row and project into 
required schema
-      nextRecord = requiredSchemaProjection(baseFileIterator.next())
-      true
-    } else {
-      super[LogFileIterator].doHasNext
-    }
-  }
-}
-
-/**
- * Reference to Apache Hudi
- * see <a 
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala";>HoodieMergeOnReadRDD</a>
- */
-class HoodieMORRecordIterator(config: Configuration,
-                              fileReaders: HoodieMergeOnReadBaseFileReaders,
-                              tableSchema: HoodieTableSchema,
-                              requiredSchema: HoodieTableSchema,
-                              tableState: HoodieTableState,
-                              mergeType: String,
-                              fileSplit: HoodieMergeOnReadFileSplit,
-                              includeStartTime: Boolean = false,
-                              startTimestamp: String = null,
-                              endTimestamp: String = null) extends 
Iterator[InternalRow] with Closeable {
-  protected val maxCompactionMemoryInBytes: Long = config.getLongBytes(
-    "hoodie.compaction.memory", 512 * 1024 * 1024)
-
-  protected val recordIterator: Iterator[InternalRow] = {
-    val iter = fileSplit match {
-      case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, 
requiredSchema.structTypeSchema)
-        projectedReader(dataFileOnlySplit.dataFile.get)
-
-      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, 
tableState, config)
-
-      case split => mergeType match {
-        case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
-          val reader = fileReaders.requiredSchemaReaderSkipMerging
-          new SkipMergeIterator(split, reader, tableSchema, requiredSchema, 
tableState, config)
-
-        case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
-          val reader = pickBaseFileReader()
-          new RecordMergingFileIterator(split, reader, tableSchema, 
requiredSchema, tableState, config)
-
-        case _ => throw new UnsupportedOperationException(s"Not supported 
merge type ($mergeType)")
-      }
-    }
-
-    val commitTimeMetadataFieldIdx = 
requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val needsFiltering = commitTimeMetadataFieldIdx >= 0 && 
!StringUtils.isNullOrEmpty(startTimestamp) && 
!StringUtils.isNullOrEmpty(endTimestamp)
-    if (needsFiltering) {
-      val filterT: Predicate[InternalRow] = 
getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
-      iter.filter(filterT.test)
-    }
-    else {
-      iter
-    }
-  }
-
-  private def getCommitTimeFilter(includeStartTime: Boolean, 
commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
-    if (includeStartTime) {
-      new Predicate[InternalRow] {
-        override def test(row: InternalRow): Boolean = {
-          val commitTime = row.getString(commitTimeMetadataFieldIdx)
-          commitTime >= startTimestamp && commitTime <= endTimestamp
-        }
-      }
-    } else {
-      new Predicate[InternalRow] {
-        override def test(row: InternalRow): Boolean = {
-          val commitTime = row.getString(commitTimeMetadataFieldIdx)
-          commitTime > startTimestamp && commitTime <= endTimestamp
-        }
-      }
-    }
-  }
-
-  private def pickBaseFileReader(): BaseFileReader = {
-    // NOTE: This is an optimization making sure that even for MOR tables we fetch the absolute minimum
-    //       of the stored data possible, while still properly executing the corresponding relation's semantic
-    //       and meeting the query's requirements.
-    //
-    //       Here we assume that iff the queried table uses one of the standard (and whitelisted)
-    //       Record Payload classes then we can avoid reading and parsing the records w/ _full_ schema,
-    //       and instead only rely on the projected one, nevertheless being able to perform merging correctly
-    if (isProjectionCompatible(tableState)) {
-      fileReaders.requiredSchemaReader
-    } else {
-      fileReaders.fullSchemaReader
-    }
-  }
-
-  override def hasNext: Boolean = {
-    recordIterator.hasNext
-  }
-
-  override def next(): InternalRow = {
-    recordIterator.next()
-  }
-
-  override def close(): Unit = {
-    recordIterator match {
-      case closeable: Closeable =>
-        closeable.close()
-      case _ =>
-    }
-  }
-}
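
The commit-time filtering removed above keeps only rows whose _hoodie_commit_time falls within the queried span; because Hudi commit times are fixed-width timestamps, lexicographic string comparison preserves their ordering. A minimal sketch of that bound check, mirroring the removed getCommitTimeFilter (the object name is illustrative):

    object CommitTimeSpanSketch {
      // Returns true when commitTime lies in (startTs, endTs], or in [startTs, endTs]
      // when the start bound is inclusive.
      def inSpan(commitTime: String, startTs: String, endTs: String,
                 includeStart: Boolean): Boolean = {
        val lowerOk = if (includeStart) commitTime >= startTs else commitTime > startTs
        lowerOk && commitTime <= endTs
      }
    }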
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
deleted file mode 100644
index 73c87e29034..00000000000
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import org.apache.hudi.HoodieTableSchema
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources._
-
-/**
- * Reference to Apache Hudi
- * see <a 
href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala";>MergeOnReadIncrementalRelation</a>
- */
-class MORIncrementalSplitReader(override val split: HoodieSplit) extends 
MORSnapshotSplitReader(split) with IncrementalSplitReaderTrait {
-
-  override protected def composeIterator(tableSchema: HoodieTableSchema,
-                                         requiredSchema: HoodieTableSchema,
-                                         requestedColumns: Array[String],
-                                         filters: Array[Filter]): 
Iterator[InternalRow] = {
-    // The only required filters are ones that make sure we're only fetching 
records that
-    // fall into incremental span of the timeline being queried
-    val requiredFilters = incrementalSpanRecordFilters
-    val optionalFilters = filters
-    val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
-
-    new HoodieMORRecordIterator(split.hadoopConf,
-      readers,
-      tableSchema,
-      requiredSchema,
-      tableState,
-      mergeType,
-      getFileSplit(),
-      includeStartTime = includeStartTime,
-      startTimestamp = startTs,
-      endTimestamp = endTs)
-  }
-
-}
-
-/**
- * Reference to Apache Hudi
- * see <a 
href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala";>HoodieIncrementalRelationTrait</a>
- */
-trait IncrementalSplitReaderTrait extends BaseSplitReader {
-  protected val includeStartTime: Boolean = 
"true".equalsIgnoreCase(optParams("hoodie.datasource.read.incr.includeStartTime"))
-  protected val startTs: String = 
optParams("hoodie.datasource.read.begin.instanttime")
-  protected val endTs: String = 
optParams("hoodie.datasource.read.end.instanttime")
-
-  // Record filters making sure that only records w/in the requested bounds 
are being fetched as part of the
-  // scan collected by this relation
-  protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
-    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-
-    val largerThanFilter = if (includeStartTime) {
-      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
-    } else {
-      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
-    }
-
-    val lessThanFilter = 
LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs)
-
-    Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
-  }
-
-  override lazy val mandatoryFields: Seq[String] = {
-    // NOTE: These columns are required for the Incremental flow to be able to handle the rows properly, even in
-    //       cases when no columns are requested to be fetched (for ex, when 
using {@code count()} API)
-    Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
-      preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
-  }
-}
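
The incrementalSpanRecordFilters removed above express the same commit-time bounds as Spark data-source filters so they can be pushed down to the base-file reader. A minimal sketch, assuming spark-sql on the classpath (object and method names are illustrative):

    import org.apache.spark.sql.sources._

    object IncrementalSpanFiltersSketch {
      // Build the three filters bounding an incremental scan on the commit-time
      // metadata field: not-null, a lower bound (inclusive or exclusive), and an
      // inclusive upper bound.
      def spanFilters(field: String, startTs: String, endTs: String,
                      includeStart: Boolean): Seq[Filter] = {
        val lower: Filter =
          if (includeStart) GreaterThanOrEqual(field, startTs) else GreaterThan(field, startTs)
        Seq(IsNotNull(field), lower, LessThanOrEqual(field, endTs))
      }
    }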
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
deleted file mode 100644
index 02a4fa40045..00000000000
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi
-
-import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.model.HoodieLogFile
-import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, 
HoodieTableSchema}
-import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
-
-/**
- * Reference to Apache Hudi
- * see <a 
href="https://github.com/apache/hudi/blob/release-0.13.0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala";>MergeOnReadSnapshotRelation</a>
- */
-class MORSnapshotSplitReader(override val split: HoodieSplit) extends 
BaseSplitReader(split) {
-  /**
-   * NOTE: These are the fields that are required to properly fulfil 
Merge-on-Read (MOR)
-   * semantic:
-   *
-   * <ol>
-   * <li>Primary key is required to make sure we're able to correlate records 
from the base
-   * file with the updated records from the delta-log file</li>
-   * <li>Pre-combine key is required to properly perform the combining (or 
merging) of the
-   * existing and updated records</li>
-   * </ol>
-   *
-   * However, in cases when merging is NOT performed (for ex, if file-group 
only contains base
-   * files but no delta-log files, or if the query-type is equal to 
[["skip_merge"]]) neither
-   * of primary-key or pre-combine-key are required to be fetched from storage 
(unless requested
-   * by the query), therefore saving on throughput
-   */
-  protected lazy val mandatoryFieldsForMerging: Seq[String] =
-    Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
-
-  override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
-
-  protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
-    DataSourceReadOptions.REALTIME_MERGE.defaultValue)
-
-  override protected def composeIterator(tableSchema: HoodieTableSchema,
-                                         requiredSchema: HoodieTableSchema,
-                                         requestedColumns: Array[String],
-                                         filters: Array[Filter]): 
Iterator[InternalRow] = {
-    // todo: push down predicates about key field
-    val requiredFilters = Seq.empty
-    val optionalFilters = filters
-    val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
-
-    new HoodieMORRecordIterator(split.hadoopConf,
-      readers,
-      tableSchema,
-      requiredSchema,
-      tableState,
-      mergeType,
-      getFileSplit())
-  }
-
-  protected def getFileSplit(): HoodieMergeOnReadFileSplit = {
-    val logFiles = split.deltaFilePaths.map(new HoodieLogFile(_))
-      
.sorted(Ordering.comparatorToOrdering(HoodieLogFile.getLogFileComparator)).toList
-    val partitionedBaseFile = if (split.dataFilePath.isEmpty) {
-      None
-    } else {
-      Some(PartitionedFile(getPartitionColumnsAsInternalRow(), 
SparkPath.fromPathString(split.dataFilePath), 0, split.dataFileLength))
-    }
-    HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
-  }
-
-  override def imbueConfigs(sqlContext: SQLContext): Unit = {
-    super.imbueConfigs(sqlContext)
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "true")
-    // there's no thread-local TaskContext, so the parquet reader will still use on-heap memory even when this is set to true
-    
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.columnVector.offheap.enabled",
 "true")
-  }
-
-  protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
-                                      requiredSchema: HoodieTableSchema,
-                                      requestedColumns: Array[String],
-                                      requiredFilters: Seq[Filter],
-                                      optionalFilters: Seq[Filter] = 
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumns(tableSchema, requiredSchema)
-
-    val fullSchemaReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredDataSchema = dataSchema,
-      // This file-reader is used to read base file records, subsequently 
merging them with the records
-      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
-      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
-      // we combine them correctly);
-      // As such only required filters could be pushed-down to such reader
-      filters = requiredFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(split.hadoopConf, internalSchemaOpt)
-    )
-
-    val requiredSchemaReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredDataSchema = requiredDataSchema,
-      // This file-reader is used to read base file records, subsequently 
merging them with the records
-      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
-      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
-      // we combine them correctly);
-      // As such only required filters could be pushed-down to such reader
-      filters = requiredFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(split.hadoopConf, 
requiredDataSchema.internalSchema)
-    )
-
-    // Check whether fields required for merging were also requested to be 
fetched
-    // by the query:
-    //    - In case they were, there's no optimization we could apply here (we 
will have
-    //    to fetch such fields)
-    //    - In case they were not, we will provide 2 separate file-readers
-    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
-    //        b) One which would be applied to file-groups w/ no delta-logs or
-    //           in case query-mode is skipping merging
-    val mandatoryColumns = 
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
-    if (mandatoryColumns.forall(requestedColumns.contains)) {
-      HoodieMergeOnReadBaseFileReaders(
-        fullSchemaReader = fullSchemaReader,
-        requiredSchemaReader = requiredSchemaReader,
-        requiredSchemaReaderSkipMerging = requiredSchemaReader
-      )
-    } else {
-      val prunedRequiredSchema = {
-        val unusedMandatoryColumnNames = 
mandatoryColumns.filterNot(requestedColumns.contains)
-        val prunedStructSchema =
-          StructType(requiredDataSchema.structTypeSchema.fields
-            .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
-
-        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema, tableName).toString)
-      }
-
-      val requiredSchemaReaderSkipMerging = createBaseFileReader(
-        spark = sqlContext.sparkSession,
-        partitionSchema = partitionSchema,
-        dataSchema = dataSchema,
-        requiredDataSchema = prunedRequiredSchema,
-        // This file-reader is only used in cases when no merging is 
performed, therefore it's safe to push
-        // down these filters to the base file readers
-        filters = requiredFilters ++ optionalFilters,
-        options = optParams,
-        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-        //       to configure Parquet reader appropriately
-        hadoopConf = embedInternalSchema(split.hadoopConf, 
requiredDataSchema.internalSchema)
-      )
-
-      HoodieMergeOnReadBaseFileReaders(
-        fullSchemaReader = fullSchemaReader,
-        requiredSchemaReader = requiredSchemaReader,
-        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
-      )
-    }
-  }
-}
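
The pruning decision removed above drops the mandatory merging columns only when the query did not request them, since those columns are needed solely for delta-log merging. A minimal sketch of that pruning step, assuming spark-sql's StructType (the object name is illustrative):

    import org.apache.spark.sql.types.StructType

    object MandatoryColumnPruneSketch {
      // Remove mandatory merging columns (e.g. record key, pre-combine field) from the
      // required schema when the caller did not ask for them; they are only needed
      // when base-file rows are merged with delta-log records.
      def pruneUnrequested(required: StructType, mandatoryColumns: Seq[String],
                           requestedColumns: Seq[String]): StructType = {
        val unused = mandatoryColumns.filterNot(requestedColumns.contains)
        StructType(required.fields.filterNot(f => unused.contains(f.name)))
      }
    }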
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/test/java/org/apache/doris/hudi/HudiJniScannerTest.java
 
b/fe/be-java-extensions/hudi-scanner/src/test/java/org/apache/doris/hudi/HudiJniScannerTest.java
deleted file mode 100644
index 6cdfbdc53e4..00000000000
--- 
a/fe/be-java-extensions/hudi-scanner/src/test/java/org/apache/doris/hudi/HudiJniScannerTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.hudi;
-
-import org.junit.Test;
-
-
-/**
- * The hudi JniScanner test
- */
-public class HudiJniScannerTest {
-    @Test
-    public void testOpen() {
-    }
-
-}
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 5d56ef76e7c..049cb6b516d 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -21,7 +21,6 @@ under the License.
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <modules>
-        <module>hudi-scanner</module>
         <module>hadoop-hudi-scanner</module>
         <module>java-common</module>
         <module>java-udf</module>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 987adbdc0bc..96ee3c08605 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -261,7 +261,6 @@ public class HudiScanNode extends HiveScanNode {
         fileDesc.setColumnTypes(hudiSplit.getHudiColumnTypes());
         // TODO(gaoxin): support complex types
         // fileDesc.setNestedFields(hudiSplit.getNestedFields());
-        fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
         tableFormatFileDesc.setHudiParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
@@ -493,7 +492,6 @@ public class HudiScanNode extends HiveScanNode {
         split.setHudiColumnNames(columnNames);
         split.setHudiColumnTypes(columnTypes);
         split.setInstantTime(queryInstant);
-        split.setHudiJniScanner(sessionVariable.getHudiJniScanner());
         return split;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
index 2270d201793..2c3cbdb7fba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java
@@ -40,5 +40,4 @@ public class HudiSplit extends FileSplit {
     private List<String> hudiColumnNames;
     private List<String> hudiColumnTypes;
     private List<String> nestedFields;
-    private String hudiJniScanner;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 1bbb5e51d29..55a575a9ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -649,8 +649,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
-    public static final String HUDI_JNI_SCANNER = "hudi_jni_scanner";
-
     public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE = 
"enable_count_push_down_for_external_table";
 
     public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
@@ -2164,10 +2162,6 @@ public class SessionVariable implements Serializable, 
Writable {
             description = {"强制使用jni方式读取外表", "Force the use of jni mode to read 
external table"})
     private boolean forceJniScanner = false;
 
-    @VariableMgr.VarAttr(name = HUDI_JNI_SCANNER, description = { "使用那种hudi 
jni scanner, 'hadoop' 或 'spark'",
-            "Which hudi jni scanner to use, 'hadoop' or 'spark'" })
-    private String hudiJniScanner = "hadoop";
-
     @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
             description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown 
optimization for external table"})
     private boolean enableCountPushDownForExternalTable = true;
@@ -4710,10 +4704,6 @@ public class SessionVariable implements Serializable, 
Writable {
         return forceJniScanner;
     }
 
-    public String getHudiJniScanner() {
-        return hudiJniScanner;
-    }
-
     public String getIgnoreSplitType() {
         return ignoreSplitType;
     }
@@ -4734,10 +4724,6 @@ public class SessionVariable implements Serializable, 
Writable {
         forceJniScanner = force;
     }
 
-    public void setHudiJniScanner(String hudiJniScanner) {
-        this.hudiJniScanner = hudiJniScanner;
-    }
-
     public boolean isEnableCountPushDownForExternalTable() {
         return enableCountPushDownForExternalTable;
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7411383670f..a8f10a3f053 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -377,7 +377,7 @@ struct THudiFileDesc {
     8: optional list<string> column_names;
     9: optional list<string> column_types;
     10: optional list<string> nested_fields;
-    11: optional string hudi_jni_scanner;
+    11: optional string hudi_jni_scanner; // deprecated
 }
 
 struct TLakeSoulFileDesc {
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
index aa7dd5bfe3d..b962936dca1 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_incremental.groovy
@@ -105,7 +105,6 @@ suite("test_hudi_incremental", 
"p2,external,hudi,external_remote,external_remote
     ]
 
     sql """set force_jni_scanner=true;"""
-    sql """set hudi_jni_scanner='hadoop';"""
     // TODO: @suxiaogang223 don't support incremental query for cow table by 
jni reader
     // test_hudi_incremental_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
     // test_hudi_incremental_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
 
b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
index dbffb9ea4a4..fae889240d9 100644
--- 
a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
+++ 
b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_evolution.groovy
@@ -36,7 +36,6 @@ suite("test_hudi_schema_evolution", 
"p2,external,hudi,external_remote,external_r
     sql """ set enable_fallback_to_original_planner=false """
     
     sql """set force_jni_scanner = true;"""
-    sql """set hudi_jni_scanner='hadoop';"""
     qt_adding_simple_columns_table """ select * from 
adding_simple_columns_table order by id """
     qt_altering_simple_columns_table """ select * from 
altering_simple_columns_table order by id """
     // qt_deleting_simple_columns_table """ select * from 
deleting_simple_columns_table order by id """
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
index 16918d8305d..8feec578f74 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_snapshot.groovy
@@ -83,7 +83,6 @@ suite("test_hudi_snapshot", 
"p2,external,hudi,external_remote,external_remote_hu
     }
 
     sql """set force_jni_scanner=true;"""
-    sql """set hudi_jni_scanner='hadoop';"""
     test_hudi_snapshot_querys("user_activity_log_mor_non_partition")
     test_hudi_snapshot_querys("user_activity_log_mor_partition")
     test_hudi_snapshot_querys("user_activity_log_cow_non_partition")
diff --git 
a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy 
b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
index 25ebfe7c453..dd61ccf4230 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy
@@ -99,7 +99,6 @@ suite("test_hudi_timetravel", 
"p2,external,hudi,external_remote,external_remote_
     ]
 
     sql """set force_jni_scanner=true;"""
-    sql """set hudi_jni_scanner='hadoop';"""
     test_hudi_timetravel_querys("user_activity_log_cow_non_partition", 
timestamps_cow_non_partition)
     test_hudi_timetravel_querys("user_activity_log_cow_partition", 
timestamps_cow_partition)
     test_hudi_timetravel_querys("user_activity_log_mor_non_partition", 
timestamps_mor_non_partition)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
