This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54bb19f1fbd [enhance](paimon) opt count pushdown for paimon and refactor be logic (#46911)
54bb19f1fbd is described below

commit 54bb19f1fbd582ddd894e007cd644f0284f958a0
Author: Socrates <suyit...@selectdb.com>
AuthorDate: Sun Mar 2 22:40:34 2025 +0800

    [enhance](paimon) opt count pushdown for paimon and refactor be logic (#46911)
    
    ### What problem does this PR solve?
    
    Related PR: #44038
    
    1. Obtain the row count directly from statistics for `select count(*) from paimon_table` (see the sketch below)
    2. Refactor TableFormatReader: move the logic shared by IcebergTableReader,
    PaimonReader and TransactionalHiveReader into TableFormatReader
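    
    For illustration, below is a minimal sketch of how the table-level count is
    derived from Paimon split statistics. It mirrors the calcuteTableLevelCount
    logic added to PaimonScanNode in this commit; the class and method names in
    the sketch are illustrative only:
    
    ```java
    import java.util.List;
    import java.util.Optional;
    
    import org.apache.paimon.table.source.DeletionFile;
    import org.apache.paimon.table.source.Split;
    
    public class TableLevelCountSketch {
        // Sum the per-split row counts reported by Paimon statistics and subtract
        // the cardinalities of the deletion vectors. If any deletion vector has an
        // unknown (null) cardinality, return empty so the scan falls back to
        // reading the data files.
        public static Optional<Long> tableLevelCount(List<Split> splits) {
            long total = 0;
            long deleted = 0;
            for (Split s : splits) {
                total += s.rowCount();
                if (s.deletionFiles().isPresent()) {
                    for (DeletionFile dv : s.deletionFiles().get()) {
                        if (dv == null) {
                            continue; // entries in the deletion-file list may be null
                        }
                        Long cardinality = dv.cardinality();
                        if (cardinality == null) {
                            return Optional.empty();
                        }
                        deleted += cardinality;
                    }
                }
            }
            return Optional.of(total - deleted);
        }
    }
    ```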
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  43 ++----
 be/src/vec/exec/format/table/iceberg_reader.h      |  24 +---
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  23 ++-
 be/src/vec/exec/format/table/paimon_jni_reader.h   |   1 +
 be/src/vec/exec/format/table/paimon_reader.cpp     |  28 ++--
 be/src/vec/exec/format/table/paimon_reader.h       |  26 ++--
 be/src/vec/exec/format/table/table_format_reader.h |  57 ++++++--
 .../format/table/transactional_hive_reader.cpp     |  15 +-
 .../exec/format/table/transactional_hive_reader.h  |  13 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  13 +-
 .../org/apache/doris/datasource/FileScanNode.java  |  46 +++---
 .../datasource/iceberg/source/IcebergScanNode.java |  12 +-
 .../datasource/paimon/source/PaimonScanNode.java   | 146 +++++++++++--------
 .../datasource/paimon/source/PaimonSplit.java      |  15 +-
 .../paimon/source/PaimonScanNodeTest.java          | 155 +++++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                     |   3 +
 .../paimon/test_paimon_catalog.out                 | Bin 795479 -> 795039 bytes
 .../paimon/test_paimon_deletion_vector.out         | Bin 0 -> 525 bytes
 .../paimon/test_paimon_catalog.groovy              |  14 --
 .../paimon/test_paimon_deletion_vector.groovy      |  96 +++++++++++++
 20 files changed, 517 insertions(+), 213 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 3d6d4df129d..daaf3167369 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -77,13 +77,8 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, 
ShardedKVCache* kv_cache,
                                        io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader)),
-          _profile(profile),
-          _state(state),
-          _params(params),
-          _range(range),
-          _kv_cache(kv_cache),
-          _io_ctx(io_ctx) {
+        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx),
+          _kv_cache(kv_cache) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -94,31 +89,9 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
     _iceberg_profile.delete_rows_sort_time =
             ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
-    if (range.table_format_params.iceberg_params.__isset.row_count) {
-        _remaining_table_level_row_count = 
range.table_format_params.iceberg_params.row_count;
-    } else {
-        _remaining_table_level_row_count = -1;
-    }
 }
 
-Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
-    // already get rows from be
-    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count > 0) {
-        auto rows = std::min(_remaining_table_level_row_count,
-                             (int64_t)_state->query_options().batch_size);
-        _remaining_table_level_row_count -= rows;
-        auto mutate_columns = block->mutate_columns();
-        for (auto& col : mutate_columns) {
-            col->resize(rows);
-        }
-        block->set_columns(std::move(mutate_columns));
-        *read_rows = rows;
-        if (_remaining_table_level_row_count == 0) {
-            *eof = true;
-        }
-
-        return Status::OK();
-    }
+Status IcebergTableReader::get_next_block_inner(Block* block, size_t* 
read_rows, bool* eof) {
     RETURN_IF_ERROR(_expand_block_if_need(block));
 
     // To support iceberg schema evolution. We change the column name in block 
to
@@ -161,13 +134,13 @@ Status IcebergTableReader::get_columns(
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, 
io::IOContext* io_ctx) {
+Status IcebergTableReader::init_row_filters() {
     // We get the count value by doris's be, so we don't need to read the 
delete file
-    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count > 0) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_table_level_row_count > 0) {
         return Status::OK();
     }
 
-    const auto& table_desc = range.table_format_params.iceberg_params;
+    const auto& table_desc = _range.table_format_params.iceberg_params;
     const auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
         return Status::OK();
@@ -545,7 +518,7 @@ Status IcebergParquetReader::init_reader(
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
     parquet_reader->iceberg_sanitize(_all_required_col_names);
-    RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
+    RETURN_IF_ERROR(init_row_filters());
     return parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, 
&_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
@@ -617,7 +590,7 @@ Status IcebergOrcReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
-    RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
+    RETURN_IF_ERROR(init_row_filters());
     return orc_reader->init_reader(&_all_required_col_names, 
&_new_colname_to_value_range,
                                    conjuncts, false, tuple_descriptor, 
row_descriptor,
                                    not_single_slot_filter_conjuncts, 
slot_id_to_filter_conjuncts);
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index e500538b6f9..2fbf7b5904f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -29,10 +29,8 @@
 #include "exec/olap_common.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
-#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "table_format_reader.h"
-#include "util/runtime_profile.h"
 #include "vec/columns/column_dictionary.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
@@ -80,9 +78,9 @@ public:
                        io::IOContext* io_ctx);
     ~IcebergTableReader() override = default;
 
-    Status init_row_filters(const TFileRangeDesc& range, io::IOContext* 
io_ctx) final;
+    Status init_row_filters() final;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) 
final;
 
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) final;
@@ -135,10 +133,6 @@ protected:
     // Remove the added delete columns
     Status _shrink_block_if_need(Block* block);
 
-    RuntimeProfile* _profile;
-    RuntimeState* _state;
-    const TFileScanRangeParams& _params;
-    const TFileRangeDesc& _range;
     // owned by scan node
     ShardedKVCache* _kv_cache;
     IcebergProfile _iceberg_profile;
@@ -162,13 +156,9 @@ protected:
     std::vector<std::string> _expand_col_names;
     std::vector<ColumnWithTypeAndName> _expand_columns;
 
-    io::IOContext* _io_ctx;
     bool _has_schema_change = false;
     bool _has_iceberg_schema = false;
 
-    // the table level row count for optimizing query like:
-    // select count(*) from table;
-    int64_t _remaining_table_level_row_count;
     Fileformat _file_format = Fileformat::NONE;
 
     const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
@@ -212,9 +202,9 @@ public:
             const std::unordered_map<int, VExprContextSPtrs>* 
slot_id_to_filter_conjuncts);
 
     Status _read_position_delete_file(const TFileRangeDesc* delete_range,
-                                      DeleteFile* position_delete) override;
+                                      DeleteFile* position_delete) final;
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
         parquet_reader->set_delete_rows(&_iceberg_delete_rows);
     }
@@ -223,7 +213,7 @@ public:
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
-            const TFileRangeDesc& delete_desc) override {
+            const TFileRangeDesc& delete_desc) final {
         return ParquetReader::create_unique(
                 _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
                 const_cast<cctz::time_zone*>(&_state->timezone_obj()), 
_io_ctx, _state);
@@ -234,7 +224,7 @@ public:
     ENABLE_FACTORY_CREATOR(IcebergOrcReader);
 
     Status _read_position_delete_file(const TFileRangeDesc* delete_range,
-                                      DeleteFile* position_delete) override;
+                                      DeleteFile* position_delete) final;
 
     IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                      RuntimeState* state, const TFileScanRangeParams& params,
@@ -242,7 +232,7 @@ public:
             : IcebergTableReader(std::move(file_format_reader), profile, 
state, params, range,
                                  kv_cache, io_ctx) {}
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         auto* orc_reader = (OrcReader*)_file_format_reader.get();
         orc_reader->set_position_delete_rowids(&_iceberg_delete_rows);
     }
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index cbcdd2d81bd..a05ea4511f4 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -20,9 +20,9 @@
 #include <map>
 
 #include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "vec/core/types.h"
-
 namespace doris {
 class RuntimeProfile;
 class RuntimeState;
@@ -64,6 +64,11 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
     if (range_params->__isset.serialized_table) {
         params["serialized_table"] = range_params->serialized_table;
     }
+    if (range.table_format_params.__isset.table_level_row_count) {
+        _remaining_table_level_row_count = 
range.table_format_params.table_level_row_count;
+    } else {
+        _remaining_table_level_row_count = -1;
+    }
 
     // Used to create paimon option
     for (const auto& kv : 
range.table_format_params.paimon_params.paimon_options) {
@@ -79,6 +84,22 @@ PaimonJniReader::PaimonJniReader(const 
std::vector<SlotDescriptor*>& file_slot_d
 }
 
 Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count >= 0) {
+        auto rows = std::min(_remaining_table_level_row_count,
+                             (int64_t)_state->query_options().batch_size);
+        _remaining_table_level_row_count -= rows;
+        auto mutate_columns = block->mutate_columns();
+        for (auto& col : mutate_columns) {
+            col->resize(rows);
+        }
+        block->set_columns(std::move(mutate_columns));
+        *read_rows = rows;
+        if (_remaining_table_level_row_count == 0) {
+            *eof = true;
+        }
+
+        return Status::OK();
+    }
     return _jni_connector->get_next_block(block, read_rows, eof);
 }
 
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 64ef962f0de..b5744428392 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -67,6 +67,7 @@ public:
 
 private:
     std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
+    int64_t _remaining_table_level_row_count;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp
index 8e4be026bab..dba8efc20e2 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -20,13 +20,16 @@
 #include <vector>
 
 #include "common/status.h"
+#include "runtime/runtime_state.h"
 #include "util/deletion_vector.h"
 
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
-                           RuntimeProfile* profile, const 
TFileScanRangeParams& params)
-        : TableFormatReader(std::move(file_format_reader)), _profile(profile), 
_params(params) {
+                           RuntimeProfile* profile, RuntimeState* state,
+                           const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
+                           io::IOContext* io_ctx)
+        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx) {
     static const char* paimon_profile = "PaimonProfile";
     ADD_TIMER(_profile, paimon_profile);
     _paimon_profile.num_delete_rows =
@@ -35,15 +38,18 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> 
file_format_reader,
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
 }
 
-Status PaimonReader::init_row_filters(const TFileRangeDesc& range, 
io::IOContext* io_ctx) {
-    const auto& table_desc = range.table_format_params.paimon_params;
+Status PaimonReader::init_row_filters() {
+    const auto& table_desc = _range.table_format_params.paimon_params;
     if (!table_desc.__isset.deletion_file) {
         return Status::OK();
     }
 
     // set push down agg type to NONE because we can not do count push down opt
     // if there are delete files.
-    _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+    if (!_range.table_format_params.paimon_params.__isset.row_count) {
+        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+    }
+
     const auto& deletion_file = table_desc.deletion_file;
     io::FileSystemProperties properties = {
             .system_type = _params.file_type,
@@ -51,9 +57,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& 
range, io::IOContext
             .hdfs_params = _params.hdfs_params,
             .broker_addresses {},
     };
-    if (range.__isset.file_type) {
+    if (_range.__isset.file_type) {
         // for compatibility
-        properties.system_type = range.file_type;
+        properties.system_type = _range.file_type;
     }
     if (_params.__isset.broker_addresses) {
         properties.broker_addresses.assign(_params.broker_addresses.begin(),
@@ -64,7 +70,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& 
range, io::IOContext
             .path = deletion_file.path,
             .file_size = -1,
             .mtime = 0,
-            .fs_name = range.fs_name,
+            .fs_name = _range.fs_name,
     };
 
     // TODO: cache the file in local
@@ -78,7 +84,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& 
range, io::IOContext
     {
         SCOPED_TIMER(_paimon_profile.delete_files_read_time);
         RETURN_IF_ERROR(
-                delete_file_reader->read_at(deletion_file.offset, result, 
&bytes_read, io_ctx));
+                delete_file_reader->read_at(deletion_file.offset, result, 
&bytes_read, _io_ctx));
     }
     if (bytes_read != deletion_file.length + 4) {
         return Status::IOError(
@@ -99,5 +105,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& 
range, io::IOContext
     }
     return Status::OK();
 }
+
+Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, 
bool* eof) {
+    return _file_format_reader->get_next_block(block, read_rows, eof);
+}
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h
index 3776b485a45..09501e55773 100644
--- a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -29,10 +29,13 @@ namespace doris::vectorized {
 class PaimonReader : public TableFormatReader {
 public:
     PaimonReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
-                 const TFileScanRangeParams& params);
+                 RuntimeState* state, const TFileScanRangeParams& params,
+                 const TFileRangeDesc& range, io::IOContext* io_ctx);
     ~PaimonReader() override = default;
 
-    Status init_row_filters(const TFileRangeDesc& range, io::IOContext* 
io_ctx) final;
+    Status init_row_filters() final;
+
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) 
final;
 
 protected:
     struct PaimonProfile {
@@ -40,23 +43,21 @@ protected:
         RuntimeProfile::Counter* delete_files_read_time;
     };
     std::vector<int64_t> _delete_rows;
-    RuntimeProfile* _profile;
     PaimonProfile _paimon_profile;
-    virtual void set_delete_rows() = 0;
 
-private:
-    const TFileScanRangeParams& _params;
+    virtual void set_delete_rows() = 0;
 };
 
 class PaimonOrcReader final : public PaimonReader {
 public:
     ENABLE_FACTORY_CREATOR(PaimonOrcReader);
     PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
-                    const TFileScanRangeParams& params)
-            : PaimonReader(std::move(file_format_reader), profile, params) {};
+                    RuntimeState* state, const TFileScanRangeParams& params,
+                    const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : PaimonReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
     ~PaimonOrcReader() final = default;
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         (reinterpret_cast<OrcReader*>(_file_format_reader.get()))
                 ->set_position_delete_rowids(&_delete_rows);
     }
@@ -66,11 +67,12 @@ class PaimonParquetReader final : public PaimonReader {
 public:
     ENABLE_FACTORY_CREATOR(PaimonParquetReader);
     PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
-                        const TFileScanRangeParams& params)
-            : PaimonReader(std::move(file_format_reader), profile, params) {};
+                        RuntimeState* state, const TFileScanRangeParams& 
params,
+                        const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : PaimonReader(std::move(file_format_reader), profile, state, 
params, range, io_ctx) {};
     ~PaimonParquetReader() final = default;
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         (reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
                 ->set_delete_rows(&_delete_rows);
     }
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index b143149c8a6..6c03715c59f 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -17,13 +17,14 @@
 
 #pragma once
 
+#include <algorithm>
 #include <cstddef>
-#include <memory>
 #include <string>
-#include <unordered_map>
-#include <unordered_set>
 
 #include "common/status.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -38,12 +39,44 @@ namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 class TableFormatReader : public GenericReader {
 public:
-    TableFormatReader(std::unique_ptr<GenericReader> file_format_reader)
-            : _file_format_reader(std::move(file_format_reader)) {}
+    TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeState* state,
+                      RuntimeProfile* profile, const TFileScanRangeParams& 
params,
+                      const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : _file_format_reader(std::move(file_format_reader)),
+              _state(state),
+              _profile(profile),
+              _params(params),
+              _range(range),
+              _io_ctx(io_ctx) {
+        if (range.table_format_params.__isset.table_level_row_count) {
+            _table_level_row_count = 
range.table_format_params.table_level_row_count;
+        } else {
+            _table_level_row_count = -1;
+        }
+    }
     ~TableFormatReader() override = default;
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override 
{
-        return _file_format_reader->get_next_block(block, read_rows, eof);
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) final {
+        if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_table_level_row_count >= 0) {
+            auto rows =
+                    std::min(_table_level_row_count, 
(int64_t)_state->query_options().batch_size);
+            _table_level_row_count -= rows;
+            auto mutate_columns = block->mutate_columns();
+            for (auto& col : mutate_columns) {
+                col->resize(rows);
+            }
+            block->set_columns(std::move(mutate_columns));
+            *read_rows = rows;
+            if (_table_level_row_count == 0) {
+                *eof = true;
+            }
+
+            return Status::OK();
+        }
+        return get_next_block_inner(block, read_rows, eof);
     }
+
+    virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* 
eof) = 0;
+
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override 
{
         return _file_format_reader->get_columns(name_to_type, missing_cols);
@@ -63,11 +96,17 @@ public:
 
     bool fill_all_columns() const override { return 
_file_format_reader->fill_all_columns(); }
 
-    virtual Status init_row_filters(const TFileRangeDesc& range, 
io::IOContext* io_ctx) = 0;
+    virtual Status init_row_filters() = 0;
 
 protected:
-    std::string _table_format;                          // hudi, iceberg
+    std::string _table_format;                          // hudi, iceberg, 
paimon
     std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
+    RuntimeState* _state = nullptr;                     // for query options
+    RuntimeProfile* _profile = nullptr;
+    const TFileScanRangeParams& _params;
+    const TFileRangeDesc& _range;
+    io::IOContext* _io_ctx = nullptr;
+    int64_t _table_level_row_count = -1; // for optimization of count(*) push 
down
     void _collect_profile_before_close() override {
         if (_file_format_reader != nullptr) {
             _file_format_reader->collect_profile_before_close();
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index d550ff5e1d9..406d58813e9 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -19,7 +19,6 @@
 
 #include <re2/re2.h>
 
-#include "runtime/runtime_state.h"
 #include "transactional_hive_common.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/exec/format/orc/vorc_reader.h"
@@ -41,12 +40,7 @@ 
TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader>
                                                  RuntimeProfile* profile, 
RuntimeState* state,
                                                  const TFileScanRangeParams& 
params,
                                                  const TFileRangeDesc& range, 
io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader)),
-          _profile(profile),
-          _state(state),
-          _params(params),
-          _range(range),
-          _io_ctx(io_ctx) {
+        : TableFormatReader(std::move(file_format_reader), state, profile, 
params, range, io_ctx) {
     static const char* transactional_hive_profile = "TransactionalHiveProfile";
     ADD_TIMER(_profile, transactional_hive_profile);
     _transactional_orc_profile.num_delete_files =
@@ -74,7 +68,7 @@ Status TransactionalHiveReader::init_reader(
     return status;
 }
 
-Status TransactionalHiveReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
+Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* 
read_rows, bool* eof) {
     for (const auto& i : TransactionalHive::READ_PARAMS) {
         DataTypePtr data_type =
                 
DataTypeFactory::instance().create_data_type(TypeDescriptor(i.type), false);
@@ -93,8 +87,7 @@ Status TransactionalHiveReader::get_columns(
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
-                                                 io::IOContext* io_ctx) {
+Status TransactionalHiveReader::init_row_filters() {
     std::string data_file_path = _range.path;
     // the path in _range is remove the namenode prefix,
     // and the file_path in delete file is full path, so we should add it back.
@@ -128,7 +121,7 @@ Status TransactionalHiveReader::init_row_filters(const 
TFileRangeDesc& range,
 
     SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
     for (const auto& delete_delta :
-         range.table_format_params.transactional_hive_params.delete_deltas) {
+         _range.table_format_params.transactional_hive_params.delete_deltas) {
         const std::string file_name = file_path.filename().string();
 
         //need opt.
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
index 9c3f284464c..217f40b3e78 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -28,7 +28,6 @@
 #include "common/status.h"
 #include "exec/olap_common.h"
 #include "table_format_reader.h"
-#include "util/runtime_profile.h"
 #include "vec/common/hash_table/phmap_fwd_decl.h"
 
 namespace doris {
@@ -87,12 +86,12 @@ public:
                             io::IOContext* io_ctx);
     ~TransactionalHiveReader() override = default;
 
-    Status init_row_filters(const TFileRangeDesc& range, io::IOContext* 
io_ctx) override;
+    Status init_row_filters() final;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) 
final;
 
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
+                       std::unordered_set<std::string>* missing_cols) final;
 
     Status init_reader(
             const std::vector<std::string>& column_names,
@@ -109,16 +108,10 @@ private:
         RuntimeProfile::Counter* delete_files_read_time = nullptr;
     };
 
-    RuntimeProfile* _profile = nullptr;
-    RuntimeState* _state = nullptr;
-    const TFileScanRangeParams& _params;
-    const TFileRangeDesc& _range;
     TransactionalHiveProfile _transactional_orc_profile;
     AcidRowIDSet _delete_rows;
     std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr;
     std::vector<std::string> _col_names;
-
-    io::IOContext* _io_ctx = nullptr;
 };
 
 inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 1812ddbc737..fe0c7315c5d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -27,7 +27,6 @@
 
 #include <algorithm>
 #include <boost/iterator/iterator_facade.hpp>
-#include <iterator>
 #include <map>
 #include <ranges>
 #include <tuple>
@@ -1006,8 +1005,8 @@ Status VFileScanner::_get_next_reader() {
                         &_slot_id_to_filter_conjuncts);
                 std::unique_ptr<PaimonParquetReader> paimon_reader =
                         
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
-                                                           *_params);
-                RETURN_IF_ERROR(paimon_reader->init_row_filters(range, 
_io_ctx.get()));
+                                                           _state, *_params, 
range, _io_ctx.get());
+                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                 _cur_reader = std::move(paimon_reader);
             } else {
                 bool hive_parquet_use_column_names = true;
@@ -1048,7 +1047,7 @@ Status VFileScanner::_get_next_reader() {
                         _file_col_names, _colname_to_value_range, 
_push_down_conjuncts,
                         _real_tuple_desc, _default_val_row_desc.get(),
                         &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
-                RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range, 
_io_ctx.get()));
+                RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
                 _cur_reader = std::move(tran_orc_reader);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == 
"iceberg") {
@@ -1068,9 +1067,9 @@ Status VFileScanner::_get_next_reader() {
                         &_file_col_names, _colname_to_value_range, 
_push_down_conjuncts, false,
                         _real_tuple_desc, _default_val_row_desc.get(),
                         &_not_single_slot_filter_conjuncts, 
&_slot_id_to_filter_conjuncts);
-                std::unique_ptr<PaimonOrcReader> paimon_reader =
-                        PaimonOrcReader::create_unique(std::move(orc_reader), 
_profile, *_params);
-                RETURN_IF_ERROR(paimon_reader->init_row_filters(range, 
_io_ctx.get()));
+                std::unique_ptr<PaimonOrcReader> paimon_reader = 
PaimonOrcReader::create_unique(
+                        std::move(orc_reader), _profile, _state, *_params, 
range, _io_ctx.get());
+                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                 _cur_reader = std::move(paimon_reader);
             } else {
                 bool hive_orc_use_column_names = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index b7d34312313..8d3aeaa6a26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -61,6 +61,8 @@ public abstract class FileScanNode extends ExternalScanNode {
     // For explain
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
+    // For display pushdown agg result
+    protected long tableLevelRowCount = -1;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -82,11 +84,12 @@ public abstract class FileScanNode extends ExternalScanNode 
{
         super.toThrift(planNode);
     }
 
-    public long getPushDownCount() {
-        // 1. Do not use `0`: If the number of entries in the table is 0,
-        //                    it is unclear whether optimization has been 
performed.
-        // 2. Do not use `null` or `-`: This makes it easier for the program 
to parse the `explain` data.
-        return -1;
+    protected void setPushDownCount(long count) {
+        tableLevelRowCount = count;
+    }
+
+    private long getPushDownCount() {
+        return tableLevelRowCount;
     }
 
     @Override
@@ -106,9 +109,9 @@ public abstract class FileScanNode extends ExternalScanNode 
{
             output.append("(approximate)");
         }
         output.append("inputSplitNum=").append(selectedSplitNum).append(", 
totalFileSize=")
-            .append(totalFileSize).append(", 
scanRanges=").append(scanRangeLocations.size()).append("\n");
+                .append(totalFileSize).append(", 
scanRanges=").append(scanRangeLocations.size()).append("\n");
         
output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum)
-            .append("\n");
+                .append("\n");
 
         if (detailLevel == TExplainLevel.VERBOSE && !isBatchMode()) {
             output.append(prefix).append("backends:").append("\n");
@@ -133,25 +136,25 @@ public abstract class FileScanNode extends 
ExternalScanNode {
                 if (size <= 4) {
                     for (TFileRangeDesc file : fileRangeDescs) {
                         output.append(prefix).append("    
").append(file.getPath())
-                            .append(" start: ").append(file.getStartOffset())
-                            .append(" length: ").append(file.getSize())
-                            .append("\n");
+                                .append(" start: 
").append(file.getStartOffset())
+                                .append(" length: ").append(file.getSize())
+                                .append("\n");
                     }
                 } else {
                     for (int i = 0; i < 3; i++) {
                         TFileRangeDesc file = fileRangeDescs.get(i);
                         output.append(prefix).append("    
").append(file.getPath())
-                            .append(" start: ").append(file.getStartOffset())
-                            .append(" length: ").append(file.getSize())
-                            .append("\n");
+                                .append(" start: 
").append(file.getStartOffset())
+                                .append(" length: ").append(file.getSize())
+                                .append("\n");
                     }
                     int other = size - 4;
                     output.append(prefix).append("    ... other 
").append(other).append(" files ...\n");
                     TFileRangeDesc file = fileRangeDescs.get(size - 1);
                     output.append(prefix).append("    ").append(file.getPath())
-                        .append(" start: ").append(file.getStartOffset())
-                        .append(" length: ").append(file.getSize())
-                        .append("\n");
+                            .append(" start: ").append(file.getStartOffset())
+                            .append(" length: ").append(file.getSize())
+                            .append("\n");
                 }
             }
         }
@@ -182,10 +185,10 @@ public abstract class FileScanNode extends 
ExternalScanNode {
     }
 
     protected void setDefaultValueExprs(TableIf tbl,
-                                        Map<String, SlotDescriptor> 
slotDescByName,
-                                        Map<String, Expr> exprByName,
-                                        TFileScanRangeParams params,
-                                        boolean useVarcharAsNull) throws 
UserException {
+            Map<String, SlotDescriptor> slotDescByName,
+            Map<String, Expr> exprByName,
+            TFileScanRangeParams params,
+            boolean useVarcharAsNull) throws UserException {
         Preconditions.checkNotNull(tbl);
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
@@ -222,7 +225,8 @@ public abstract class FileScanNode extends ExternalScanNode 
{
             // if slot desc is null, which mean it is an unrelated slot, just 
skip.
             // eg:
             // (a, b, c) set (x=a, y=b, z=c)
-            // c does not exist in file, the z will be filled with null, even 
if z has default value.
+            // c does not exist in file, the z will be filled with null, even 
if z has
+            // default value.
             // and if z is not nullable, the load will fail.
             if (slotDesc != null) {
                 if (expr != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 0a9269ce860..e5c140da53b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -153,12 +153,12 @@ public class IcebergScanNode extends FileQueryScanNode {
     private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit 
icebergSplit) {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+        if (tableLevelPushDownCount) {
+            
tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount());
+        }
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
-        if (tableLevelPushDownCount) {
-            fileDesc.setRowCount(icebergSplit.getTableLevelRowCount());
-        }
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
@@ -336,6 +336,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                 } else {
                     pushDownCountSplits = 
Collections.singletonList(splits.get(0));
                 }
+                setPushDownCount(countFromSnapshot);
                 assignCountToSplits(pushDownCountSplits, countFromSnapshot);
                 return pushDownCountSplits;
             }
@@ -476,11 +477,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         super.toThrift(planNode);
     }
 
-    @Override
-    public long getPushDownCount() {
-        return getCountFromSnapshot();
-    }
-
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         if (pushdownIcebergPredicates.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 59e7eed5d42..beb59e40462 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TPaimonFileDesc;
 import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
@@ -56,6 +57,7 @@ import org.apache.paimon.utils.InstantiationUtil;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -105,8 +107,6 @@ public class PaimonScanNode extends FileQueryScanNode {
     private int paimonSplitNum = 0;
     private List<SplitStat> splitStats = new ArrayList<>();
     private String serializedTable;
-
-    private boolean pushDownCount = false;
     private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
 
     public PaimonScanNode(PlanNodeId id,
@@ -187,7 +187,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setDbId(((PaimonExternalTable) 
source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
-        // The hadoop conf should be same with 
PaimonExternalCatalog.createCatalog()#getConfiguration()
+        // The hadoop conf should be same with
+        // PaimonExternalCatalog.createCatalog()#getConfiguration()
         
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
@@ -198,10 +199,41 @@ public class PaimonScanNode extends FileQueryScanNode {
             tDeletionFile.setLength(deletionFile.length());
             fileDesc.setDeletionFile(tDeletionFile);
         }
+        if (paimonSplit.getRowCount().isPresent()) {
+            
tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get());
+        }
         tableFormatFileDesc.setPaimonParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
+    @VisibleForTesting
+    public static Optional<Long> 
calcuteTableLevelCount(List<org.apache.paimon.table.source.Split> paimonSplits) 
{
+        // The table-level count can be used only when every deletion vector
+        // (if any) reports a non-null cardinality.
+        long totalCount = 0;
+        long deletionVectorCount = 0;
+
+        for (org.apache.paimon.table.source.Split s : paimonSplits) {
+            totalCount += s.rowCount();
+
+            Optional<List<DeletionFile>> deletionFiles = s.deletionFiles();
+            if (deletionFiles.isPresent()) {
+                for (DeletionFile dv : deletionFiles.get()) {
+                    if (dv != null) {
+                        Long cardinality = dv.cardinality();
+                        if (cardinality == null) {
+                            // if a deletion vector's cardinality is unknown, we can't calculate the table-level count
+                            return Optional.empty();
+                        } else {
+                            deletionVectorCount += cardinality;
+                        }
+                    }
+                }
+            }
+        }
+        return Optional.of(totalCount - deletionVectorCount);
+    }
+
     @Override
     public List<Split> getSplits(int numBackends) throws UserException {
         boolean forceJniScanner = sessionVariable.isForceJniScanner();
@@ -245,38 +277,34 @@ public class PaimonScanNode extends FileQueryScanNode {
                     splitStat.setType(SplitReadType.NATIVE);
                     splitStat.setRawFileConvertable(true);
                     List<RawFile> rawFiles = optRawFiles.get();
-                    if (optDeletionFiles.isPresent()) {
-                        List<DeletionFile> deletionFiles = 
optDeletionFiles.get();
-                        for (int i = 0; i < rawFiles.size(); i++) {
-                            RawFile file = rawFiles.get(i);
-                            DeletionFile deletionFile = deletionFiles.get(i);
-                            LocationPath locationPath = new 
LocationPath(file.path(),
-                                    source.getCatalog().getProperties());
-                            try {
-                                List<Split> dorisSplits = 
FileSplitter.splitFile(
-                                        locationPath,
-                                        getRealFileSplitSize(0),
-                                        null,
-                                        file.length(),
-                                        -1,
-                                        true,
-                                        null,
-                                        
PaimonSplit.PaimonSplitCreator.DEFAULT);
-                                for (Split dorisSplit : dorisSplits) {
-                                    // the element in DeletionFiles might be 
null
-                                    if (deletionFile != null) {
-                                        splitStat.setHasDeletionVector(true);
-                                        ((PaimonSplit) 
dorisSplit).setDeletionFile(deletionFile);
-                                    }
-                                    splits.add(dorisSplit);
+                    for (int i = 0; i < rawFiles.size(); i++) {
+                        RawFile file = rawFiles.get(i);
+                        LocationPath locationPath = new 
LocationPath(file.path(),
+                                source.getCatalog().getProperties());
+                        try {
+                            List<Split> dorisSplits = FileSplitter.splitFile(
+                                    locationPath,
+                                    // if applyCountPushdown is true, we can't split the file
+                                    // because the raw file and the deletion vector are mapped one-to-one
+                                    getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0),
+                                    null,
+                                    file.length(),
+                                    -1,
+                                    true,
+                                    null,
+                                    PaimonSplit.PaimonSplitCreator.DEFAULT);
+                            for (Split dorisSplit : dorisSplits) {
+                                // try to set deletion file
+                                if (optDeletionFiles.isPresent() && 
optDeletionFiles.get().get(i) != null) {
+                                    ((PaimonSplit) 
dorisSplit).setDeletionFile(optDeletionFiles.get().get(i));
+                                    splitStat.setHasDeletionVector(true);
                                 }
-                                ++rawFileSplitNum;
-                            } catch (IOException e) {
-                                throw new UserException("Paimon error to split 
file: " + e.getMessage(), e);
                             }
+                            splits.addAll(dorisSplits);
+                            ++rawFileSplitNum;
+                        } catch (IOException e) {
+                            throw new UserException("Paimon error to split 
file: " + e.getMessage(), e);
                         }
-                    } else {
-                        createRawFileSplits(rawFiles, splits, 
applyCountPushdown ? Long.MAX_VALUE : 0);
                     }
                 } else {
                     if (ignoreSplitType == 
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
@@ -295,31 +323,30 @@ public class PaimonScanNode extends FileQueryScanNode {
             splitStats.add(splitStat);
         }
 
-        this.selectedPartitionNum = selectedPartitionValues.size();
-        // TODO: get total partition number
-        return splits;
-    }
-
-    private void createRawFileSplits(List<RawFile> rawFiles, List<Split> 
splits, long blockSize) throws UserException {
-        for (RawFile file : rawFiles) {
-            LocationPath locationPath = new LocationPath(file.path(),
-                    source.getCatalog().getProperties());
-            try {
-                splits.addAll(
-                        FileSplitter.splitFile(
-                                locationPath,
-                                getRealFileSplitSize(blockSize),
-                                null,
-                                file.length(),
-                                -1,
-                                true,
-                                null,
-                                PaimonSplit.PaimonSplitCreator.DEFAULT));
-                ++rawFileSplitNum;
-            } catch (IOException e) {
-                throw new UserException("Paimon error to split file: " + 
e.getMessage(), e);
+        // if applyCountPushdown is true, calculate the row count for count pushdown
+        if (applyCountPushdown) {
+            // we can create a special empty split and skip the plan process
+            if (splits.isEmpty()) {
+                return splits;
+            }
+            Optional<Long> optTableLevelCount = 
calcuteTableLevelCount(paimonSplits);
+            if (optTableLevelCount.isPresent()) {
+                long tableLevelRowCount = optTableLevelCount.get();
+                List<Split> pushDownCountSplits;
+                if (tableLevelRowCount > COUNT_WITH_PARALLEL_SPLITS) {
+                    int minSplits = 
sessionVariable.getParallelExecInstanceNum() * numBackends;
+                    pushDownCountSplits = splits.subList(0, 
Math.min(splits.size(), minSplits));
+                } else {
+                    pushDownCountSplits = 
Collections.singletonList(splits.get(0));
+                }
+                setPushDownCount(tableLevelRowCount);
+                assignCountToSplits(pushDownCountSplits, tableLevelRowCount);
+                return pushDownCountSplits;
             }
         }
+
+        this.selectedPartitionNum = selectedPartitionValues.size();
+        return splits;
     }
 
     private String getFileFormat(String path) {
@@ -405,4 +432,13 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         return sb.toString();
     }
+
+    private void assignCountToSplits(List<Split> splits, long totalCount) {
+        int size = splits.size();
+        long countPerSplit = totalCount / size;
+        for (int i = 0; i < size - 1; i++) {
+            ((PaimonSplit) splits.get(i)).setRowCount(countPerSplit);
+        }
+        ((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + 
totalCount % size);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 988f043ad0e..f4d3d724089 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -36,13 +36,13 @@ public class PaimonSplit extends FileSplit {
     private static final LocationPath DUMMY_PATH = new 
LocationPath("/dummyPath", Maps.newHashMap());
     private Split split;
     private TableFormatType tableFormatType;
-    private Optional<DeletionFile> optDeletionFile;
+    private Optional<DeletionFile> optDeletionFile = Optional.empty();
+    private Optional<Long> optRowCount = Optional.empty();
 
     public PaimonSplit(Split split) {
         super(DUMMY_PATH, 0, 0, 0, 0, null, null);
         this.split = split;
         this.tableFormatType = TableFormatType.PAIMON;
-        this.optDeletionFile = Optional.empty();
 
         if (split instanceof DataSplit) {
             List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
@@ -57,7 +57,6 @@ public class PaimonSplit extends FileSplit {
             String[] hosts, List<String> partitionList) {
         super(file, start, length, fileLength, modificationTime, hosts, 
partitionList);
         this.tableFormatType = TableFormatType.PAIMON;
-        this.optDeletionFile = Optional.empty();
         this.selfSplitWeight = length;
     }
 
@@ -90,6 +89,14 @@ public class PaimonSplit extends FileSplit {
         this.optDeletionFile = Optional.of(deletionFile);
     }
 
+    public Optional<Long> getRowCount() {
+        return optRowCount;
+    }
+
+    public void setRowCount(long rowCount) {
+        this.optRowCount = Optional.of(rowCount);
+    }
+
     public static class PaimonSplitCreator implements SplitCreator {
 
         static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
@@ -103,7 +110,7 @@ public class PaimonSplit extends FileSplit {
                 long modificationTime,
                 String[] hosts,
                 List<String> partitionValues) {
-            PaimonSplit split =  new PaimonSplit(path, start, length, 
fileLength,
+            PaimonSplit split = new PaimonSplit(path, start, length, 
fileLength,
                     modificationTime, hosts, partitionValues);
             split.setTargetSplitSize(fileSplitSize);
             return split;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
new file mode 100644
index 00000000000..f67f3a93977
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon.source;
+
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.table.source.Split;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class PaimonScanNodeTest {
+
+    @Test
+    public void testCalcuteTableLevelCount() {
+        List<Split> splits = new ArrayList<>();
+
+        // Create mock splits with row count and deletion files
+        Split split1 = new Split() {
+            @Override
+            public long rowCount() {
+                return 100;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path1", 0, 10, 10L));
+                deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        Split split2 = new Split() {
+            @Override
+            public long rowCount() {
+                return 200;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path3", 0, 30, 30L));
+                deletionFiles.add(new DeletionFile("path4", 0, 40, 40L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        splits.add(split1);
+        splits.add(split2);
+
+        Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(200, result.get().longValue());
+    }
+
+    @Test
+    public void testCalcuteTableLevelCountWithNullDeletionFile() {
+        List<Split> splits = new ArrayList<>();
+
+        // Create mock splits with row count and null deletion files
+        Split split1 = new Split() {
+            @Override
+            public long rowCount() {
+                return 100;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(null);
+                deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        Split split2 = new Split() {
+            @Override
+            public long rowCount() {
+                return 200;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                return Optional.empty();
+            }
+        };
+
+        splits.add(split1);
+        splits.add(split2);
+
+        Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(280, result.get().longValue());
+    }
+
+    @Test
+    public void testCalcuteTableLevelCountWithNullCardinality() {
+        List<Split> splits = new ArrayList<>();
+
+        // Create mock splits with row count and deletion files with null cardinality
+        Split split1 = new Split() {
+            @Override
+            public long rowCount() {
+                return 100;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path1", 0, 10, null));
+                deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        Split split2 = new Split() {
+            @Override
+            public long rowCount() {
+                return 200;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path3", 0, 30, 30L));
+                deletionFiles.add(null);
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        splits.add(split1);
+        splits.add(split2);
+
+        Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+        Assert.assertFalse(result.isPresent());
+    }
+}
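
A note for readers of the new test: the three cases above pin down the contract of
PaimonScanNode.calcuteTableLevelCount without showing its body. As a minimal sketch only
(not the code shipped in this commit), and assuming Paimon's DeletionFile exposes its
nullable deleted-row count via cardinality(), the behavior the tests verify is: subtract
each deletion file's cardinality from its split's rowCount(), skip null DeletionFile
entries, and give up on a table-level count as soon as any cardinality is unknown. With
the same imports as the test above:

    // Illustrative sketch of the behavior exercised by PaimonScanNodeTest;
    // the real implementation lives in PaimonScanNode.
    static Optional<Long> tableLevelCountSketch(List<Split> splits) {
        long total = 0;
        for (Split split : splits) {
            long remaining = split.rowCount();
            Optional<List<DeletionFile>> deletionFiles = split.deletionFiles();
            if (deletionFiles.isPresent()) {
                for (DeletionFile deletionFile : deletionFiles.get()) {
                    if (deletionFile == null) {
                        continue; // no deletion vector for this data file
                    }
                    if (deletionFile.cardinality() == null) {
                        // Unknown number of deleted rows: no reliable table-level count.
                        return Optional.empty();
                    }
                    remaining -= deletionFile.cardinality();
                }
            }
            total += remaining;
        }
        return Optional.of(total);
    }

Applied to the first test, this yields (100 - 10 - 20) + (200 - 30 - 40) = 200, matching
the assertion.
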
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index a46cc8f7299..f6d795259c1 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -296,6 +296,7 @@ struct TIcebergFileDesc {
     // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
     6: optional string original_file_path;
+    // Deprecated
     7: optional i64 row_count;
 }
 
@@ -320,6 +321,7 @@ struct TPaimonFileDesc {
     12: optional TPaimonDeletionFileDesc deletion_file;
     13: optional map<string, string> hadoop_conf // deprecated
     14: optional string paimon_table  // deprecated
+    15: optional i64 row_count // deprecated
 }
 
 struct TTrinoConnectorFileDesc {
@@ -387,6 +389,7 @@ struct TTableFormatFileDesc {
     6: optional TMaxComputeFileDesc max_compute_params
     7: optional TTrinoConnectorFileDesc trino_connector_params
     8: optional TLakeSoulFileDesc lakesoul_params
+    9: optional i64 table_level_row_count
 }
 
 enum TTextSerdeType {
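
With the per-format row_count fields deprecated above, the new table_level_row_count on
TTableFormatFileDesc becomes the single place where the FE reports a statistics-derived
count to the BE. As a rough, hypothetical illustration of the FE side (the actual call
sites are in FileScanNode.java and PaimonScanNode.java in this patch; the camelCase setter
name is assumed from Thrift's standard Java mapping and is not quoted from the patch),
given the List<Split> splits for the scan:

    // Hypothetical sketch: attach the table-level count only when it could be
    // derived, so the BE can answer count(*) without reading data files.
    Optional<Long> tableLevelCount = PaimonScanNode.calcuteTableLevelCount(splits);
    TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
    tableLevelCount.ifPresent(tableFormatFileDesc::setTableLevelRowCount);
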
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index a394836625d..f3b44964915 100644
Binary files a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out and b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out differ
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out
new file mode 100644
index 00000000000..f0b1e92a088
Binary files /dev/null and b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out differ
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 9668cbb0950..41afb02e0f9 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,13 +181,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
             def c108= """ select id from tb_with_upper_case where id = 1 """
             def c109= """ select id from tb_with_upper_case where id < 1 """
 
-            def c110 = """select count(*) from deletion_vector_orc;"""
-            def c111 = """select count(*) from deletion_vector_parquet;"""
-            def c112 = """select count(*) from deletion_vector_orc where id > 2;"""
-            def c113 = """select count(*) from deletion_vector_parquet where id > 2;"""
-            def c114 = """select * from deletion_vector_orc where id > 2;"""
-            def c115 = """select * from deletion_vector_parquet where id > 2;"""
-
             String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
             String catalog_name = "ctl_test_paimon_catalog"
             String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -296,13 +289,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
                 qt_c107 c107
                 qt_c108 c108
                 qt_c109 c109
-
-                qt_c110 c110
-                qt_c111 c111
-                qt_c112 c112
-                qt_c113 c113
-                qt_c114 c114
-                qt_c115 c115
             }
 
             test_cases("false", "false")
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy
new file mode 100644
index 00000000000..fade251ed56
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_deletion_vector", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    logger.info("start paimon test")
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disabled paimon test")
+        return
+    }
+
+    try {
+        String catalog_name = "test_paimon_deletion_vector"
+        String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog if not exists ${catalog_name} properties (
+            "type" = "paimon",
+            "paimon.catalog.type"="filesystem",
+            "warehouse" = 
"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
+        );"""
+        sql """use `${catalog_name}`.`db1`"""
+
+        def test_cases = { String force ->
+            sql """ set force_jni_scanner=${force} """
+            qt_1 """select count(*) from deletion_vector_orc;"""
+            qt_2 """select count(*) from deletion_vector_parquet;"""
+            qt_3 """select count(*) from deletion_vector_orc where id > 2;"""
+            qt_4 """select count(*) from deletion_vector_parquet where id > 2;"""
+            qt_5 """select * from deletion_vector_orc where id > 2 order by id;"""
+            qt_6 """select * from deletion_vector_parquet where id > 2 order by id;"""
+            qt_7 """select * from deletion_vector_table_1_0 order by id;"""
+            qt_8 """select count(*) from deletion_vector_table_1_0;"""
+            qt_9 """select count(*) from deletion_vector_table_1_0 where id > 2;"""
+        }
+
+        def test_table_count_push_down = { String force ->
+            sql """ set force_jni_scanner=${force} """
+            explain {
+                sql("select count(*) from deletion_vector_orc;")
+                contains "pushdown agg=COUNT (-1)"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_parquet;")
+                contains "pushdown agg=COUNT (-1)"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_table_1_0;")
+                contains "pushdown agg=COUNT (8)"
+            }
+        }
+
+        def test_not_table_count_push_down = { String force ->
+            sql """ set enable_count_push_down_for_external_table=false; """
+            sql """ set force_jni_scanner=${force} """
+            explain {
+                sql("select count(*) from deletion_vector_orc;")
+                contains "pushdown agg=NONE"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_parquet;")
+                contains "pushdown agg=NONE"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_table_1_0;")
+                contains "pushdown agg=NONE"
+            }
+        }
+
+        test_cases("false")
+        test_cases("true")
+        test_table_count_push_down("false")
+        test_table_count_push_down("true")
+        test_not_table_count_push_down("false")
+        test_not_table_count_push_down("true")
+    } finally {
+        sql """ set enable_count_push_down_for_external_table=true; """
+        sql """set force_jni_scanner=false"""
+    }
+
+}
\ No newline at end of file
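
Reading the expectations in test_table_count_push_down above: "pushdown agg=COUNT (8)"
appears to mean the planner could derive the exact table-level row count (8) from Paimon
statistics, while "COUNT (-1)" presumably indicates the count aggregation is still pushed
down but must be computed by the BE at scan time (here because the deletion-vector
cardinalities are not usable at plan time); with
enable_count_push_down_for_external_table=false the plan falls back to
"pushdown agg=NONE". This reading is inferred from the test expectations and the PR
description rather than stated explicitly in the patch.
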


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
