This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8d9bc866bd8 branch-3.1: [refactor](paimon) clean unused PaimonJniScanner code and refactor PaimonSysJniScanner (#54679 #54804)  (#55020)
8d9bc866bd8 is described below

commit 8d9bc866bd86da4e3d7afd329130453004e6b184
Author: Socrates <[email protected]>
AuthorDate: Thu Aug 21 12:03:54 2025 +0800

    branch-3.1: [refactor](paimon) clean unused PaimonJniScanner code and refactor PaimonSysJniScanner (#54679 #54804)  (#55020)
    
    bp:
    #54679  adapt session time_zone for PaimonJniScanner
    #54804 refactor PaimonSysJniScanner
---
 .../format/table/iceberg_sys_table_jni_reader.cpp  |  11 +-
 .../format/table/iceberg_sys_table_jni_reader.h    |   4 +-
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |   9 +-
 .../format/table/paimon_sys_table_jni_reader.cpp   |  23 +--
 .../format/table/paimon_sys_table_jni_reader.h     |   5 +-
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  31 +--
 be/src/vec/exec/scan/vmeta_scanner.h               |   2 -
 .../create_preinstalled_scripts/paimon/run08.sql   |  15 ++
 .../doris/iceberg/IcebergSysTableJniScanner.java   |  32 ++-
 .../org/apache/doris/paimon/PaimonColumnValue.java |  19 +-
 .../org/apache/doris/paimon/PaimonJniScanner.java  |   7 +-
 .../doris/paimon/PaimonSysTableJniScanner.java     | 184 +++++++----------
 .../org/apache/doris/paimon/PaimonTableCache.java  | 221 ---------------------
 .../datasource/paimon/source/PaimonScanNode.java   |  10 -
 .../datasource/tvf/source/MetadataScanNode.java    |  42 +++-
 .../tablefunction/BackendsTableValuedFunction.java |   5 +-
 .../tablefunction/CatalogsTableValuedFunction.java |   5 +-
 .../FrontendsDisksTableValuedFunction.java         |   5 +-
 .../FrontendsTableValuedFunction.java              |   5 +-
 .../tablefunction/HudiTableValuedFunction.java     |   4 +-
 .../tablefunction/IcebergTableValuedFunction.java  |  27 ++-
 .../tablefunction/JobsTableValuedFunction.java     |   5 +-
 .../tablefunction/MetadataTableValuedFunction.java |   2 +-
 .../tablefunction/MvInfosTableValuedFunction.java  |   5 +-
 .../tablefunction/PaimonTableValuedFunction.java   |  41 +---
 .../PartitionValuesTableValuedFunction.java        |   4 +-
 .../PartitionsTableValuedFunction.java             |   5 +-
 .../tablefunction/TasksTableValuedFunction.java    |   5 +-
 gensrc/thrift/PlanNodes.thrift                     |  12 +-
 .../test_paimon_timestamp_with_time_zone.out       | Bin 0 -> 483 bytes
 .../test_paimon_timestamp_with_time_zone.groovy    |  61 ++++++
 31 files changed, 294 insertions(+), 512 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
index 2d3519b7dd2..7d35329a2bc 100644
--- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
@@ -27,8 +27,8 @@ static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
 
 IcebergSysTableJniReader::IcebergSysTableJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* 
state,
-        RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
-        : JniReader(file_slot_descs, state, profile), 
_range_params(range_params) {}
+        RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+        : JniReader(file_slot_descs, state, profile), 
_meta_scan_range(meta_scan_range) {}
 
 Status IcebergSysTableJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
@@ -39,11 +39,12 @@ Status IcebergSysTableJniReader::init_reader(
         required_types.emplace_back(JniConnector::get_jni_type(desc->type()));
     }
     std::map<std::string, std::string> params;
-    params["serialized_task"] = _range_params.serialized_task;
+    // "," is not in base64
+    params["serialized_splits"] = join(_meta_scan_range.serialized_splits, 
",");
     params["required_fields"] = join(required_fields, ",");
     params["required_types"] = join(required_types, "#");
-    params["time_zone"] = _state->timezone_obj().name();
-    for (const auto& kv : _range_params.hadoop_props) {
+    params["time_zone"] = _state->timezone();
+    for (const auto& kv : _meta_scan_range.hadoop_props) {
         params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
     }
     _jni_connector =
diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
index ed867e46abe..982f4357343 100644
--- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
@@ -47,7 +47,7 @@ class IcebergSysTableJniReader : public JniReader {
 public:
     IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
                              RuntimeState* state, RuntimeProfile* profile,
-                             const TIcebergMetadataParams& range_params);
+                             const TMetaScanRange& meta_scan_range);
 
     ~IcebergSysTableJniReader() override = default;
 
@@ -55,7 +55,7 @@ public:
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
 private:
-    const TIcebergMetadataParams& _range_params;
+    const TMetaScanRange& _meta_scan_range;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index 8b10e3a54ff..b603a3ccc54 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -49,18 +49,11 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
         column_types.emplace_back(JniConnector::get_jni_type(desc->type()));
     }
     std::map<String, String> params;
-    params["db_name"] = range.table_format_params.paimon_params.db_name;
-    params["table_name"] = range.table_format_params.paimon_params.table_name;
     params["paimon_split"] = 
range.table_format_params.paimon_params.paimon_split;
-    params["paimon_column_names"] = 
range.table_format_params.paimon_params.paimon_column_names;
     params["paimon_predicate"] = 
range.table_format_params.paimon_params.paimon_predicate;
-    params["ctl_id"] = 
std::to_string(range.table_format_params.paimon_params.ctl_id);
-    params["db_id"] = 
std::to_string(range.table_format_params.paimon_params.db_id);
-    params["tbl_id"] = 
std::to_string(range.table_format_params.paimon_params.tbl_id);
-    params["last_update_time"] =
-            
std::to_string(range.table_format_params.paimon_params.last_update_time);
     params["required_fields"] = join(column_names, ",");
     params["columns_types"] = join(column_types, "#");
+    params["time_zone"] = _state->timezone();
     if (range_params->__isset.serialized_table) {
         params["serialized_table"] = range_params->serialized_table;
     }
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
index f63de6586cf..56daa90d0c2 100644
--- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
@@ -24,12 +24,11 @@ namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
 const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
-const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon.";
 
 PaimonSysTableJniReader::PaimonSysTableJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* 
state,
-        RuntimeProfile* profile, const TPaimonMetadataParams& range_params)
-        : JniReader(file_slot_descs, state, profile), 
_range_params(range_params) {
+        RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+        : JniReader(file_slot_descs, state, profile), 
_meta_scan_range(meta_scan_range) {
     std::vector<std::string> required_fields;
     std::vector<std::string> required_types;
     for (const auto& desc : _file_slot_descs) {
@@ -38,21 +37,13 @@ PaimonSysTableJniReader::PaimonSysTableJniReader(
     }
 
     std::map<std::string, std::string> params;
-    params["db_name"] = _range_params.db_name;
-    params["tbl_name"] = _range_params.tbl_name;
-    params["query_type"] = _range_params.query_type;
-    params["ctl_id"] = std::to_string(_range_params.ctl_id);
-    params["db_id"] = std::to_string(_range_params.db_id);
-    params["tbl_id"] = std::to_string(_range_params.tbl_id);
-    params["serialized_split"] = _range_params.serialized_split;
+    params["serialized_table"] = _meta_scan_range.serialized_table;
+    // "," is not in base64
+    params["serialized_splits"] = join(_meta_scan_range.serialized_splits, 
",");
     params["required_fields"] = join(required_fields, ",");
     params["required_types"] = join(required_types, "#");
-
-    for (const auto& kv : _range_params.paimon_props) {
-        params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
-    }
-
-    for (const auto& kv : _range_params.hadoop_props) {
+    params["time_zone"] = _state->timezone();
+    for (const auto& kv : _meta_scan_range.hadoop_props) {
         params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
     }
 
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
index f7b3108f5af..a6f43899e2d 100644
--- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
@@ -46,10 +46,9 @@ class PaimonSysTableJniReader : public JniReader {
 
 public:
     static const std::string HADOOP_OPTION_PREFIX;
-    static const std::string PAIMON_OPTION_PREFIX;
     PaimonSysTableJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
                             RuntimeState* state, RuntimeProfile* profile,
-                            const TPaimonMetadataParams& range_params);
+                            const TMetaScanRange& meta_scan_range);
 
     ~PaimonSysTableJniReader() override = default;
 
@@ -58,7 +57,7 @@ public:
 
 private:
     const std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
-    const TPaimonMetadataParams& _range_params;
+    const TMetaScanRange& _meta_scan_range;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index b8ae79925ac..0860d2bb979 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -67,14 +67,14 @@ Status VMetaScanner::open(RuntimeState* state) {
     RETURN_IF_ERROR(VScanner::open(state));
     if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
         // TODO: refactor this code
-        auto reader = IcebergSysTableJniReader::create_unique(
-                _tuple_desc->slots(), state, _profile, 
_scan_range.meta_scan_range.iceberg_params);
+        auto reader = 
IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
+                                                              
_scan_range.meta_scan_range);
         const std::unordered_map<std::string, ColumnValueRangeType> 
colname_to_value_range;
         RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
         _reader = std::move(reader);
     } else if (_scan_range.meta_scan_range.metadata_type == 
TMetadataType::PAIMON) {
-        auto reader = PaimonSysTableJniReader::create_unique(
-                _tuple_desc->slots(), state, _profile, 
_scan_range.meta_scan_range.paimon_params);
+        auto reader = 
PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
+                                                             
_scan_range.meta_scan_range);
         const std::unordered_map<std::string, ColumnValueRangeType> 
colname_to_value_range;
         RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
         _reader = std::move(reader);
@@ -251,9 +251,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& 
meta_scan_range) {
     VLOG_CRITICAL << "VMetaScanner::_fetch_metadata";
     TFetchSchemaTableDataRequest request;
     switch (meta_scan_range.metadata_type) {
-    case TMetadataType::ICEBERG:
-        RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, 
&request));
-        break;
     case TMetadataType::HUDI:
         RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, 
&request));
         break;
@@ -319,26 +316,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& 
meta_scan_range) {
     return Status::OK();
 }
 
-Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
-                                                     
TFetchSchemaTableDataRequest* request) {
-    VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
-    if (!meta_scan_range.__isset.iceberg_params) {
-        return Status::InternalError("Can not find TIcebergMetadataParams from 
meta_scan_range.");
-    }
-
-    // create request
-    request->__set_cluster_name("");
-    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
-
-    // create TMetadataTableRequestParams
-    TMetadataTableRequestParams metadata_table_params;
-    metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
-    
metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);
-
-    request->__set_metada_table_params(metadata_table_params);
-    return Status::OK();
-}
-
 Status VMetaScanner::_build_hudi_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                   
TFetchSchemaTableDataRequest* request) {
     VLOG_CRITICAL << "VMetaScanner::_build_hudi_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h
index 7b344faf18b..1974a713584 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -64,8 +64,6 @@ protected:
 private:
     Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& 
columns);
     Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
-    Status _build_iceberg_metadata_request(const TMetaScanRange& 
meta_scan_range,
-                                           TFetchSchemaTableDataRequest* 
request);
     Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
                                         TFetchSchemaTableDataRequest* request);
     Status _build_backends_metadata_request(const TMetaScanRange& 
meta_scan_range,
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql
new file mode 100644
index 00000000000..073d26548a0
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql
@@ -0,0 +1,15 @@
+use paimon;
+create database if not exists test_paimon_spark;
+use test_paimon_spark;
+
+SET TIME ZONE '+08:00';
+
+CREATE TABLE IF NOT EXISTS t_ts_ntz (
+  id INT,
+  ts TIMESTAMP,
+  ts_ntz TIMESTAMP_NTZ
+) USING paimon;
+
+INSERT INTO t_ts_ntz VALUES
+  (1, CAST('2025-08-12 06:00:00+00:00' AS TIMESTAMP), CAST('2025-08-12 
06:00:00' AS TIMESTAMP_NTZ)),
+  (2, CAST('2025-08-12 14:00:00+08:00' AS TIMESTAMP), CAST('2025-08-12 
14:00:00' AS TIMESTAMP_NTZ));
\ No newline at end of file
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
index 350d0d872c3..9603ed35504 100644
--- a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.jni.vec.ColumnValue;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
 
+import com.google.common.base.Preconditions;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.io.CloseableIterator;
@@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -47,16 +50,21 @@ public class IcebergSysTableJniScanner extends JniScanner {
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
     private final ClassLoader classLoader;
     private final PreExecutionAuthenticator preExecutionAuthenticator;
-    private final FileScanTask scanTask;
+    private final Iterator<FileScanTask> scanTasks;
     private final List<NestedField> fields;
     private final String timezone;
     private CloseableIterator<StructLike> reader;
 
     public IcebergSysTableJniScanner(int batchSize, Map<String, String> 
params) {
         this.classLoader = this.getClass().getClassLoader();
-        this.scanTask = 
SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
+        List<FileScanTask> scanTasks = 
Arrays.stream(params.get("serialized_splits").split(","))
+                .map(SerializationUtil::deserializeFromBase64)
+                .map(obj -> (FileScanTask) obj)
+                .collect(Collectors.toList());
+        Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks shoudle not 
be empty");
+        this.scanTasks = scanTasks.iterator();
         String[] requiredFields = params.get("required_fields").split(",");
-        this.fields = selectSchema(scanTask.schema().asStruct(), 
requiredFields);
+        this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), 
requiredFields);
         this.timezone = params.getOrDefault("time_zone", 
TimeZone.getDefault().getID());
         Map<String, String> hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
@@ -69,8 +77,14 @@ public class IcebergSysTableJniScanner extends JniScanner {
 
     @Override
     public void open() throws IOException {
+        Thread.currentThread().setContextClassLoader(classLoader);
+        nextScanTask();
+    }
+
+    private void nextScanTask() throws IOException {
+        Preconditions.checkArgument(scanTasks.hasNext());
+        FileScanTask scanTask = scanTasks.next();
         try {
-            Thread.currentThread().setContextClassLoader(classLoader);
             preExecutionAuthenticator.execute(() -> {
                 // execute FileScanTask to get rows
                 reader = scanTask.asDataTask().rows().iterator();
@@ -78,7 +92,7 @@ public class IcebergSysTableJniScanner extends JniScanner {
             });
         } catch (Exception e) {
             this.close();
-            String msg = String.format("Failed to open 
IcebergMetadataJniScanner");
+            String msg = String.format("Failed to open next scan task: %s", 
scanTask);
             LOG.error(msg, e);
             throw new IOException(msg, e);
         }
@@ -86,11 +100,11 @@ public class IcebergSysTableJniScanner extends JniScanner {
 
     @Override
     protected int getNext() throws IOException {
-        if (reader == null) {
-            return 0;
-        }
         int rows = 0;
-        while (reader.hasNext() && rows < getBatchSize()) {
+        while ((reader.hasNext() || scanTasks.hasNext()) && rows < 
getBatchSize()) {
+            if (!reader.hasNext()) {
+                nextScanTask();
+            }
             StructLike row = reader.next();
             for (int i = 0; i < fields.size(); i++) {
                 NestedField field = fields.get(i);
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
index 73aa6ce8550..af8a13149e1 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
@@ -46,15 +46,17 @@ public class PaimonColumnValue implements ColumnValue {
     private DataGetters record;
     private ColumnType dorisType;
     private DataType dataType;
+    private String timeZone;
 
     public PaimonColumnValue() {
     }
 
-    public PaimonColumnValue(DataGetters record, int idx, ColumnType 
columnType, DataType dataType) {
+    public PaimonColumnValue(DataGetters record, int idx, ColumnType 
columnType, DataType dataType, String timeZone) {
         this.idx = idx;
         this.record = record;
         this.dorisType = columnType;
         this.dataType = dataType;
+        this.timeZone = timeZone;
     }
 
     public void setIdx(int idx, ColumnType dorisType, DataType dataType) {
@@ -67,6 +69,10 @@ public class PaimonColumnValue implements ColumnValue {
         this.record = record;
     }
 
+    public void setTimeZone(String timeZone) {
+        this.timeZone = timeZone;
+    }
+
     @Override
     public boolean canGetStringAsBytes() {
         return true;
@@ -136,7 +142,8 @@ public class PaimonColumnValue implements ColumnValue {
     public LocalDateTime getDateTime() {
         Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision());
         if (dataType instanceof LocalZonedTimestampType) {
-            return LocalDateTime.ofInstant(ts.toInstant(), 
ZoneId.systemDefault());
+            return ts.toLocalDateTime().atZone(ZoneId.of("UTC"))
+                    
.withZoneSameInstant(ZoneId.of(timeZone)).toLocalDateTime();
         } else {
             return ts.toLocalDateTime();
         }
@@ -157,7 +164,7 @@ public class PaimonColumnValue implements ColumnValue {
         InternalArray recordArray = record.getArray(idx);
         for (int i = 0; i < recordArray.size(); i++) {
             PaimonColumnValue arrayColumnValue = new 
PaimonColumnValue((DataGetters) recordArray, i,
-                    dorisType.getChildTypes().get(0), ((ArrayType) 
dataType).getElementType());
+                    dorisType.getChildTypes().get(0), ((ArrayType) 
dataType).getElementType(), timeZone);
             values.add(arrayColumnValue);
         }
     }
@@ -168,13 +175,13 @@ public class PaimonColumnValue implements ColumnValue {
         InternalArray key = map.keyArray();
         for (int i = 0; i < key.size(); i++) {
             PaimonColumnValue keyColumnValue = new 
PaimonColumnValue((DataGetters) key, i,
-                    dorisType.getChildTypes().get(0), ((MapType) 
dataType).getKeyType());
+                    dorisType.getChildTypes().get(0), ((MapType) 
dataType).getKeyType(), timeZone);
             keys.add(keyColumnValue);
         }
         InternalArray value = map.valueArray();
         for (int i = 0; i < value.size(); i++) {
             PaimonColumnValue valueColumnValue = new 
PaimonColumnValue((DataGetters) value, i,
-                    dorisType.getChildTypes().get(1), ((MapType) 
dataType).getValueType());
+                    dorisType.getChildTypes().get(1), ((MapType) 
dataType).getValueType(), timeZone);
             values.add(valueColumnValue);
         }
     }
@@ -185,7 +192,7 @@ public class PaimonColumnValue implements ColumnValue {
         InternalRow row = record.getRow(idx, structFieldIndex.size());
         for (int i : structFieldIndex) {
             values.add(new PaimonColumnValue(row, i, 
dorisType.getChildTypes().get(i),
-                    ((RowType) dataType).getFields().get(i).type()));
+                    ((RowType) dataType).getFields().get(i).type(), timeZone));
         }
     }
 }
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index d25d5f7b94f..4dc1fe90a33 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -39,17 +39,14 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = 
LoggerFactory.getLogger(PaimonJniScanner.class);
-    @Deprecated
-    private static final String PAIMON_OPTION_PREFIX = "paimon.";
-    @Deprecated
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
 
     private final Map<String, String> params;
-    @Deprecated
     private final Map<String, String> hadoopOptionParams;
     private final String paimonSplit;
     private final String paimonPredicate;
@@ -76,6 +73,8 @@ public class PaimonJniScanner extends JniScanner {
         }
         paimonSplit = params.get("paimon_split");
         paimonPredicate = params.get("paimon_predicate");
+        String timeZone = params.getOrDefault("time_zone", 
TimeZone.getDefault().getID());
+        columnValue.setTimeZone(timeZone);
         initTableInfo(columnTypes, requiredFields, batchSize);
         hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
index 0344c8b2db4..79289bb8256 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
@@ -21,9 +21,8 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
-import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
-import org.apache.doris.paimon.PaimonTableCache.TableExt;
 
+import com.google.common.base.Preconditions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
@@ -36,39 +35,32 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 /**
  * JNI-based scanner for reading Apache Paimon system tables
+ * TODO: unify this with PaimonJniScanner in future
  */
 public class PaimonSysTableJniScanner extends JniScanner {
     private static final Logger LOG = 
LoggerFactory.getLogger(PaimonSysTableJniScanner.class);
 
     private static final String HADOOP_OPTION_PREFIX = "hadoop.";
-    private static final String PAIMON_OPTION_PREFIX = "paimon.";
-
-    private final Map<String, String> params;
-    private final Map<String, String> hadoopOptionParams;
-    private final Map<String, String> paimonOptionParams;
 
     private final ClassLoader classLoader;
-    private final Split paimonSplit;
-    private Table table;
-    private RecordReader<InternalRow> reader;
+    private final Table table;
+    private final Iterator<Split> paimonSplits;
     private final PaimonColumnValue columnValue = new PaimonColumnValue();
-    private List<DataType> paimonDataTypeList;
-    private List<String> paimonAllFieldNames;
+    private final List<String> paimonAllFieldNames;
+    private final int[] projected;
+    private final List<DataType> paimonDataTypeList;
     private final PreExecutionAuthenticator preExecutionAuthenticator;
-    private RecordReader.RecordIterator<InternalRow> recordIterator = null;
-    private final long ctlId;
-    private final long dbId;
-    private final long tblId;
-    private final String dbName;
-    private final String tblName;
-    private final String queryType;
+    private RecordReader<InternalRow> reader;
+    private RecordReader.RecordIterator<InternalRow> recordIterator;
 
     /**
      * Constructs a new PaimonSysTableJniScanner for reading Paimon system 
tables.
@@ -78,52 +70,60 @@ public class PaimonSysTableJniScanner extends JniScanner {
         if (LOG.isDebugEnabled()) {
             LOG.debug("params:{}", params);
         }
-        this.params = params;
         String[] requiredFields = params.get("required_fields").split(",");
         String[] requiredTypes = params.get("required_types").split("#");
         ColumnType[] columnTypes = new ColumnType[requiredTypes.length];
         for (int i = 0; i < requiredTypes.length; i++) {
             columnTypes[i] = ColumnType.parseType(requiredFields[i], 
requiredTypes[i]);
         }
+        String timeZone = params.getOrDefault("time_zone", 
TimeZone.getDefault().getID());
+        columnValue.setTimeZone(timeZone);
         initTableInfo(columnTypes, requiredFields, batchSize);
-        this.paimonSplit = 
PaimonUtils.deserialize(params.get("serialized_split"));
-        this.ctlId = Long.parseLong(params.get("ctl_id"));
-        this.dbId = Long.parseLong(params.get("db_id"));
-        this.tblId = Long.parseLong(params.get("tbl_id"));
-        this.dbName = params.get("db_name");
-        this.tblName = params.get("tbl_name");
-        this.queryType = params.get("query_type");
-        this.hadoopOptionParams = params.entrySet().stream()
+        this.table = PaimonUtils.deserialize(params.get("serialized_table"));
+        this.paimonAllFieldNames = 
PaimonUtils.getFieldNames(this.table.rowType());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
+        }
+        resetDatetimeV2Precision();
+        this.projected = 
Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
+        this.paimonDataTypeList = Arrays.stream(projected).mapToObj(i -> 
table.rowType().getTypeAt(i))
+                .collect(Collectors.toList());
+        this.paimonSplits = 
Arrays.stream(params.get("serialized_splits").split(","))
+                .map(PaimonUtils::deserialize).map(obj -> (Split) obj)
+                .collect(Collectors.toList()).iterator();
+        Map<String, String> hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
-                .collect(Collectors
-                        .toMap(kv1 -> 
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
-                                Entry::getValue));
-        this.paimonOptionParams = params.entrySet().stream()
-                .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
-                .collect(Collectors
-                        .toMap(kv1 -> 
kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()),
-                                Entry::getValue));
+                .collect(Collectors.toMap(kv1 -> 
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
+                        Entry::getValue));
         this.preExecutionAuthenticator = 
PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
     }
 
     @Override
-    public void open() {
+    public void open() throws IOException {
+        Thread.currentThread().setContextClassLoader(classLoader);
+        if (!paimonSplits.hasNext()) {
+            throw new IOException("Failed to open PaimonSysTableJniScanner: No 
valid splits found");
+        }
+        nextReader();
+    }
+
+    private void nextReader() throws IOException {
+        Preconditions.checkArgument(paimonSplits.hasNext(), "No more splits 
available");
+        Split paimonSplit = paimonSplits.next();
+        ReadBuilder readBuilder = table.newReadBuilder();
+        readBuilder.withProjection(projected);
         try {
-            // When the user does not specify hive-site.xml, Paimon will look 
for the file from the classpath:
-            //    org.apache.paimon.hive.HiveCatalog.createHiveConf:
-            //        
`Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
-            // so we need to provide a classloader, otherwise it will cause 
NPE.
-            Thread.currentThread().setContextClassLoader(classLoader);
             preExecutionAuthenticator.execute(() -> {
-                initTable();
-                initReader();
+                reader = 
readBuilder.newRead().executeFilter().createReader(paimonSplit);
+                Preconditions.checkArgument(recordIterator == null);
+                recordIterator = reader.readBatch();
                 return null;
             });
-            resetDatetimeV2Precision();
-
-        } catch (Throwable e) {
-            LOG.warn("Failed to open paimon_scanner: {}", e.getMessage(), e);
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            this.close();
+            String msg = String.format("Failed to open next paimonSplit: %s", 
paimonSplits);
+            LOG.error(msg, e);
+            throw new IOException(msg, e);
         }
     }
 
@@ -139,48 +139,10 @@ public class PaimonSysTableJniScanner extends JniScanner {
         try {
             return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new IOException("Failed to getNext in PaimonSysTableJniScanner", e);
         }
     }
 
-    private void initTable() {
-        PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
-                paimonOptionParams, hadoopOptionParams, dbName, tblName, 
queryType);
-        TableExt tableExt = PaimonTableCache.getTable(key);
-        Table paimonTable = tableExt.getTable();
-        if (paimonTable == null) {
-            throw new RuntimeException(
-                    String.format(
-                            "Failed to get Paimon system table  
{%s}.{%s}${%s}. ",
-                            dbName, tblName, queryType));
-        }
-        this.table = paimonTable;
-        this.paimonAllFieldNames = 
PaimonUtils.getFieldNames(this.table.rowType());
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
-        }
-    }
-
-    private void initReader() throws IOException {
-        ReadBuilder readBuilder = table.newReadBuilder();
-        if (this.fields.length > this.paimonAllFieldNames.size()) {
-            throw new IOException(
-                    String.format(
-                            "The jni reader fields' size {%s} is not matched 
with paimon fields' size {%s}."
-                                    + " Please refresh table and try again",
-                            fields.length, paimonAllFieldNames.size()));
-        }
-        int[] projected = getProjected();
-        readBuilder.withProjection(projected);
-        reader = 
readBuilder.newRead().executeFilter().createReader(paimonSplit);
-        paimonDataTypeList =
-                Arrays.stream(projected).mapToObj(i -> 
table.rowType().getTypeAt(i)).collect(Collectors.toList());
-    }
-
-    private int[] getProjected() {
-        return 
Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
-    }
-
     private void resetDatetimeV2Precision() {
         for (int i = 0; i < types.length; i++) {
             if (types[i].isDateTimeV2()) {
@@ -199,35 +161,27 @@ public class PaimonSysTableJniScanner extends JniScanner {
 
     private int readAndProcessNextBatch() throws IOException {
         int rows = 0;
-        try {
-            if (recordIterator == null) {
-                recordIterator = reader.readBatch();
-            }
-
-            while (recordIterator != null) {
-                InternalRow record;
-                while ((record = recordIterator.next()) != null) {
-                    columnValue.setOffsetRow(record);
-                    for (int i = 0; i < fields.length; i++) {
-                        columnValue.setIdx(i, types[i], 
paimonDataTypeList.get(i));
-                        long l = System.nanoTime();
-                        appendData(i, columnValue);
-                        appendDataTime += System.nanoTime() - l;
-                    }
-                    rows++;
-                    if (rows >= batchSize) {
-                        return rows;
-                    }
+        while (recordIterator != null) {
+            InternalRow record;
+            while ((record = recordIterator.next()) != null) {
+                columnValue.setOffsetRow(record);
+                for (int i = 0; i < fields.length; i++) {
+                    columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
+                    long l = System.nanoTime();
+                    appendData(i, columnValue);
+                    appendDataTime += System.nanoTime() - l;
+                }
+                rows++;
+                if (rows >= batchSize) {
+                    return rows;
                 }
-                recordIterator.releaseBatch();
-                recordIterator = reader.readBatch();
             }
-        } catch (Exception e) {
-            close();
-            LOG.warn("Failed to get the next batch of paimon. "
-                            + "split: {}, requiredFieldNames: {}, 
paimonAllFieldNames: {}, dataType: {}",
-                    paimonSplit, params.get("required_fields"), 
paimonAllFieldNames, paimonDataTypeList, e);
-            throw new IOException(e);
+            recordIterator.releaseBatch();
+            recordIterator = reader.readBatch();
+            if (recordIterator == null && paimonSplits.hasNext()) {
+                // try to get next reader
+                nextReader();
+            }
         }
         return rows;
     }
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
deleted file mode 100644
index e5f067af96b..00000000000
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.paimon;
-
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-public class PaimonTableCache {
-    // Max cache num of paimon table
-    public static final long max_external_schema_cache_num = 50;
-    // The expiration time of a cache object after last access of it.
-    public static final long external_cache_expire_time_minutes_after_access = 
100;
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PaimonTableCache.class);
-
-    private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache = 
CacheBuilder.newBuilder()
-            .maximumSize(max_external_schema_cache_num)
-            
.expireAfterAccess(external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
-            .build(new CacheLoader<PaimonTableCacheKey, TableExt>() {
-                @Override
-                public TableExt load(PaimonTableCacheKey key) {
-                    return loadTable(key);
-                }
-            });
-
-    private static TableExt loadTable(PaimonTableCacheKey key) {
-        try {
-            LOG.warn("load table:{}", key);
-            Catalog catalog = createCatalog(key.getPaimonOptionParams(), 
key.getHadoopOptionParams());
-            Table table;
-            if (key.getQueryType() != null) {
-                table = catalog.getTable(new Identifier(key.getDbName(), 
key.getTblName(),
-                        null, key.getQueryType()));
-            } else {
-                table = catalog.getTable(Identifier.create(key.getDbName(), 
key.getTblName()));
-            }
-            return new TableExt(table, System.currentTimeMillis());
-        } catch (Catalog.TableNotExistException e) {
-            LOG.warn("failed to create paimon table ", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static Catalog createCatalog(
-            Map<String, String> paimonOptionParams,
-            Map<String, String> hadoopOptionParams) {
-        Options options = new Options();
-        paimonOptionParams.entrySet().stream().forEach(kv -> 
options.set(kv.getKey(), kv.getValue()));
-        Configuration hadoopConf = new Configuration();
-        hadoopOptionParams.entrySet().stream().forEach(kv -> 
hadoopConf.set(kv.getKey(), kv.getValue()));
-        CatalogContext context = CatalogContext.create(options, hadoopConf);
-        return CatalogFactory.createCatalog(context);
-    }
-
-    public static void invalidateTableCache(PaimonTableCacheKey key) {
-        tableCache.invalidate(key);
-    }
-
-    public static TableExt getTable(PaimonTableCacheKey key) {
-        try {
-            return tableCache.get(key);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("failed to get table for:" + key);
-        }
-    }
-
-    public static class TableExt {
-        private Table table;
-        private long createTime;
-
-        public TableExt(Table table, long createTime) {
-            this.table = table;
-            this.createTime = createTime;
-        }
-
-        public Table getTable() {
-            return table;
-        }
-
-        public long getCreateTime() {
-            return createTime;
-        }
-    }
-
-    public static class PaimonTableCacheKey {
-        // in key
-        private final long ctlId;
-        private final long dbId;
-        private final long tblId;
-
-        // not in key
-        private Map<String, String> paimonOptionParams;
-        private Map<String, String> hadoopOptionParams;
-        private String dbName;
-        private String tblName;
-        private String queryType;
-
-        public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams,
-                Map<String, String> hadoopOptionParams,
-                String dbName, String tblName) {
-            this.ctlId = ctlId;
-            this.dbId = dbId;
-            this.tblId = tblId;
-            this.paimonOptionParams = paimonOptionParams;
-            this.hadoopOptionParams = hadoopOptionParams;
-            this.dbName = dbName;
-            this.tblName = tblName;
-        }
-
-        public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
-                Map<String, String> paimonOptionParams,
-                Map<String, String> hadoopOptionParams,
-                String dbName, String tblName, String queryType) {
-            this.ctlId = ctlId;
-            this.dbId = dbId;
-            this.tblId = tblId;
-            this.paimonOptionParams = paimonOptionParams;
-            this.hadoopOptionParams = hadoopOptionParams;
-            this.dbName = dbName;
-            this.tblName = tblName;
-            this.queryType = queryType;
-        }
-
-        public long getCtlId() {
-            return ctlId;
-        }
-
-        public long getDbId() {
-            return dbId;
-        }
-
-        public long getTblId() {
-            return tblId;
-        }
-
-        public Map<String, String> getPaimonOptionParams() {
-            return paimonOptionParams;
-        }
-
-        public Map<String, String> getHadoopOptionParams() {
-            return hadoopOptionParams;
-        }
-
-        public String getDbName() {
-            return dbName;
-        }
-
-        public String getTblName() {
-            return tblName;
-        }
-
-        public String getQueryType() {
-            return queryType;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            PaimonTableCacheKey that = (PaimonTableCacheKey) o;
-            return ctlId == that.ctlId && dbId == that.dbId && tblId == 
that.tblId && Objects.equal(
-                    queryType,
-                    that.queryType);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hashCode(ctlId, dbId, tblId);
-        }
-
-        @Override
-        public String toString() {
-            return "PaimonTableCacheKey{"
-                    + "ctlId=" + ctlId
-                    + ", dbId=" + dbId
-                    + ", tblId=" + tblId
-                    + ", paimonOptionParams=" + paimonOptionParams
-                    + ", hadoopOptionParams=" + hadoopOptionParams
-                    + ", dbName='" + dbName + '\''
-                    + ", tblName='" + tblName + '\''
-                    + ", queryType='" + queryType + '\''
-                    + '}';
-        }
-    }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 1751d1733e8..3bfe7877c9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -87,7 +87,6 @@ public class PaimonScanNode extends FileQueryScanNode {
     private static final String DORIS_START_TIMESTAMP = "startTimestamp";
     private static final String DORIS_END_TIMESTAMP = "endTimestamp";
     private static final String DORIS_INCREMENTAL_BETWEEN_SCAN_MODE = 
"incrementalBetweenScanMode";
-    private static final String DEFAULT_INCREMENTAL_BETWEEN_SCAN_MODE = "auto";
 
     private enum SplitReadType {
         JNI,
@@ -217,15 +216,6 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         fileDesc.setFileFormat(fileFormat);
         
fileDesc.setPaimonPredicate(PaimonUtil.encodeObjectToString(predicates));
-        
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> 
slot.getColumn().getName())
-                .collect(Collectors.joining(",")));
-        fileDesc.setDbName(((PaimonExternalTable) 
source.getTargetTable()).getDbName());
-        fileDesc.setPaimonOptions(((PaimonExternalCatalog) 
source.getCatalog()).getPaimonOptionsMap());
-        fileDesc.setTableName(source.getTargetTable().getName());
-        fileDesc.setCtlId(source.getCatalog().getId());
-        fileDesc.setDbId(((PaimonExternalTable) 
source.getTargetTable()).getDbId());
-        fileDesc.setTblId(source.getTargetTable().getId());
-        fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
         // The hadoop conf should be same with
         // PaimonExternalCatalog.createCatalog()#getConfiguration()
         
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getBackendStorageProperties());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index ac6281112e5..5073dd13d53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -70,24 +70,58 @@ public class MetadataScanNode extends ExternalScanNode {
 
     @Override
     protected void createScanRangeLocations() {
-        List<String> requiredFileds = desc.getSlots().stream()
+        List<String> requiredFields = desc.getSlots().stream()
                 .filter(slot -> slot.isMaterialized())
                 .map(slot -> slot.getColumn().getName())
                 .collect(java.util.stream.Collectors.toList());
-        for (TMetaScanRange metaScanRange : tvf.getMetaScanRanges(requiredFileds)) {
-            TScanRange scanRange = new TScanRange();
-            scanRange.setMetaScanRange(metaScanRange);
+        TMetaScanRange metaScanRange = tvf.getMetaScanRange(requiredFields);
 
+        if (!metaScanRange.isSetSerializedSplits()) {
+            // no need to split ranges to send to backends
             TScanRangeLocation location = new TScanRangeLocation();
             Backend backend = backendPolicy.getNextBe();
             location.setBackendId(backend.getId());
             location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
 
+            TScanRange scanRange = new TScanRange();
+            scanRange.setMetaScanRange(metaScanRange);
+
             TScanRangeLocations locations = new TScanRangeLocations();
             locations.addToLocations(location);
             locations.setScanRange(scanRange);
 
             scanRangeLocations.add(locations);
+        } else {
+            // need to split ranges to send to backends
+            List<Backend> backends = Lists.newArrayList(backendPolicy.getBackends());
+            List<String> splits = metaScanRange.getSerializedSplits();
+            int numSplitsPerBE = Math.max(1, splits.size() / backends.size());
+
+            for (int i = 0; i < backends.size(); i++) {
+                int from = i * numSplitsPerBE;
+                if (from >= splits.size()) {
+                    continue; // no splits for this backend
+                }
+                int to = Math.min((i + 1) * numSplitsPerBE, splits.size());
+
+                // set splited task to TMetaScanRange
+                TMetaScanRange subRange = metaScanRange.deepCopy();
+                subRange.setSerializedSplits(splits.subList(from, to));
+
+                TScanRangeLocation location = new TScanRangeLocation();
+                Backend backend = backends.get(i);
+                location.setBackendId(backend.getId());
+                location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+
+                TScanRange scanRange = new TScanRange();
+                scanRange.setMetaScanRange(subRange);
+
+                TScanRangeLocations locations = new TScanRangeLocations();
+                locations.addToLocations(location);
+                locations.setScanRange(scanRange);
+
+                scanRangeLocations.add(locations);
+            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
index 826e1e7c59d..aca6779fc64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
@@ -33,7 +33,6 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -104,13 +103,13 @@ public class BackendsTableValuedFunction extends MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.BACKENDS);
         TBackendsMetadataParams backendsMetadataParams = new 
TBackendsMetadataParams();
         backendsMetadataParams.setClusterName("");
         metaScanRange.setBackendsParams(backendsMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
index 75df6bb6221..35719f3fa52 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
@@ -26,7 +26,6 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -79,9 +78,9 @@ public class CatalogsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.CATALOGS);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
index d1b176f35a1..31ddf55313a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
@@ -32,7 +32,6 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -88,13 +87,13 @@ public class FrontendsDisksTableValuedFunction extends 
MetadataTableValuedFuncti
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.FRONTENDS_DISKS);
         TFrontendsMetadataParams frontendsMetadataParams = new 
TFrontendsMetadataParams();
         frontendsMetadataParams.setClusterName("");
         metaScanRange.setFrontendsParams(frontendsMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
index 5acf44f4d3b..efb90070e65 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
@@ -32,7 +32,6 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -97,13 +96,13 @@ public class FrontendsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.FRONTENDS);
         TFrontendsMetadataParams frontendsMetadataParams = new 
TFrontendsMetadataParams();
         frontendsMetadataParams.setClusterName("");
         metaScanRange.setFrontendsParams(frontendsMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index 70e1ec84928..2125d420fa5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -120,7 +120,7 @@ public class HudiTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.HUDI);
         // set hudi metadata params
@@ -130,7 +130,7 @@ public class HudiTableValuedFunction extends 
MetadataTableValuedFunction {
         hudiMetadataParams.setDatabase(hudiTableName.getDb());
         hudiMetadataParams.setTable(hudiTableName.getTbl());
         metaScanRange.setHudiParams(hudiMetadataParams);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index d9ccc392084..359ad2ce57d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -31,12 +31,10 @@ import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TIcebergMetadataParams;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.FileScanTask;
@@ -48,6 +46,8 @@ import org.apache.iceberg.util.SerializationUtil;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * The class of table valued function for iceberg metadata.
@@ -135,8 +135,7 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
-        List<TMetaScanRange> scanRanges = Lists.newArrayList();
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         CloseableIterable<FileScanTask> tasks;
         try {
             tasks = preExecutionAuthenticator.execute(() -> {
@@ -145,17 +144,15 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
         } catch (Exception e) {
             throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
         }
-        for (FileScanTask task : tasks) {
-            TMetaScanRange metaScanRange = new TMetaScanRange();
-            metaScanRange.setMetadataType(TMetadataType.ICEBERG);
-            // set iceberg metadata params
-            TIcebergMetadataParams icebergMetadataParams = new 
TIcebergMetadataParams();
-            icebergMetadataParams.setHadoopProps(hadoopProps);
-            
icebergMetadataParams.setSerializedTask(SerializationUtil.serializeToBase64(task));
-            metaScanRange.setIcebergParams(icebergMetadataParams);
-            scanRanges.add(metaScanRange);
-        }
-        return scanRanges;
+
+        TMetaScanRange tMetaScanRange = new TMetaScanRange();
+        tMetaScanRange.setMetadataType(TMetadataType.ICEBERG);
+        tMetaScanRange.setHadoopProps(hadoopProps);
+        
tMetaScanRange.setSerializedTable(SerializationUtil.serializeToBase64(sysTable));
+        
tMetaScanRange.setSerializedSplits(StreamSupport.stream(tasks.spliterator(), 
false)
+                .map(SerializationUtil::serializeToBase64)
+                .collect(Collectors.toList()));
+        return tMetaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index b5d0489d30c..d9c5ec5ed2b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -31,7 +31,6 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
@@ -99,14 +98,14 @@ public class JobsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.JOBS);
         TJobsMetadataParams jobParam = new TJobsMetadataParams();
         jobParam.setType(jobType.name());
         
jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
         metaScanRange.setJobsParams(jobParam);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 92b30b0347b..32be139657c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -60,7 +60,7 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
 
     public abstract TMetadataType getMetadataType();
 
-    public abstract List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds);
+    public abstract TMetaScanRange getMetaScanRange(List<String> requiredFileds);
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index c40a8d4716d..e0c42165530 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -29,7 +29,6 @@ import org.apache.doris.thrift.TMetadataType;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -106,7 +105,7 @@ public class MvInfosTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -119,7 +118,7 @@ public class MvInfosTableValuedFunction extends 
MetadataTableValuedFunction {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() end");
         }
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
index 6d5e1fba09d..5fdd3b1846b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
@@ -34,7 +34,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataType;
-import org.apache.doris.thrift.TPaimonMetadataParams;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -60,12 +59,7 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
     private final Table paimonSysTable;
     private final List<Column> schema;
     private final Map<String, String> hadoopProps;
-    private final Map<String, String> paimonProps;
     private final ExecutionAuthenticator hadoopAuthenticator;
-    private final TableName paimonTableName;
-    private final long ctlId;
-    private final long dbId;
-    private final long tblId;
 
     /**
      * Creates a new Paimon table-valued function instance.
@@ -84,18 +78,14 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
             throw new AnalysisException("Catalog " + paimonTableName.getCtl() 
+ " is not an paimon catalog");
         }
 
-        this.paimonTableName = paimonTableName;
         PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) 
dorisCatalog;
         this.hadoopProps = 
paimonExternalCatalog.getCatalogProperty().getHadoopProperties();
-        this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap();
         this.hadoopAuthenticator = 
paimonExternalCatalog.getExecutionAuthenticator();
-        this.ctlId = paimonExternalCatalog.getId();
 
         ExternalDatabase<? extends ExternalTable> database = 
paimonExternalCatalog.getDb(paimonTableName.getDb())
                 .orElseThrow(() -> new AnalysisException(
                         String.format("Paimon catalog database '%s' does not 
exist", paimonTableName.getDb())
                 ));
-        this.dbId = database.getId();
 
         ExternalTable externalTable = 
database.getTable(paimonTableName.getTbl())
                 .orElseThrow(() -> new AnalysisException(
@@ -103,7 +93,6 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
                                 paimonTableName.getDb(), 
paimonTableName.getTbl())
                 ));
         NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
-        this.tblId = externalTable.getId();
 
         this.paimonSysTable = 
paimonExternalCatalog.getPaimonTable(buildNameMapping,
                 "main", queryType);
@@ -148,7 +137,7 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         int[] projections = requiredFileds.stream().mapToInt(
                         field -> paimonSysTable.rowType().getFieldNames()
                                 .stream()
@@ -157,7 +146,6 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
                                 .indexOf(field))
                 .toArray();
         List<Split> splits;
-
         try {
             splits = hadoopAuthenticator.execute(
                     () -> 
paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits());
@@ -165,7 +153,13 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
             throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
         }
 
-        return 
splits.stream().map(this::createMetaScanRange).collect(Collectors.toList());
+        TMetaScanRange tMetaScanRange = new TMetaScanRange();
+        tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
+        tMetaScanRange.setHadoopProps(hadoopProps);
+        
tMetaScanRange.setSerializedTable(PaimonUtil.encodeObjectToString(paimonSysTable));
+        tMetaScanRange.setSerializedSplits(
+                
splits.stream().map(PaimonUtil::encodeObjectToString).collect(Collectors.toList()));
+        return tMetaScanRange;
     }
 
     @Override
@@ -177,23 +171,4 @@ public class PaimonTableValuedFunction extends 
MetadataTableValuedFunction {
     public List<Column> getTableColumns() throws AnalysisException {
         return schema;
     }
-
-    private TMetaScanRange createMetaScanRange(Split split) {
-        TMetaScanRange tMetaScanRange = new TMetaScanRange();
-        tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
-
-        TPaimonMetadataParams tPaimonMetadataParams = new 
TPaimonMetadataParams();
-        tPaimonMetadataParams.setCtlId(ctlId);
-        tPaimonMetadataParams.setDbId(dbId);
-        tPaimonMetadataParams.setTblId(tblId);
-        tPaimonMetadataParams.setQueryType(queryType);
-        tPaimonMetadataParams.setDbName(paimonTableName.getDb());
-        tPaimonMetadataParams.setTblName(paimonTableName.getTbl());
-        tPaimonMetadataParams.setHadoopProps(hadoopProps);
-        tPaimonMetadataParams.setPaimonProps(paimonProps);
-        
tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split));
-
-        tMetaScanRange.setPaimonParams(tPaimonMetadataParams);
-        return tMetaScanRange;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
index e0a0d4dc649..494a68edf3a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
@@ -145,7 +145,7 @@ public class PartitionValuesTableValuedFunction extends 
MetadataTableValuedFunct
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -156,7 +156,7 @@ public class PartitionValuesTableValuedFunction extends 
MetadataTableValuedFunct
         partitionParam.setDatabase(databaseName);
         partitionParam.setTable(tableName);
         metaScanRange.setPartitionValuesParams(partitionParam);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
index 3ffb77cdbc6..53f99c549a3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
@@ -43,7 +43,6 @@ import org.apache.doris.thrift.TPartitionsMetadataParams;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -211,7 +210,7 @@ public class PartitionsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -225,7 +224,7 @@ public class PartitionsTableValuedFunction extends 
MetadataTableValuedFunction {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() end");
         }
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index 5d60adbeacf..b343a7dbeed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -31,7 +31,6 @@ import org.apache.doris.thrift.TMetadataType;
 import org.apache.doris.thrift.TTasksMetadataParams;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
@@ -99,14 +98,14 @@ public class TasksTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+    public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.TASKS);
         TTasksMetadataParams taskParam = new TTasksMetadataParams();
         taskParam.setType(jobType.name());
         
taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
         metaScanRange.setTasksParams(taskParam);
-        return Lists.newArrayList(metaScanRange);
+        return metaScanRange;
     }
 
     @Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 12b0f412561..adc5be9cd88 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -549,12 +549,13 @@ struct TDataGenScanRange {
 }
 
 
+// deprecated
 struct TIcebergMetadataParams {
   1: optional string serialized_task
   2: optional map<string, string> hadoop_props
 }
 
-
+// deprecated
 struct TPaimonMetadataParams {
   1: optional string db_name
   2: optional string tbl_name
@@ -624,7 +625,7 @@ struct TMetaCacheStatsParams {
 
 struct TMetaScanRange {
   1: optional Types.TMetadataType metadata_type
-  2: optional TIcebergMetadataParams iceberg_params
+  2: optional TIcebergMetadataParams iceberg_params // deprecated
   3: optional TBackendsMetadataParams backends_params
   4: optional TFrontendsMetadataParams frontends_params
   5: optional TQueriesMetadataParams queries_params
@@ -635,7 +636,12 @@ struct TMetaScanRange {
   10: optional TMetaCacheStatsParams meta_cache_stats_params
   11: optional TPartitionValuesMetadataParams partition_values_params
   12: optional THudiMetadataParams hudi_params
-  13: optional TPaimonMetadataParams paimon_params
+  13: optional TPaimonMetadataParams paimon_params // deprecated
+
+  // for querying sys tables for Paimon/Iceberg
+  14: optional map<string, string> hadoop_props
+  15: optional string serialized_table;
+  16: optional list<string> serialized_splits;
 }
 
 // Specification of an individual data range which is held in its entirety
diff --git 
a/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out
 
b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out
new file mode 100644
index 00000000000..6c4acf47ef8
Binary files /dev/null and 
b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy
 
b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy
new file mode 100644
index 00000000000..68a9a06522c
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_timestamp_with_time_zone", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String minio_port = 
context.config.otherConfigs.get("iceberg_minio_port")
+        String catalog_name = "test_paimon_timestamp_with_time_zone"
+        String db_name = "test_paimon_spark"
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String rest_port = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
+
+        sql """drop catalog if exists ${catalog_name}"""
+
+        sql """
+            CREATE CATALOG ${catalog_name} PROPERTIES (
+                    'type' = 'paimon',
+                    'warehouse' = 's3://warehouse/wh',
+                    's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+                    's3.access_key' = 'admin',
+                    's3.secret_key' = 'password',
+                    's3.path.style.access' = 'true'
+            );
+        """
+        sql """use `${catalog_name}`.`${db_name}`;"""
+        
+        def test_select_timestamp = {
+            qt_select_timestamp """ select * from t_ts_ntz order by id; """
+        }
+
+        try {
+            sql """ set time_zone = 'Asia/Shanghai'; """
+            sql """ set force_jni_scanner = true; """
+            test_select_timestamp()
+            sql """ set force_jni_scanner = false; """
+            test_select_timestamp()
+            sql """ set time_zone = '+10:00'; """
+            sql """ set force_jni_scanner = true; """
+            test_select_timestamp()
+            sql """ set force_jni_scanner = false; """
+            test_select_timestamp()
+        } finally {
+            sql """ unset variable time_zone; """
+            sql """ set force_jni_scanner = false; """
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to