This is an automated email from the ASF dual-hosted git repository.

kakachen pushed a commit to branch data_lake_reader_refactoring
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/data_lake_reader_refactoring 
by this push:
     new 4541ae1b562 fix v3 (#62305)
4541ae1b562 is described below

commit 4541ae1b562325668243cd6b2a1f5ec50a711331
Author: daidai <[email protected]>
AuthorDate: Fri Apr 10 10:56:42 2026 +0800

    fix v3 (#62305)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/exec/scan/file_scanner.cpp                  | 134 +++------
 be/src/exec/scan/file_scanner.h                    |   6 +-
 be/src/format/orc/vorc_reader.cpp                  | 102 ++-----
 be/src/format/orc/vorc_reader.h                    |  33 ++-
 be/src/format/parquet/vparquet_group_reader.cpp    | 105 ++-----
 be/src/format/parquet/vparquet_group_reader.h      |   9 -
 be/src/format/parquet/vparquet_reader.cpp          |  97 ++++---
 be/src/format/parquet/vparquet_reader.h            |  29 +-
 be/src/format/table/hive_reader.cpp                |  20 ++
 be/src/format/table/iceberg_reader.cpp             |  92 +++++--
 be/src/format/table/iceberg_reader.h               |  16 --
 be/src/format/table/iceberg_reader_mixin.h         |  77 ++++++
 be/src/format/table/table_format_reader.h          |  53 ++++
 be/src/format/table/transactional_hive_reader.cpp  |  13 +-
 .../apache/doris/datasource/FileQueryScanNode.java |   3 -
 .../doris/datasource/hive/source/HiveScanNode.java |  10 +
 .../datasource/iceberg/source/IcebergScanNode.java |  15 +
 .../doris/datasource/tvf/source/TVFScanNode.java   |  11 +
 ...ceberg_v3_row_lineage_rewrite_data_files.groovy | 244 +++++++++++++++++
 ...est_iceberg_v2_to_v3_doris_spark_compare.groovy | 223 +++++++++++++++
 ...test_iceberg_v3_row_lineage_query_insert.groovy | 304 +++++++++++++++++++++
 ...eberg_v3_row_lineage_update_delete_merge.groovy | 292 ++++++++++++++++++++
 22 files changed, 1490 insertions(+), 398 deletions(-)

diff --git a/be/src/exec/scan/file_scanner.cpp 
b/be/src/exec/scan/file_scanner.cpp
index 841d29bf183..300d103b54a 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -868,14 +868,13 @@ void FileScanner::_truncate_char_or_varchar_column(Block* 
block, int idx, int le
     Block::erase_useless_column(block, num_columns_without_result);
 }
 
-Status FileScanner::_create_row_id_column_iterator() {
+std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
FileScanner::_create_row_id_column_iterator() {
     auto& id_file_map = _state->get_id_file_map();
     auto file_id = id_file_map->get_file_mapping_id(
             
std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
                                           _current_range, 
_should_enable_file_meta_cache()));
-    _row_id_column_iterator_pair.first = 
std::make_shared<RowIdColumnIteratorV2>(
-            IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
-    return Status::OK();
+    return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
+                                                   
BackendOptions::get_backend_id(), file_id);
 }
 
 void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
@@ -1236,28 +1235,10 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* 
file_meta_cache_ptr,
         std::unique_ptr<IcebergParquetReader> iceberg_reader = 
IcebergParquetReader::create_unique(
                 _kv_cache, _profile, *_params, range, 
_state->query_options().batch_size,
                 &_state->timezone_obj(), _io_ctx.get(), _state, 
file_meta_cache_ptr);
-
-        // Transfer properties
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
-        if (_row_lineage_columns.row_id_column_idx != -1 ||
-            _row_lineage_columns.last_updated_sequence_number_column_idx != 
-1) {
-            auto row_lineage_columns = std::make_shared<RowLineageColumns>();
-            row_lineage_columns->row_id_column_idx = 
_row_lineage_columns.row_id_column_idx;
-            row_lineage_columns->last_updated_sequence_number_column_idx =
-                    
_row_lineage_columns.last_updated_sequence_number_column_idx;
-            const auto& iceberg_params = 
range.table_format_params.iceberg_params;
-            row_lineage_columns->first_row_id =
-                    iceberg_params.__isset.first_row_id ? 
iceberg_params.first_row_id : -1;
-            row_lineage_columns->last_updated_sequence_number =
-                    iceberg_params.__isset.last_updated_sequence_number
-                            ? iceberg_params.last_updated_sequence_number
-                            : -1;
-            
iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
-        }
-
+        iceberg_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx);
         _cur_reader = std::move(iceberg_reader);
     } else if (range.__isset.table_format_params &&
@@ -1266,10 +1247,6 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* 
file_meta_cache_ptr,
         auto paimon_reader = PaimonParquetReader::create_unique(
                 _profile, *_params, range, _state->query_options().batch_size,
                 &_state->timezone_obj(), _kv_cache, _io_ctx.get(), _state, 
file_meta_cache_ptr);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
paimon_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
         init_status = 
static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
         _cur_reader = std::move(paimon_reader);
     } else if (range.__isset.table_format_params &&
@@ -1278,10 +1255,6 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* 
file_meta_cache_ptr,
         auto hudi_reader = HudiParquetReader::create_unique(
                 _profile, *_params, range, _state->query_options().batch_size,
                 &_state->timezone_obj(), _io_ctx.get(), _state, 
file_meta_cache_ptr);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
hudi_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
         init_status = 
static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
         _cur_reader = std::move(hudi_reader);
     } else if (range.table_format_params.table_format_type == "hive") {
@@ -1289,10 +1262,10 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* 
file_meta_cache_ptr,
                 _profile, *_params, range, _state->query_options().batch_size,
                 &_state->timezone_obj(), _io_ctx.get(), _state, 
&_is_file_slot, file_meta_cache_ptr,
                 _state->query_options().enable_parquet_lazy_mat);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
hive_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
+        hive_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx);
         _cur_reader = std::move(hive_reader);
     } else if (range.table_format_params.table_format_type == "tvf") {
@@ -1302,10 +1275,10 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* 
file_meta_cache_ptr,
                     &_state->timezone_obj(), _io_ctx.get(), _state, 
file_meta_cache_ptr,
                     _state->query_options().enable_parquet_lazy_mat);
         }
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
+        parquet_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
         _cur_reader = std::move(parquet_reader);
     } else if (_is_load) {
@@ -1340,10 +1313,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache* 
file_meta_cache_ptr,
         auto tran_orc_reader = TransactionalHiveReader::create_unique(
                 _profile, _state, *_params, range, 
_state->query_options().batch_size,
                 _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
tran_orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
+        tran_orc_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(tran_orc_reader.get())->init_reader(&octx);
 
         _cur_reader = std::move(tran_orc_reader);
@@ -1353,27 +1326,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache* 
file_meta_cache_ptr,
         std::unique_ptr<IcebergOrcReader> iceberg_reader = 
IcebergOrcReader::create_unique(
                 _kv_cache, _profile, _state, *_params, range, 
_state->query_options().batch_size,
                 _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
-
-        // Transfer properties
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
-        if (_row_lineage_columns.row_id_column_idx != -1 ||
-            _row_lineage_columns.last_updated_sequence_number_column_idx != 
-1) {
-            auto row_lineage_columns = std::make_shared<RowLineageColumns>();
-            row_lineage_columns->row_id_column_idx = 
_row_lineage_columns.row_id_column_idx;
-            row_lineage_columns->last_updated_sequence_number_column_idx =
-                    
_row_lineage_columns.last_updated_sequence_number_column_idx;
-            const auto& iceberg_params = 
range.table_format_params.iceberg_params;
-            row_lineage_columns->first_row_id =
-                    iceberg_params.__isset.first_row_id ? 
iceberg_params.first_row_id : -1;
-            row_lineage_columns->last_updated_sequence_number =
-                    iceberg_params.__isset.last_updated_sequence_number
-                            ? iceberg_params.last_updated_sequence_number
-                            : -1;
-            
iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
-        }
+        iceberg_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&octx);
 
         _cur_reader = std::move(iceberg_reader);
@@ -1383,10 +1339,6 @@ Status FileScanner::_init_orc_reader(FileMetaCache* 
file_meta_cache_ptr,
         auto paimon_reader = PaimonOrcReader::create_unique(
                 _profile, _state, *_params, range, 
_state->query_options().batch_size,
                 _state->timezone(), _kv_cache, _io_ctx.get(), 
file_meta_cache_ptr);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
paimon_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
         init_status = 
static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);
 
         _cur_reader = std::move(paimon_reader);
@@ -1396,10 +1348,6 @@ Status FileScanner::_init_orc_reader(FileMetaCache* 
file_meta_cache_ptr,
         auto hudi_reader = HudiOrcReader::create_unique(
                 _profile, _state, *_params, range, 
_state->query_options().batch_size,
                 _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
hudi_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
         init_status = 
static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);
 
         _cur_reader = std::move(hudi_reader);
@@ -1409,10 +1357,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache* 
file_meta_cache_ptr,
                 _profile, _state, *_params, range, 
_state->query_options().batch_size,
                 _state->timezone(), _io_ctx.get(), &_is_file_slot, 
file_meta_cache_ptr,
                 _state->query_options().enable_orc_lazy_mat);
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
hive_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
+        hive_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(hive_reader.get())->init_reader(&octx);
 
         _cur_reader = std::move(hive_reader);
@@ -1424,10 +1372,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache* 
file_meta_cache_ptr,
                     _state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
                     _state->query_options().enable_orc_lazy_mat);
         }
-        if (_row_id_column_iterator_pair.second != -1) {
-            RETURN_IF_ERROR(_create_row_id_column_iterator());
-            
orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-        }
+        orc_reader->set_create_row_id_column_iterator_func(
+                [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
{
+                    return _create_row_id_column_iterator();
+                });
         init_status = 
static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
         _cur_reader = std::move(orc_reader);
     } else if (_is_load) {
@@ -1683,26 +1631,6 @@ Status FileScanner::_init_expr_ctxes() {
             col_desc.category = ColumnCategory::PARTITION_KEY;
         }
 
-        if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
-            _row_id_column_iterator_pair.second = 
_default_val_row_desc->get_column_id(slot_id);
-            continue;
-        }
-
-        bool is_row_lineage_col = false;
-        if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
-            _row_lineage_columns.row_id_column_idx = 
_default_val_row_desc->get_column_id(slot_id);
-            is_row_lineage_col = true;
-        }
-
-        if (it->second->col_name() == 
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
-            _row_lineage_columns.last_updated_sequence_number_column_idx =
-                    _default_val_row_desc->get_column_id(slot_id);
-            is_row_lineage_col = true;
-        }
-        if (is_row_lineage_col) {
-            col_desc.category = ColumnCategory::SYNTHESIZED;
-        }
-
         // Derive is_file_slot from category
         bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
                              col_desc.category == ColumnCategory::GENERATED);
diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index 865872f1166..f3362380599 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -232,10 +232,6 @@ private:
     // otherwise, point to _output_tuple_desc
     const TupleDescriptor* _real_tuple_desc = nullptr;
 
-    std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> 
_row_id_column_iterator_pair = {nullptr,
-                                                                               
            -1};
-    // for iceberg row lineage
-    RowLineageColumns _row_lineage_columns;
     int64_t _last_bytes_read_from_local = 0;
     int64_t _last_bytes_read_from_remote = 0;
 
@@ -282,7 +278,7 @@ private:
                             std::unique_ptr<OrcReader> orc_reader = nullptr);
     Status _init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
                                 std::unique_ptr<ParquetReader> parquet_reader 
= nullptr);
-    Status _create_row_id_column_iterator();
+    std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
_create_row_id_column_iterator();
 
     TFileFormatType::type _get_current_format_type() {
         // for compatibility, if format_type is not set in range, use the 
format type of params
diff --git a/be/src/format/orc/vorc_reader.cpp 
b/be/src/format/orc/vorc_reader.cpp
index c23f27bf495..1ecc88e22c7 100644
--- a/be/src/format/orc/vorc_reader.cpp
+++ b/be/src/format/orc/vorc_reader.cpp
@@ -467,20 +467,6 @@ Status OrcReader::_do_init_reader(ReaderInitContext* 
base_ctx) {
     // and standalone path (_fill_missing_cols empty, _table_info_node_ptr may 
be null).
     _init_file_column_mapping();
 
-    // Register row-position-based synthesized column handler.
-    // _row_id_column_iterator_pair and _row_lineage_columns are set before 
init_reader
-    // by FileScanner. This must be outside has_column_descs() guard because 
standalone
-    // readers (e.g., orc_read_lines tests) also use row_id columns.
-    if (_row_id_column_iterator_pair.first != nullptr ||
-        (_row_lineage_columns != nullptr &&
-         (_row_lineage_columns->need_row_ids() ||
-          _row_lineage_columns->has_last_updated_sequence_number_column()))) {
-        register_synthesized_column_handler(
-                BeConsts::ROWID_COL, [this](Block* block, size_t rows) -> 
Status {
-                    return _fill_row_id_columns(block, 
_row_reader->getRowNumber());
-                });
-    }
-
     // ---- Inlined set_fill_columns logic (partition/missing/synthesized 
classification) ----
 
     // 1. Collect predicate columns from conjuncts for lazy materialization
@@ -542,6 +528,16 @@ Status OrcReader::on_before_init_reader(ReaderInitContext* 
ctx) {
         if (desc.category == ColumnCategory::REGULAR ||
             desc.category == ColumnCategory::GENERATED) {
             ctx->column_names.push_back(desc.name);
+        } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+                   desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+            this->register_synthesized_column_handler(
+                    desc.name,
+                    [iter = std::move(topn_row_id_column_iter), this, &desc](
+                            Block* block, size_t rows) -> Status {
+                        return fill_topn_row_id(iter, desc.name, block, rows);
+                    });
+            continue;
         }
     }
 
@@ -905,6 +901,10 @@ bool OrcReader::_check_slot_can_push_down(const VExprSPtr& 
expr) {
         return false;
     }
 
+    if (disable_column_opt(slot_ref->expr_name())) {
+        return false;
+    }
+
     // Directly use _get_orc_predicate_type since we only need the type
     auto [valid, predicate_type] = _get_orc_predicate_type(slot_ref);
     if (valid) {
@@ -1273,17 +1273,6 @@ void OrcReader::_classify_columns_for_lazy_read(
                 TransactionalHive::READ_ROW_COLUMN_NAMES.end());
     }
 
-    auto check_iceberg_row_lineage_column_idx = [&](const auto& col_name) -> 
int {
-        if (_row_lineage_columns != nullptr) {
-            if (col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
-                return _row_lineage_columns->row_id_column_idx;
-            } else if (col_name == 
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
-                return 
_row_lineage_columns->last_updated_sequence_number_column_idx;
-            }
-        }
-        return -1;
-    };
-
     for (auto& read_table_col : _read_table_cols) {
         _lazy_read_ctx.all_read_columns.emplace_back(read_table_col);
         if (!predicate_table_columns.empty()) {
@@ -1301,7 +1290,7 @@ void OrcReader::_classify_columns_for_lazy_read(
                 
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
                 _lazy_read_ctx.predicate_orc_columns.emplace_back(
                         
_table_info_node_ptr->children_file_column_name(iter->first));
-                if (check_iceberg_row_lineage_column_idx(read_table_col) != 
-1) {
+                if (disable_column_opt(read_table_col)) {
                     // Todo : enable lazy mat where filter iceberg row lineage 
column.
                     _enable_lazy_mat = false;
                 }
@@ -1331,7 +1320,7 @@ void OrcReader::_classify_columns_for_lazy_read(
                 }
             }
             _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, 
kv.second);
-            if (check_iceberg_row_lineage_column_idx(kv.first) != -1) {
+            if (disable_column_opt(kv.first)) {
                 _enable_lazy_mat = false;
             }
         }
@@ -1481,52 +1470,6 @@ Status OrcReader::_init_orc_row_reader() {
     return Status::OK();
 }
 
-Status OrcReader::_fill_row_id_columns(Block* block, int64_t start_row) {
-    size_t fill_size = _batch->numElements;
-    if (_row_id_column_iterator_pair.first != nullptr) {
-        
RETURN_IF_ERROR(_row_id_column_iterator_pair.first->seek_to_ordinal(start_row));
-        auto col = block->get_by_position(_row_id_column_iterator_pair.second)
-                           .column->assume_mutable();
-        
RETURN_IF_ERROR(_row_id_column_iterator_pair.first->next_batch(&fill_size, 
col));
-    }
-
-    if (_row_lineage_columns != nullptr && 
_row_lineage_columns->need_row_ids() &&
-        _row_lineage_columns->first_row_id >= 0) {
-        auto col = 
block->get_by_position(_row_lineage_columns->row_id_column_idx)
-                           .column->assume_mutable();
-        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
-        auto& null_map = nullable_column->get_null_map_data();
-        auto& data =
-                
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
-        for (size_t i = 0; i < fill_size; ++i) {
-            if (null_map[i] != 0) {
-                null_map[i] = 0;
-                data[i] = _row_lineage_columns->first_row_id + start_row + 
static_cast<int64_t>(i);
-            }
-        }
-    }
-
-    if (_row_lineage_columns != nullptr &&
-        _row_lineage_columns->has_last_updated_sequence_number_column() &&
-        _row_lineage_columns->last_updated_sequence_number >= 0) {
-        auto col = block->get_by_position(
-                                
_row_lineage_columns->last_updated_sequence_number_column_idx)
-                           .column->assume_mutable();
-        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
-        auto& null_map = nullable_column->get_null_map_data();
-        auto& data =
-                
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
-        for (size_t i = 0; i < fill_size; ++i) {
-            if (null_map[i] != 0) {
-                null_map[i] = 0;
-                data[i] = _row_lineage_columns->last_updated_sequence_number;
-            }
-        }
-    }
-
-    return Status::OK();
-}
-
 void OrcReader::_init_system_properties() {
     if (_scan_range.__isset.file_type) {
         // for compatibility
@@ -2455,9 +2398,8 @@ Status OrcReader::_get_next_block_impl(Block* block, 
size_t* read_rows, bool* eo
             _current_batch_row_positions[i] =
                     static_cast<rowid_t>(start_row + static_cast<int64_t>(i));
         }
-        if (has_synthesized_column_handlers()) {
-            RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
-        }
+        RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
+        RETURN_IF_ERROR(fill_generated_columns(block, block->rows()));
 
         if (block->rows() == 0) {
             RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2606,9 +2548,8 @@ Status OrcReader::_get_next_block_impl(Block* block, 
size_t* read_rows, bool* eo
             _current_batch_row_positions[i] =
                     static_cast<rowid_t>(start_row + static_cast<int64_t>(i));
         }
-        if (has_synthesized_column_handlers()) {
-            RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
-        }
+        RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
+        RETURN_IF_ERROR(fill_generated_columns(block, block->rows()));
 
         if (block->rows() == 0) {
             RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2932,7 +2873,8 @@ Status OrcReader::fill_dict_filter_column_names(
     int i = 0;
     for (const auto& predicate_col_name : predicate_col_names) {
         int slot_id = predicate_col_slot_ids[i];
-        if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) {
+        if (!_disable_dict_filter && !disable_column_opt(predicate_col_name) &&
+            _can_filter_by_dict(slot_id)) {
             _dict_filter_cols.emplace_back(predicate_col_name, slot_id);
             column_names.emplace_back(
                     
_table_info_node_ptr->children_file_column_name(predicate_col_name));
diff --git a/be/src/format/orc/vorc_reader.h b/be/src/format/orc/vorc_reader.h
index d4b38e15d17..44fd0e8df52 100644
--- a/be/src/format/orc/vorc_reader.h
+++ b/be/src/format/orc/vorc_reader.h
@@ -212,13 +212,25 @@ public:
 
     static std::string get_field_name_lower_case(const orc::Type* orc_type, 
int pos);
 
-    void set_row_id_column_iterator(
-            const 
std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>&
-                    iterator_pair) {
-        _row_id_column_iterator_pair = iterator_pair;
+    void set_create_row_id_column_iterator_func(
+            
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> 
create_func) {
+        _create_topn_row_id_column_iterator = create_func;
     }
-    void set_row_lineage_columns(std::shared_ptr<RowLineageColumns> 
row_lineage_columns) {
-        _row_lineage_columns = std::move(row_lineage_columns);
+
+    Status fill_topn_row_id(
+            std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
_row_id_column_iterator,
+            std::string col_name, Block* block, size_t rows) {
+        int col_pos = block->get_position_by_name(col_name);
+        DCHECK(col_pos >= 0);
+        if (col_pos < 0) {
+            return Status::InternalError("Column {} not found in block", 
col_name);
+        }
+        auto col = block->get_by_position(col_pos).column->assume_mutable();
+        const auto& row_ids = this->current_batch_row_positions();
+        RETURN_IF_ERROR(
+                _row_id_column_iterator->read_by_rowids(row_ids.data(), 
row_ids.size(), col));
+
+        return Status::OK();
     }
 
     static bool inline is_hive1_col_name(const orc::Type* orc_type_ptr) {
@@ -687,8 +699,6 @@ private:
         return true;
     }
 
-    Status _fill_row_id_columns(Block* block, int64_t start_row);
-
     bool _seek_to_read_one_line() {
         if (_read_by_rows) {
             if (_row_ids.empty()) {
@@ -795,6 +805,9 @@ protected:
     // in on_before_init_reader.
     LazyReadContext _lazy_read_ctx;
 
+    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
+            _create_topn_row_id_column_iterator;
+
 private:
     std::unique_ptr<IColumn::Filter> _filter;
     const AcidRowIDSet* _delete_rows = nullptr;
@@ -826,10 +839,6 @@ private:
     int64_t _orc_once_max_read_bytes = 8L * 1024L * 1024L;
     int64_t _orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
 
-    std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>
-            _row_id_column_iterator_pair = {nullptr, -1};
-    std::shared_ptr<RowLineageColumns> _row_lineage_columns;
-
     std::vector<rowid_t> _current_batch_row_positions;
 
     // Through this node, you can find the file column based on the table 
column.
diff --git a/be/src/format/parquet/vparquet_group_reader.cpp 
b/be/src/format/parquet/vparquet_group_reader.cpp
index 0efeea70c10..a5b89207a24 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -157,8 +157,8 @@ Status RowGroupReader::init(
         for (size_t i = 0; i < predicate_col_names.size(); ++i) {
             const std::string& predicate_col_name = predicate_col_names[i];
             int slot_id = predicate_col_slot_ids[i];
-            if (predicate_col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID ||
-                predicate_col_name == 
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
+
+            if (_table_format_reader->disable_column_opt(predicate_col_name)) {
                 // row lineage column can not dict filter.
                 if (_slot_id_to_filter_conjuncts->find(slot_id) !=
                     _slot_id_to_filter_conjuncts->end()) {
@@ -334,7 +334,7 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
             RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
         }
         RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, 
*read_rows));
-
+        RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, 
*read_rows));
         Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, 
block, block->columns());
         *read_rows = block->rows();
         return st;
@@ -353,10 +353,12 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
         RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
                 block, *read_rows, _lazy_read_ctx.missing_col_names));
 
-        if (_table_format_reader->has_synthesized_column_handlers()) {
+        if (_table_format_reader->has_synthesized_column_handlers() ||
+            _table_format_reader->has_generated_column_handlers()) {
             RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
         }
         RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, 
*read_rows));
+        RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, 
*read_rows));
 
 #ifndef NDEBUG
         for (auto col : *block) {
@@ -635,11 +637,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
                 block, pre_read_rows, 
_lazy_read_ctx.predicate_partition_col_names));
         RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
                 block, pre_read_rows, 
_lazy_read_ctx.predicate_missing_col_names));
-        if (_table_format_reader->has_synthesized_column_handlers()) {
+        if (_table_format_reader->has_synthesized_column_handlers() ||
+            _table_format_reader->has_generated_column_handlers()) {
             RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows));
         }
         RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, 
pre_read_rows));
-
+        RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, 
pre_read_rows));
         RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
 
 #ifndef NDEBUG
@@ -655,13 +658,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
 #endif
 
         bool can_filter_all = false;
-        bool resize_first_column = _lazy_read_ctx.resize_first_column;
-        if (resize_first_column && 
_table_format_reader->has_synthesized_column_handlers()) {
-            int row_id_idx = 
block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
-            if (row_id_idx == 0) {
-                resize_first_column = false;
-            }
-        }
         {
             SCOPED_RAW_TIMER(&_predicate_filter_time);
 
@@ -720,20 +716,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
                             .column->assume_mutable()
                             ->clear();
                 }
-                if (_row_id_column_iterator_pair.first != nullptr) {
-                    block->get_by_position(_row_id_column_iterator_pair.second)
-                            .column->assume_mutable()
-                            ->clear();
-                }
-                if (_table_format_reader->has_synthesized_column_handlers()) {
-                    int row_id_idx =
-                            
block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
-                    if (row_id_idx >= 0) {
-                        block->get_by_position(static_cast<size_t>(row_id_idx))
-                                .column->assume_mutable()
-                                ->clear();
-                    }
-                }
+                
RETURN_IF_ERROR(_table_format_reader->clear_synthesized_columns(block));
+                
RETURN_IF_ERROR(_table_format_reader->clear_generated_columns(block));
                 Block::erase_useless_column(block, origin_column_num);
             }
 
@@ -792,17 +776,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     {
         SCOPED_RAW_TIMER(&_predicate_filter_time);
         if (filter_map.has_filter()) {
-            std::vector<uint32_t> predicate_columns = 
_lazy_read_ctx.all_predicate_col_ids;
-            if (_table_format_reader->has_synthesized_column_handlers()) {
-                int row_id_idx = 
block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
-                if (row_id_idx >= 0 &&
-                    std::find(predicate_columns.begin(), 
predicate_columns.end(),
-                              static_cast<uint32_t>(row_id_idx)) == 
predicate_columns.end()) {
-                    
predicate_columns.push_back(static_cast<uint32_t>(row_id_idx));
-                }
-            }
-            RETURN_IF_CATCH_EXCEPTION(
-                    Block::filter_block_internal(block, predicate_columns, 
result_filter));
+            RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
+                    block, _lazy_read_ctx.all_predicate_col_ids, 
result_filter));
             Block::erase_useless_column(block, origin_column_num);
 
         } else {
@@ -899,8 +874,8 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, 
size_t* read_rows, b
         _position_delete_ctx.current_row_id = end_row_id;
         *batch_eof = _position_delete_ctx.current_row_id == 
_position_delete_ctx.last_row_id;
 
-        if (_row_id_column_iterator_pair.first != nullptr ||
-            _table_format_reader->has_synthesized_column_handlers()) {
+        if (_table_format_reader->has_synthesized_column_handlers() ||
+            _table_format_reader->has_generated_column_handlers()) {
             *modify_row_ids = true;
             _current_batch_row_ids.clear();
             _current_batch_row_ids.resize(*read_rows);
@@ -924,7 +899,8 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, 
size_t* read_rows, b
             _remaining_rows = 0;
             *batch_eof = true;
         }
-        if (_table_format_reader->has_synthesized_column_handlers()) {
+        if (_table_format_reader->has_synthesized_column_handlers() ||
+            _table_format_reader->has_generated_column_handlers()) {
             *modify_row_ids = true;
             RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
         }
@@ -961,53 +937,6 @@ Status RowGroupReader::_get_current_batch_row_id(size_t 
read_rows) {
     return Status::OK();
 }
 
-Status RowGroupReader::fill_topn_row_id(Block* block, size_t read_rows) {
-    if (_row_id_column_iterator_pair.first != nullptr) {
-        // _get_current_batch_row_id must be called before 
fill_synthesized_columns
-        auto col = block->get_by_position(_row_id_column_iterator_pair.second)
-                           .column->assume_mutable();
-        RETURN_IF_ERROR(_row_id_column_iterator_pair.first->read_by_rowids(
-                _current_batch_row_ids.data(), _current_batch_row_ids.size(), 
col));
-    }
-
-    if (_row_lineage_columns != nullptr && 
_row_lineage_columns->need_row_ids() &&
-        _row_lineage_columns->first_row_id >= 0) {
-        auto col = 
block->get_by_position(_row_lineage_columns->row_id_column_idx)
-                           .column->assume_mutable();
-        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
-        auto& null_map = nullable_column->get_null_map_data();
-        auto& data =
-                
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
-        for (size_t i = 0; i < read_rows; ++i) {
-            if (null_map[i] != 0) {
-                null_map[i] = 0;
-                data[i] = _row_lineage_columns->first_row_id +
-                          static_cast<int64_t>(_current_batch_row_ids[i]);
-            }
-        }
-    }
-
-    if (_row_lineage_columns != nullptr &&
-        _row_lineage_columns->has_last_updated_sequence_number_column() &&
-        _row_lineage_columns->last_updated_sequence_number >= 0) {
-        auto col = block->get_by_position(
-                                
_row_lineage_columns->last_updated_sequence_number_column_idx)
-                           .column->assume_mutable();
-        auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
-        auto& null_map = nullable_column->get_null_map_data();
-        auto& data =
-                
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
-        for (size_t i = 0; i < read_rows; ++i) {
-            if (null_map[i] != 0) {
-                null_map[i] = 0;
-                data[i] = _row_lineage_columns->last_updated_sequence_number;
-            }
-        }
-    }
-
-    return Status::OK();
-}
-
 Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
     if (!_position_delete_ctx.has_filter) {
         _pos_delete_filter_ptr.reset(nullptr);
diff --git a/be/src/format/parquet/vparquet_group_reader.h 
b/be/src/format/parquet/vparquet_group_reader.h
index dfb593f1ae7..edc6cb39117 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -187,7 +187,6 @@ public:
 
     ParquetColumnReader::ColumnStatistics merged_column_statistics();
     void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
-    Status fill_topn_row_id(Block* block, size_t read_rows);
 
     int64_t get_remaining_rows() { return _remaining_rows; }
 
@@ -197,12 +196,6 @@ public:
                                             const std::vector<bool>& cache, 
int64_t first_row,
                                             int64_t base_granule = 0);
 
-    void set_row_id_column_iterator(
-            const 
std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>&
-                    iterator_pair) {
-        _row_id_column_iterator_pair = iterator_pair;
-    }
-
     void set_table_format_reader(TableFormatReader* reader) { 
_table_format_reader = reader; }
 
     // RowPositionProvider interface
@@ -304,8 +297,6 @@ private:
     bool _is_row_group_filtered = false;
 
     RowGroupIndex _current_row_group_idx {0, 0, 0};
-    std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>
-            _row_id_column_iterator_pair = {nullptr, -1};
     std::vector<rowid_t> _current_batch_row_ids;
 
     std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = 
nullptr;
diff --git a/be/src/format/parquet/vparquet_reader.cpp 
b/be/src/format/parquet/vparquet_reader.cpp
index 12832d06388..39b6c4ec5e0 100644
--- a/be/src/format/parquet/vparquet_reader.cpp
+++ b/be/src/format/parquet/vparquet_reader.cpp
@@ -403,6 +403,16 @@ Status 
ParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
         if (desc.category == ColumnCategory::REGULAR ||
             desc.category == ColumnCategory::GENERATED) {
             ctx->column_names.push_back(desc.name);
+        } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+                   desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+            this->register_synthesized_column_handler(
+                    desc.name,
+                    [iter = std::move(topn_row_id_column_iter), this, &desc](
+                            Block* block, size_t rows) -> Status {
+                        return fill_topn_row_id(iter, desc.name, block, rows);
+                    });
+            continue;
         }
     }
 
@@ -473,23 +483,6 @@ Status ParquetReader::_do_init_reader(ReaderInitContext* 
base_ctx) {
     // and standalone path (_fill_missing_cols empty, _table_info_node_ptr may 
be null).
     _init_read_columns(base_ctx->column_names);
 
-    // Register row-position-based synthesized column handler.
-    // _row_id_column_iterator_pair and _row_lineage_columns are set before 
init_reader
-    // by FileScanner. This must be outside has_column_descs() guard because 
standalone
-    // readers also need synthesized column handlers.
-    if (_row_id_column_iterator_pair.first != nullptr ||
-        (_row_lineage_columns != nullptr &&
-         (_row_lineage_columns->need_row_ids() ||
-          _row_lineage_columns->has_last_updated_sequence_number_column()))) {
-        register_synthesized_column_handler(
-                BeConsts::ROWID_COL, [this](Block* block, size_t rows) -> 
Status {
-                    if (_current_group_reader) {
-                        return _current_group_reader->fill_topn_row_id(block, 
rows);
-                    }
-                    return Status::OK();
-                });
-    }
-
     // build column predicates for column lazy read
     if (ctx->conjuncts != nullptr) {
         _lazy_read_ctx.conjuncts = *ctx->conjuncts;
@@ -610,6 +603,9 @@ void 
ParquetReader::_collect_predicate_columns_from_conjuncts(
         auto and_pred = AndBlockColumnPredicate::create_unique();
         for (const auto& entry : _lazy_read_ctx.slot_id_to_predicates) {
             for (const auto& pred : entry.second) {
+                if (disable_column_opt(pred->col_name())) {
+                    continue;
+                }
                 if (!_exists_in_file(pred->col_name()) || 
!_type_matches(pred->column_id())) {
                     continue;
                 }
@@ -624,23 +620,47 @@ void 
ParquetReader::_collect_predicate_columns_from_conjuncts(
 }
 
 void ParquetReader::_classify_columns_for_lazy_read(
-        const std::unordered_map<std::string, std::pair<uint32_t, int>>& 
predicate_columns,
+        const std::unordered_map<std::string, std::pair<uint32_t, int>>&
+                predicate_conjuncts_columns,
         const std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>&
                 partition_columns,
         const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
     const FieldDescriptor& schema = _file_metadata->schema();
+    auto predicate_columns = predicate_conjuncts_columns;
 
-    auto check_iceberg_row_lineage_column_idx = [&](const auto& col_name) -> 
int {
-        if (_row_lineage_columns != nullptr) {
-            if (col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
-                return _row_lineage_columns->row_id_column_idx;
-            } else if (col_name == 
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
-                return 
_row_lineage_columns->last_updated_sequence_number_column_idx;
+    for (const auto& [col_name, _] : _generated_col_handlers) {
+        int slot_id = -1;
+        for (auto slot : _tuple_descriptor->slots()) {
+            if (slot->col_name() == col_name) {
+                slot_id = slot->id();
+                break;
             }
         }
-        return -1;
-    };
+        DCHECK(slot_id != -1) << "slot id should not be -1 for generated 
column: " << col_name;
+        auto column_index = _row_descriptor->get_column_id(slot_id);
+        if (column_index == 0) {
+            _lazy_read_ctx.resize_first_column = false;
+        }
+        // assume generated columns are only used for predicate push down.
+        predicate_columns.emplace(col_name, std::make_pair(column_index, 
slot_id));
+    }
 
+    for (const auto& [col_name, _] : _synthesized_col_handlers) {
+        int slot_id = -1;
+        for (auto slot : _tuple_descriptor->slots()) {
+            if (slot->col_name() == col_name) {
+                slot_id = slot->id();
+                break;
+            }
+        }
+        DCHECK(slot_id != -1) << "slot id should not be -1 for synthesized 
column: " << col_name;
+        auto column_index = _row_descriptor->get_column_id(slot_id);
+        if (column_index == 0) {
+            _lazy_read_ctx.resize_first_column = false;
+        }
+        // synthesized columns always fill data on first phase.
+        _lazy_read_ctx.all_predicate_col_ids.emplace_back(column_index);
+    }
     for (auto& read_table_col : _read_table_columns) {
         _lazy_read_ctx.all_read_columns.emplace_back(read_table_col);
 
@@ -653,21 +673,7 @@ void ParquetReader::_classify_columns_for_lazy_read(
         if (predicate_columns.size() > 0) {
             auto iter = predicate_columns.find(read_table_col);
             if (iter == predicate_columns.end()) {
-                if (auto row_lineage_idx = 
check_iceberg_row_lineage_column_idx(read_table_col);
-                    row_lineage_idx != -1) {
-                    
_lazy_read_ctx.predicate_columns.first.emplace_back(read_table_col);
-                    // row lineage column can not dict filter.
-                    int slot_id = 0;
-                    for (auto slot : _tuple_descriptor->slots()) {
-                        if (slot->col_name_lower_case() == read_table_col) {
-                            slot_id = slot->id();
-                        }
-                    }
-                    
_lazy_read_ctx.predicate_columns.second.emplace_back(slot_id);
-                    
_lazy_read_ctx.all_predicate_col_ids.emplace_back(row_lineage_idx);
-                } else {
-                    
_lazy_read_ctx.lazy_read_columns.emplace_back(read_table_col);
-                }
+                _lazy_read_ctx.lazy_read_columns.emplace_back(read_table_col);
             } else {
                 
_lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
                 
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
@@ -675,9 +681,6 @@ void ParquetReader::_classify_columns_for_lazy_read(
             }
         }
     }
-    if (_row_id_column_iterator_pair.first != nullptr) {
-        
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
-    }
 
     for (auto& kv : partition_columns) {
         auto iter = predicate_columns.find(kv.first);
@@ -701,10 +704,6 @@ void ParquetReader::_classify_columns_for_lazy_read(
             }
             _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, 
kv.second);
             
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first);
-        } else if (auto row_lineage_idx = 
check_iceberg_row_lineage_column_idx(kv.first);
-                   row_lineage_idx != -1) {
-            _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, 
kv.second);
-            _lazy_read_ctx.all_predicate_col_ids.emplace_back(row_lineage_idx);
         } else {
             _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
         }
@@ -949,8 +948,6 @@ Status ParquetReader::_next_row_group_reader() {
     _row_group_eof = false;
 
     _current_group_reader->set_current_row_group_idx(_current_row_group_index);
-    
_current_group_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
-    _current_group_reader->set_row_lineage_columns(_row_lineage_columns);
     _current_group_reader->set_col_name_to_block_idx(_col_name_to_block_idx);
     if (_condition_cache_ctx) {
         
_current_group_reader->set_condition_cache_context(_condition_cache_ctx);
diff --git a/be/src/format/parquet/vparquet_reader.h 
b/be/src/format/parquet/vparquet_reader.h
index 676e76b34a1..f34c846cf00 100644
--- a/be/src/format/parquet/vparquet_reader.h
+++ b/be/src/format/parquet/vparquet_reader.h
@@ -173,9 +173,9 @@ public:
 
     Status get_file_metadata_schema(const FieldDescriptor** ptr);
 
-    void set_row_id_column_iterator(
-            std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> 
iterator_pair) {
-        _row_id_column_iterator_pair = iterator_pair;
+    void set_create_row_id_column_iterator_func(
+            
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> 
create_func) {
+        _create_topn_row_id_column_iterator = create_func;
     }
 
     /// Access current batch row positions (delegates to RowGroupReader).
@@ -184,8 +184,20 @@ public:
         return _current_group_reader->current_batch_row_positions();
     }
 
-    void set_row_lineage_columns(std::shared_ptr<RowLineageColumns> 
row_lineage_columns) {
-        _row_lineage_columns = std::move(row_lineage_columns);
+    Status fill_topn_row_id(
+            std::shared_ptr<segment_v2::RowIdColumnIteratorV2> 
_row_id_column_iterator,
+            std::string col_name, Block* block, size_t rows) {
+        int col_pos = block->get_position_by_name(col_name);
+        DCHECK(col_pos >= 0);
+        if (col_pos < 0) {
+            return Status::InternalError("Column {} not found in block", 
col_name);
+        }
+        auto col = block->get_by_position(col_pos).column->assume_mutable();
+        const auto& row_ids = this->current_batch_row_positions();
+        RETURN_IF_ERROR(
+                _row_id_column_iterator->read_by_rowids(row_ids.data(), 
row_ids.size(), col));
+
+        return Status::OK();
     }
 
     bool count_read_rows() override { return true; }
@@ -414,10 +426,6 @@ private:
     const std::unordered_map<int, VExprContextSPtrs>* 
_slot_id_to_filter_conjuncts = nullptr;
     std::unordered_map<tparquet::Type::type, bool> _ignored_stats;
 
-    std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> 
_row_id_column_iterator_pair = {nullptr,
-                                                                               
            -1};
-    std::shared_ptr<RowLineageColumns> _row_lineage_columns;
-
 protected:
     // Used for column lazy read. Protected so Iceberg/Paimon subclasses can
     // register synthesized columns in on_before_init_reader.
@@ -425,6 +433,9 @@ protected:
     bool _filter_groups = true;
     size_t get_batch_size() const { return _batch_size; }
 
+    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
+            _create_topn_row_id_column_iterator;
+
 private:
     std::set<uint64_t> _column_ids;
     std::set<uint64_t> _filter_column_ids;
diff --git a/be/src/format/table/hive_reader.cpp 
b/be/src/format/table/hive_reader.cpp
index fc6dfbc025c..c6ab0126e9d 100644
--- a/be/src/format/table/hive_reader.cpp
+++ b/be/src/format/table/hive_reader.cpp
@@ -37,6 +37,16 @@ Status 
HiveOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
         if (desc.category == ColumnCategory::REGULAR ||
             desc.category == ColumnCategory::GENERATED) {
             ctx->column_names.push_back(desc.name);
+        } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+                   desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+            this->register_synthesized_column_handler(
+                    desc.name,
+                    [iter = std::move(topn_row_id_column_iter), this, &desc](
+                            Block* block, size_t rows) -> Status {
+                        return fill_topn_row_id(iter, desc.name, block, rows);
+                    });
+            continue;
         }
     }
 
@@ -214,6 +224,16 @@ Status 
HiveParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
         if (desc.category == ColumnCategory::REGULAR ||
             desc.category == ColumnCategory::GENERATED) {
             ctx->column_names.push_back(desc.name);
+        } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+                   desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+            this->register_synthesized_column_handler(
+                    desc.name,
+                    [iter = std::move(topn_row_id_column_iter), this, &desc](
+                            Block* block, size_t rows) -> Status {
+                        return fill_topn_row_id(iter, desc.name, block, rows);
+                    });
+            continue;
         }
     }
 
diff --git a/be/src/format/table/iceberg_reader.cpp 
b/be/src/format/table/iceberg_reader.cpp
index b9afea2fb2a..680ec463103 100644
--- a/be/src/format/table/iceberg_reader.cpp
+++ b/be/src/format/table/iceberg_reader.cpp
@@ -165,16 +165,24 @@ Status 
IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
     // Single pass: classify columns, detect $row_id, handle partition 
fallback.
     bool has_partition_from_path = false;
     for (auto& desc : *ctx->column_descs) {
-        if (desc.category == ColumnCategory::SYNTHESIZED &&
-            desc.name == BeConsts::ICEBERG_ROWID_COL) {
-            _need_row_id_column = true;
-            
this->register_synthesized_column_handler(BeConsts::ICEBERG_ROWID_COL,
-                                                      [this](Block* block, 
size_t rows) -> Status {
-                                                          return 
_fill_iceberg_row_id(block, rows);
-                                                      });
-            continue;
-        }
-        if (desc.category == ColumnCategory::REGULAR) {
+        if (desc.category == ColumnCategory::SYNTHESIZED) {
+            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
+                this->register_synthesized_column_handler(
+                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, 
size_t rows) -> Status {
+                            return _fill_iceberg_row_id(block, rows);
+                        });
+                continue;
+            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+                auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+                this->register_synthesized_column_handler(
+                        desc.name,
+                        [iter = std::move(topn_row_id_column_iter), this, 
&desc](
+                                Block* block, size_t rows) -> Status {
+                            return fill_topn_row_id(iter, desc.name, block, 
rows);
+                        });
+                continue;
+            }
+        } else if (desc.category == ColumnCategory::REGULAR) {
             // Partition fallback: if column is a partition key and NOT in the 
file
             // (checked via field ID matching in table_info_node), read from 
path instead.
             if (partition_col_names.contains(desc.name) &&
@@ -187,7 +195,23 @@ Status 
IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
             }
             ctx->column_names.push_back(desc.name);
         } else if (desc.category == ColumnCategory::GENERATED) {
-            ctx->column_names.push_back(desc.name);
+            _init_row_lineage_columns();
+            if (desc.name == ROW_LINEAGE_ROW_ID) {
+                ctx->column_names.push_back(desc.name);
+                this->register_generated_column_handler(
+                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) 
-> Status {
+                            return _fill_row_lineage_row_id(block, rows);
+                        });
+                continue;
+            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
+                ctx->column_names.push_back(desc.name);
+                this->register_generated_column_handler(
+                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
+                        [this](Block* block, size_t rows) -> Status {
+                            return 
_fill_row_lineage_last_updated_sequence_number(block, rows);
+                        });
+                continue;
+            }
         }
     }
 
@@ -436,16 +460,24 @@ Status 
IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
     // Single pass: classify columns, detect $row_id, handle partition 
fallback.
     bool has_partition_from_path = false;
     for (auto& desc : *ctx->column_descs) {
-        if (desc.category == ColumnCategory::SYNTHESIZED &&
-            desc.name == BeConsts::ICEBERG_ROWID_COL) {
-            _need_row_id_column = true;
-            
this->register_synthesized_column_handler(BeConsts::ICEBERG_ROWID_COL,
-                                                      [this](Block* block, 
size_t rows) -> Status {
-                                                          return 
_fill_iceberg_row_id(block, rows);
-                                                      });
-            continue;
-        }
-        if (desc.category == ColumnCategory::REGULAR) {
+        if (desc.category == ColumnCategory::SYNTHESIZED) {
+            if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
+                this->register_synthesized_column_handler(
+                        BeConsts::ICEBERG_ROWID_COL, [this](Block* block, 
size_t rows) -> Status {
+                            return _fill_iceberg_row_id(block, rows);
+                        });
+                continue;
+            } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+                auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+                this->register_synthesized_column_handler(
+                        desc.name,
+                        [iter = std::move(topn_row_id_column_iter), this, 
&desc](
+                                Block* block, size_t rows) -> Status {
+                            return fill_topn_row_id(iter, desc.name, block, 
rows);
+                        });
+                continue;
+            }
+        } else if (desc.category == ColumnCategory::REGULAR) {
             // Partition fallback: if column is a partition key and NOT in the 
file
             // (checked via field ID matching in table_info_node), read from 
path instead.
             if (partition_col_names.contains(desc.name) &&
@@ -458,7 +490,23 @@ Status 
IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
             }
             ctx->column_names.push_back(desc.name);
         } else if (desc.category == ColumnCategory::GENERATED) {
-            ctx->column_names.push_back(desc.name);
+            _init_row_lineage_columns();
+            if (desc.name == ROW_LINEAGE_ROW_ID) {
+                ctx->column_names.push_back(desc.name);
+                this->register_generated_column_handler(
+                        ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows) 
-> Status {
+                            return _fill_row_lineage_row_id(block, rows);
+                        });
+                continue;
+            } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
+                ctx->column_names.push_back(desc.name);
+                this->register_generated_column_handler(
+                        ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
+                        [this](Block* block, size_t rows) -> Status {
+                            return 
_fill_row_lineage_last_updated_sequence_number(block, rows);
+                        });
+                continue;
+            }
         }
     }
 
diff --git a/be/src/format/table/iceberg_reader.h 
b/be/src/format/table/iceberg_reader.h
index d21c661f207..39d38097657 100644
--- a/be/src/format/table/iceberg_reader.h
+++ b/be/src/format/table/iceberg_reader.h
@@ -61,23 +61,7 @@ class GenericReader;
 class ShardedKVCache;
 class VExprContext;
 
-struct RowLineageColumns {
-    int row_id_column_idx = -1;
-    int last_updated_sequence_number_column_idx = -1;
-    int64_t first_row_id = -1;
-    int64_t last_updated_sequence_number = -1;
-
-    bool need_row_ids() const { return row_id_column_idx >= 0; }
-    bool has_last_updated_sequence_number_column() const {
-        return last_updated_sequence_number_column_idx >= 0;
-    }
-};
-
 struct IcebergTableReader {
-    static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
-    static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
-            "_last_updated_sequence_number";
-
     static bool _is_fully_dictionary_encoded(const tparquet::ColumnMetaData& 
column_metadata);
 };
 
diff --git a/be/src/format/table/iceberg_reader_mixin.h 
b/be/src/format/table/iceberg_reader_mixin.h
index 57d3ec36ebf..a4855501612 100644
--- a/be/src/format/table/iceberg_reader_mixin.h
+++ b/be/src/format/table/iceberg_reader_mixin.h
@@ -104,6 +104,11 @@ public:
         return BaseReader::_do_get_next_block(block, read_rows, eof);
     }
 
+    void set_create_row_id_column_iterator_func(
+            
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> 
create_func) {
+        _create_topn_row_id_column_iterator = create_func;
+    }
+
 protected:
     // ---- Hook implementations ----
 
@@ -141,6 +146,66 @@ protected:
         return Status::OK();
     }
 
+    void _init_row_lineage_columns() {
+        const auto& table_desc = 
this->get_scan_range().table_format_params.iceberg_params;
+        if (table_desc.__isset.first_row_id) {
+            _row_lineage_columns.first_row_id = table_desc.first_row_id;
+        }
+        if (table_desc.__isset.last_updated_sequence_number) {
+            _row_lineage_columns.last_updated_sequence_number =
+                    table_desc.last_updated_sequence_number;
+        }
+    }
+
+    Status _fill_row_lineage_row_id(Block* block, size_t rows) {
+        int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID);
+        DCHECK(col_pos >= 0);
+        if (col_pos < 0) {
+            return Status::InternalError("Row lineage column {} not found in 
block",
+                                         ROW_LINEAGE_ROW_ID);
+        }
+
+        if (_row_lineage_columns.first_row_id >= 0) {
+            auto col = 
block->get_by_position(col_pos).column->assume_mutable();
+            auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
+            auto& null_map = nullable_column->get_null_map_data();
+            auto& data =
+                    
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
+            const auto& row_ids = this->current_batch_row_positions();
+            for (size_t i = 0; i < rows; ++i) {
+                if (null_map[i] != 0) {
+                    null_map[i] = 0;
+                    data[i] = _row_lineage_columns.first_row_id + 
static_cast<int64_t>(row_ids[i]);
+                }
+            }
+        }
+        return Status::OK();
+    }
+
+    Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t 
rows) {
+        int col_pos = 
block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
+        DCHECK(col_pos >= 0);
+        if (col_pos < 0) {
+            return Status::InternalError("Row lineage column {} not found in 
block",
+                                         ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
+        }
+
+        if (_row_lineage_columns.last_updated_sequence_number >= 0) {
+            auto col = 
block->get_by_position(col_pos).column->assume_mutable();
+            auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
+            auto& null_map = nullable_column->get_null_map_data();
+            auto& data =
+                    
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
+            for (size_t i = 0; i < rows; ++i) {
+                if (null_map[i] != 0) {
+                    null_map[i] = 0;
+                    data[i] = 
_row_lineage_columns.last_updated_sequence_number;
+                }
+            }
+        }
+        return Status::OK();
+    }
+
     // Called after reading a block: apply equality delete filter + shrink 
block
     Status on_after_read_block(Block* block, size_t* read_rows) override {
         if (!_equality_delete_impls.empty()) {
@@ -304,6 +369,18 @@ protected:
 
     // File column names used during init
     std::vector<std::string> _file_col_names;
+
+    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
+            _create_topn_row_id_column_iterator;
+
+    static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
+    static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
+            "_last_updated_sequence_number";
+    struct RowLineageColumns {
+        int64_t first_row_id = -1;
+        int64_t last_updated_sequence_number = -1;
+    };
+    RowLineageColumns _row_lineage_columns;
 };
 
 // ============================================================================
diff --git a/be/src/format/table/table_format_reader.h 
b/be/src/format/table/table_format_reader.h
index d8894209442..fe55f53ad5c 100644
--- a/be/src/format/table/table_format_reader.h
+++ b/be/src/format/table/table_format_reader.h
@@ -142,6 +142,53 @@ public:
         return Status::OK();
     }
 
+    Status clear_synthesized_columns(Block* block) {
+        for (auto& [name, handler] : _synthesized_col_handlers) {
+            int col_pos = block->get_position_by_name(name);
+            if (col_pos < 0) {
+                continue;
+            }
+            
block->get_by_position(static_cast<size_t>(col_pos)).column->assume_mutable()->clear();
+        }
+        return Status::OK();
+    }
+
+    using GeneratedColumnHandler = std::function<Status(Block* block, size_t 
rows)>;
+
+    // disable column lazy read, dict filter and min-max filter.
+    virtual bool disable_column_opt(std::string col_name) {
+        // generated columns may have complex expressions that are not 
compatible with lazy read, dict filter, or min-max filter.
+        for (auto& [name, handler] : _generated_col_handlers) {
+            if (name == col_name) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    void register_generated_column_handler(const std::string& col_name,
+                                           GeneratedColumnHandler handler) {
+        _generated_col_handlers.emplace_back(col_name, std::move(handler));
+    }
+
+    Status fill_generated_columns(Block* block, size_t rows) {
+        for (auto& [name, handler] : _generated_col_handlers) {
+            RETURN_IF_ERROR(handler(block, rows));
+        }
+        return Status::OK();
+    }
+
+    Status clear_generated_columns(Block* block) {
+        for (auto& [name, handler] : _generated_col_handlers) {
+            int col_pos = block->get_position_by_name(name);
+            if (col_pos < 0) {
+                continue;
+            }
+            
block->get_by_position(static_cast<size_t>(col_pos)).column->assume_mutable()->clear();
+        }
+        return Status::OK();
+    }
+
     /// Unified fill for partition + missing + synthesized columns.
     /// Called automatically by on_after_read_block for simple readers.
     /// Parquet/ORC call individual on_fill_* methods per-batch internally.
@@ -157,11 +204,14 @@ public:
         }
         RETURN_IF_ERROR(on_fill_missing_columns(block, rows, miss_col_names));
         RETURN_IF_ERROR(fill_synthesized_columns(block, rows));
+        RETURN_IF_ERROR(fill_generated_columns(block, rows));
         return Status::OK();
     }
 
     bool has_synthesized_column_handlers() const { return 
!_synthesized_col_handlers.empty(); }
 
+    bool has_generated_column_handlers() const { return 
!_generated_col_handlers.empty(); }
+
     /// Fill generated columns. Default is no-op.
     virtual Status on_fill_generated_columns(Block* block, size_t rows,
                                              const std::vector<std::string>& 
cols) {
@@ -199,6 +249,9 @@ protected:
 
     // ---- Synthesized column handlers ----
     std::vector<std::pair<std::string, SynthesizedColumnHandler>> 
_synthesized_col_handlers;
+
+    // ---- Generated column handlers ----
+    std::vector<std::pair<std::string, GeneratedColumnHandler>> 
_generated_col_handlers;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/format/table/transactional_hive_reader.cpp 
b/be/src/format/table/transactional_hive_reader.cpp
index 83c543448dd..3deacb3257c 100644
--- a/be/src/format/table/transactional_hive_reader.cpp
+++ b/be/src/format/table/transactional_hive_reader.cpp
@@ -63,6 +63,16 @@ Status 
TransactionalHiveReader::on_before_init_reader(ReaderInitContext* ctx) {
         if (desc.category == ColumnCategory::REGULAR ||
             desc.category == ColumnCategory::GENERATED) {
             _col_names.push_back(desc.name);
+        } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+                   desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            auto topn_row_id_column_iter = 
_create_topn_row_id_column_iterator();
+            this->register_synthesized_column_handler(
+                    desc.name,
+                    [iter = std::move(topn_row_id_column_iter), this, &desc](
+                            Block* block, size_t rows) -> Status {
+                        return fill_topn_row_id(iter, desc.name, block, rows);
+                    });
+            continue;
         }
     }
 
@@ -100,7 +110,8 @@ Status 
TransactionalHiveReader::on_before_init_reader(ReaderInitContext* ctx) {
 
         if 
(std::count(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                        
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end(), slot_name) > 0) {
-            return Status::InternalError("xxxx");
+            return Status::InternalError("Column {} conflicts with ACID 
metadata column",
+                                         slot_name);
         }
 
         if (row_names_map.contains(slot_name)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index e1e437e78db..bdcd309677e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -202,9 +202,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
      * Subclasses override this for format-specific classification.
      */
     protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String> 
partitionKeys) {
-        if 
(Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slot.getColumn().getName())) {
-            return TColumnCategory.SYNTHESIZED;
-        }
         if (partitionKeys.contains(slot.getColumn().getName())) {
             return TColumnCategory.PARTITION_KEY;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 3707b4bb238..9a8aec052f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.hive.source;
 
+import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -52,6 +53,7 @@ import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TColumnCategory;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
@@ -345,6 +347,14 @@ public class HiveScanNode extends FileQueryScanNode {
         }
     }
 
+    @Override
+    protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String> 
partitionKeys) {
+        if (slot.getColumn().getName().startsWith(Column.GLOBAL_ROWID_COL)) {
+            return TColumnCategory.SYNTHESIZED;
+        }
+        return super.classifyColumn(slot, partitionKeys);
+    }
+
     private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches,
             boolean isBatchMode) {
         if (sessionVariable.getFileSplitSize() > 0) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 121dc0ad152..adc2507e249 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -54,6 +54,7 @@ import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TColumnCategory;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
@@ -877,6 +878,20 @@ public class IcebergScanNode extends FileQueryScanNode {
         return split;
     }
 
+    @Override
+    protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String> 
partitionKeys) {
+        if 
(Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slot.getColumn().getName())) {
+            return TColumnCategory.SYNTHESIZED;
+        }
+        if (slot.getColumn().getName().startsWith(Column.GLOBAL_ROWID_COL)) {
+            return TColumnCategory.SYNTHESIZED;
+        }
+        if (IcebergUtils.isIcebergRowLineageColumn(slot.getColumn())) {
+            return TColumnCategory.GENERATED;
+        }
+        return super.classifyColumn(slot, partitionKeys);
+    }
+
     private List<Split> doGetSplits(int numBackends) throws UserException {
         if (isSystemTable) {
             return doGetSystemTableSplits();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 16037395dc3..ce22e96c80b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.datasource.tvf.source;
 
+import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FunctionGenTable;
 import org.apache.doris.catalog.TableIf;
@@ -39,6 +41,7 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.tablefunction.LocalTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TColumnCategory;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
@@ -169,6 +172,14 @@ public class TVFScanNode extends FileQueryScanNode {
         return splits;
     }
 
+    @Override
+    protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String> 
partitionKeys) {
+        if (slot.getColumn().getName().startsWith(Column.GLOBAL_ROWID_COL)) {
+            return TColumnCategory.SYNTHESIZED;
+        }
+        return super.classifyColumn(slot, partitionKeys);
+    }
+
     private long determineTargetFileSplitSize(List<TBrokerFileStatus> 
fileStatuses) {
         if (sessionVariable.getFileSplitSize() > 0) {
             return sessionVariable.getFileSplitSize();
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_v3_row_lineage_rewrite_data_files.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_v3_row_lineage_rewrite_data_files.groovy
new file mode 100644
index 00000000000..438276c6946
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_v3_row_lineage_rewrite_data_files.groovy
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v3_row_lineage_rewrite_data_files", 
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Iceberg test is disabled")
+        return
+    }
+
+    String catalogName = "test_iceberg_v3_row_lineage_rewrite_data_files"
+    String dbName = "test_row_lineage_rewrite_db"
+    String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String endpoint = "http://${externalEnvIp}:${minioPort}";
+
+    def formats = ["parquet", "orc"]
+
+    def schemaContainsField = { schemaRows, fieldName ->
+        String target = fieldName.toLowerCase()
+        return schemaRows.any { row -> 
row.toString().toLowerCase().contains(target) }
+    }
+
+    def fileSchemaRows = { filePath, format ->
+        return sql("""
+            desc function s3(
+                "uri" = "${filePath}",
+                "format" = "${format}",
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "${endpoint}",
+                "s3.region" = "us-east-1"
+            )
+        """)
+    }
+
+    def assertCurrentFilesContainRowLineageColumns = { tableName, format ->
+        def files = sql("""select file_path, lower(file_format) from 
${tableName}\$files order by file_path""")
+        log.info("Checking rewritten files for physical row lineage columns in 
${tableName}: ${files}")
+        assertTrue(files.size() > 0, "Current files should exist for 
${tableName}")
+        files.each { row ->
+            assertEquals(format, row[1].toString())
+            assertTrue(row[0].toString().endsWith(format == "parquet" ? 
".parquet" : ".orc"),
+                    "Current data file should match ${format} for 
${tableName}, file=${row[0]}")
+            def schemaRows = fileSchemaRows(row[0].toString(), format)
+            log.info("Rewritten ${format} schema for ${tableName}, 
file=${row[0]} -> ${schemaRows}")
+            assertTrue(schemaContainsField(schemaRows, "_row_id"),
+                    "Rewritten file should physically contain _row_id for 
${tableName}, schema=${schemaRows}")
+            assertTrue(schemaContainsField(schemaRows, 
"_last_updated_sequence_number"),
+                    "Rewritten file should physically contain 
_last_updated_sequence_number for ${tableName}, schema=${schemaRows}")
+        }
+    }
+
+    def assertCurrentFilesDoNotContainRowLineageColumns = { tableName, format 
->
+        def files = sql("""select file_path, lower(file_format) from 
${tableName}\$files order by file_path""")
+        log.info("Checking regular INSERT files for absence of physical row 
lineage columns in ${tableName}: ${files}")
+        assertTrue(files.size() > 0, "Current files should exist for 
${tableName}")
+        files.each { row ->
+            assertEquals(format, row[1].toString())
+            assertTrue(row[0].toString().endsWith(format == "parquet" ? 
".parquet" : ".orc"),
+                    "Current data file should match ${format} for 
${tableName}, file=${row[0]}")
+            def schemaRows = fileSchemaRows(row[0].toString(), format)
+            log.info("Regular INSERT ${format} schema for ${tableName}, 
file=${row[0]} -> ${schemaRows}")
+            assertTrue(!schemaContainsField(schemaRows, "_row_id"),
+                    "Normal INSERT file should not contain _row_id for 
${tableName}, schema=${schemaRows}")
+            assertTrue(!schemaContainsField(schemaRows, 
"_last_updated_sequence_number"),
+                    "Normal INSERT file should not contain 
_last_updated_sequence_number for ${tableName}, schema=${schemaRows}")
+        }
+    }
+
+    def lineageMap = { tableName ->
+        def rows = sql("""
+            select id, _row_id, _last_updated_sequence_number
+            from ${tableName}
+            order by id
+        """)
+        Map<Integer, List<String>> result = [:]
+        rows.each { row ->
+            result[row[0].toString().toInteger()] = [row[1].toString(), 
row[2].toString()]
+        }
+        log.info("Built lineage map for ${tableName}: ${result}")
+        return result
+    }
+
+    def assertLineageMapEquals = { expected, actual, tableName ->
+        log.info("Comparing lineage maps for ${tableName}: 
expected=${expected}, actual=${actual}")
+        assertEquals(expected.size(), actual.size())
+        expected.each { key, value ->
+            assertTrue(actual.containsKey(key), "Missing id=${key} after 
rewrite for ${tableName}")
+            assertEquals(value[0], actual[key][0])
+            assertEquals(value[1], actual[key][1])
+        }
+    }
+
+    def runRewriteAndAssert = { tableName, format, expectedCount ->
+        def filesBefore = sql("""select file_path from ${tableName}\$files 
order by file_path""")
+        def snapshotsBefore = sql("""select snapshot_id from 
${tableName}\$snapshots order by committed_at""")
+        log.info("Checking rewrite preconditions for ${tableName}: 
filesBefore=${filesBefore}, snapshotsBefore=${snapshotsBefore}")
+        assertTrue(filesBefore.size() >= 2,
+                "Rewrite test requires at least 2 input files for 
${tableName}, but got ${filesBefore.size()}")
+
+        def visibleBefore = sql("""select * from ${tableName} order by id""")
+        def rowLineageBefore = lineageMap(tableName)
+        log.info("Visible rows before rewrite for ${tableName}: 
${visibleBefore}")
+
+        assertCurrentFilesDoNotContainRowLineageColumns(tableName, format)
+
+        def rewriteResult = sql("""
+            alter table ${catalogName}.${dbName}.${tableName}
+            execute rewrite_data_files(
+                "target-file-size-bytes" = "10485760",
+                "min-input-files" = "1"
+            )
+        """)
+        log.info("rewrite_data_files result for ${tableName}: 
${rewriteResult}")
+        assertTrue(rewriteResult.size() > 0, "rewrite_data_files should return 
summary rows for ${tableName}")
+        int rewrittenFiles = rewriteResult[0][0] as int
+        assertTrue(rewrittenFiles > 0, "rewrite_data_files should rewrite at 
least one file for ${tableName}")
+
+        def visibleAfter = sql("""select * from ${tableName} order by id""")
+        log.info("Visible rows after rewrite for ${tableName}: 
${visibleAfter}")
+        assertEquals(visibleBefore, visibleAfter)
+
+        def rowLineageAfter = lineageMap(tableName)
+        assertLineageMapEquals(rowLineageBefore, rowLineageAfter, tableName)
+
+        def countAfter = sql("""select count(*) from ${tableName}""")
+        log.info("Checking row count after rewrite for ${tableName}: 
${countAfter}")
+        assertEquals(expectedCount, countAfter[0][0].toString().toInteger())
+
+        def snapshotsAfter = sql("""select snapshot_id from 
${tableName}\$snapshots order by committed_at""")
+        log.info("Snapshots after rewrite for ${tableName}: ${snapshotsAfter}")
+        assertTrue(snapshotsAfter.size() > snapshotsBefore.size(),
+                "rewrite_data_files should create a new snapshot for 
${tableName}")
+
+        assertCurrentFilesContainRowLineageColumns(tableName, format)
+
+        def sampleRowId = rowLineageAfter.entrySet().iterator().next().value[0]
+        def sampleQuery = sql("""select count(*) from ${tableName} where 
_row_id = ${sampleRowId}""")
+        log.info("Checking sample _row_id predicate after rewrite for 
${tableName}: sampleRowId=${sampleRowId}, result=${sampleQuery}")
+        assertEquals(1, sampleQuery[0][0].toString().toInteger())
+    }
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """
+        create catalog if not exists ${catalogName} properties (
+            "type" = "iceberg",
+            "iceberg.catalog.type" = "rest",
+            "uri" = "http://${externalEnvIp}:${restPort}";,
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "${endpoint}",
+            "s3.region" = "us-east-1"
+        )
+    """
+
+    sql """switch ${catalogName}"""
+    sql """create database if not exists ${dbName}"""
+    sql """use ${dbName}"""
+    sql """set enable_fallback_to_original_planner = false"""
+    sql """set show_hidden_columns = false"""
+
+    try {
+        formats.each { format ->
+            String rewriteTable = 
"test_row_lineage_rewrite_unpartitioned_${format}"
+            String rewritePartitionTable = 
"test_row_lineage_rewrite_partitioned_${format}"
+            log.info("Run rewrite_data_files row lineage test with format 
${format}")
+
+            try {
+                sql """drop table if exists ${rewriteTable}"""
+                sql """
+                    create table ${rewriteTable} (
+                        id int,
+                        name string,
+                        score int
+                    ) engine=iceberg
+                    properties (
+                        "format-version" = "3",
+                        "write.format.default" = "${format}"
+                    )
+                """
+
+                sql """insert into ${rewriteTable} values (1, 'A', 10), (2, 
'B', 20)"""
+                sql """insert into ${rewriteTable} values (3, 'C', 30), (4, 
'D', 40)"""
+                sql """insert into ${rewriteTable} values (5, 'E', 50), (6, 
'F', 60)"""
+                log.info("Inserted three batches into ${rewriteTable} to 
prepare rewrite_data_files input files")
+
+                // Assert baseline:
+                // 1. Data files from regular INSERT do not physically contain 
the two row lineage columns.
+                // 2. After rewrite_data_files, every current data file should 
contain both row lineage columns.
+                // 3. Visible query results stay unchanged before and after 
rewrite.
+                // 4. _row_id and _last_updated_sequence_number stay stable 
for every row across rewrite.
+                runRewriteAndAssert(rewriteTable, format, 6)
+
+                sql """drop table if exists ${rewritePartitionTable}"""
+                sql """
+                    create table ${rewritePartitionTable} (
+                        id int,
+                        name string,
+                        score int,
+                        dt date
+                    ) engine=iceberg
+                    partition by list (day(dt)) ()
+                    properties (
+                        "format-version" = "3",
+                        "write.format.default" = "${format}"
+                    )
+                """
+
+                sql """insert into ${rewritePartitionTable} values (11, 'P1', 
10, '2024-01-01'), (12, 'P2', 20, '2024-01-01')"""
+                sql """insert into ${rewritePartitionTable} values (13, 'P3', 
30, '2024-01-01'), (14, 'P4', 40, '2024-02-01')"""
+                sql """insert into ${rewritePartitionTable} values (15, 'P5', 
50, '2024-02-01'), (16, 'P6', 60, '2024-01-01')"""
+                log.info("Inserted three partitioned batches into 
${rewritePartitionTable} to prepare rewrite_data_files input files")
+
+                // Assert baseline:
+                // 1. Partitioned tables also write row lineage columns 
physically only during rewrite.
+                // 2. Business data and row lineage values stay stable before 
and after rewrite.
+                // 3. _row_id predicate queries remain available after rewrite.
+                runRewriteAndAssert(rewritePartitionTable, format, 6)
+            } finally {
+                sql """drop table if exists ${rewritePartitionTable}"""
+                sql """drop table if exists ${rewriteTable}"""
+            }
+        }
+    } finally {
+        sql """drop database if exists ${dbName} force"""
+        sql """drop catalog if exists ${catalogName}"""
+    }
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v2_to_v3_doris_spark_compare.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v2_to_v3_doris_spark_compare.groovy
new file mode 100644
index 00000000000..df6d1bbea20
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v2_to_v3_doris_spark_compare.groovy
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v2_to_v3_doris_spark_compare", 
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+    def enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Iceberg test is disabled")
+        return
+    }
+
+    def catalogName = "test_iceberg_v2_to_v3_doris_spark_compare"
+    def dbName = "test_v2_to_v3_doris_spark_compare_db"
+    def restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    def minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+    def externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    def formats = ["parquet", "orc"]
+
+    def tableNameForFormat = { baseName, format ->
+        return format == "parquet" ? baseName : "${baseName}_orc"
+    }
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """
+        create catalog if not exists ${catalogName} properties (
+            "type" = "iceberg",
+            "iceberg.catalog.type" = "rest",
+            "uri" = "http://${externalEnvIp}:${restPort}";,
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minioPort}";,
+            "s3.region" = "us-east-1"
+        )
+    """
+
+    sql """switch ${catalogName}"""
+    sql """use ${dbName}"""
+    sql """set enable_fallback_to_original_planner = false"""
+
+    try {
+        def assertV2RowsAreNullAfterUpgrade = { tableName ->
+            def rows = sql """
+                select id, _row_id, _last_updated_sequence_number
+                from ${tableName}
+                order by id
+            """
+            assertEquals(2, rows.size())
+            rows.each { row ->
+                assertTrue(row[1] == null,
+                        "_row_id should be null for v2 rows after upgrade in 
${tableName}, row=${row}")
+                assertTrue(row[2] == null,
+                        "_last_updated_sequence_number should be null for v2 
rows after upgrade in ${tableName}, row=${row}")
+            }
+        }
+
+        def assertV23RowsNotNullAfterUpd = { tableName ->
+            def rows = sql """
+                select id, _row_id, _last_updated_sequence_number
+                from ${tableName}
+                order by id
+            """
+            rows.each { row ->
+                assertTrue(row[1] != null,
+                    "_row_id should be non-null after Doris operator for 
${tableName}")                        
+                assertTrue(row[2] != null,
+                    "_last_updated_sequence_number should be non-null after 
Doris operator for ${tableName}")
+
+            }
+        }
+
+        def upgradeV3DorisOperationInsert = { tableName ->
+            assertV2RowsAreNullAfterUpgrade(tableName)
+
+            sql """
+                insert into ${tableName} values
+                (4, 'post_v3_i', 400, date '2024-01-04')
+            """
+
+            def rows = sql """
+                select id, tag, score, _row_id, _last_updated_sequence_number
+                from ${tableName}
+                order by id
+            """
+            assertEquals(3, rows.size())
+            assertEquals(4, rows[2][0].toString().toInteger())
+            assertEquals("post_v3_i", rows[2][1])
+            assertV23RowsNotNullAfterUpd(tableName)
+        }
+
+        def upgradeV3DorisOperationDelete = { tableName ->
+            assertV2RowsAreNullAfterUpgrade(tableName)
+
+            sql """
+                delete from ${tableName}
+                where id = 3
+            """
+
+            def rows = sql """
+                select id, tag, score
+                from ${tableName}
+                order by id
+            """
+            assertEquals(1, rows.size())
+            assertEquals(1, rows[0][0].toString().toInteger())
+            assertV23RowsNotNullAfterUpd(tableName)
+
+        }
+
+        def upgradeV3DorisOperationUpdate = { tableName ->
+            assertV2RowsAreNullAfterUpgrade(tableName)
+
+            sql """
+                update ${tableName}
+                set tag = 'post_v3_u', score = score + 20
+                where id = 1
+            """
+
+            def rows = sql """
+                select id, tag, score
+                from ${tableName}
+                order by id
+            """
+            assertEquals(2, rows.size())
+            assertEquals(1, rows[0][0].toString().toInteger())
+            assertEquals("post_v3_u", rows[0][1])
+            assertV23RowsNotNullAfterUpd(tableName)
+        }
+
+        def upgradeV3DorisOperationRewrite = { tableName ->
+            assertV2RowsAreNullAfterUpgrade(tableName)
+
+            def rewriteResult = sql("""
+                alter table ${catalogName}.${dbName}.${tableName}
+                execute rewrite_data_files(
+                    "target-file-size-bytes" = "10485760",
+                    "min-input-files" = "1"
+                )
+            """)
+            assertTrue(rewriteResult.size() > 0,
+                    "rewrite_data_files should return summary rows for 
${tableName}")
+
+            def rowCount = sql """
+                select count(*)
+                from ${tableName}
+            """
+            assertEquals(2, rowCount[0][0].toString().toInteger())
+            assertV23RowsNotNullAfterUpd(tableName)
+        }
+
+        formats.each { format ->
+            def rowLineageNullTable = 
tableNameForFormat("v2v3_row_lineage_null_after_upgrade", format)
+            def sparkReferenceTable = 
tableNameForFormat("v2v3_spark_ops_reference", format)
+            def dorisTargetTable = tableNameForFormat("v2v3_doris_ops_target", 
format)
+            log.info("Run v2-to-v3 Doris/Spark compare test with format 
${format}")
+
+            def scenario1Rows = sql """
+                select id, _row_id, _last_updated_sequence_number
+                from ${rowLineageNullTable}
+                order by id
+            """
+            assertEquals(3, scenario1Rows.size())
+            scenario1Rows.each { row ->
+                assertTrue(row[1] == null,
+                        "_row_id should be null for rows written before v3 
upgrade, row=${row}")
+                assertTrue(row[2] == null,
+                        "_last_updated_sequence_number should be null for rows 
written before v3 upgrade, row=${row}")
+            }
+
+            sql """
+                update ${dorisTargetTable}
+                set tag = 'post_v3_u', score = score + 20
+                where id = 2
+            """
+
+            sql """
+                insert into ${dorisTargetTable} values
+                (4, 'post_v3_i', 400, date '2024-02-04')
+            """
+
+            def dorisRewriteResult = sql("""
+                alter table ${catalogName}.${dbName}.${dorisTargetTable}
+                execute rewrite_data_files(
+                    "target-file-size-bytes" = "10485760",
+                    "min-input-files" = "1"
+                )
+            """)
+            assertTrue(dorisRewriteResult.size() > 0,
+                    "Doris rewrite_data_files should return summary rows")
+
+            check_sqls_result_equal """
+                select *
+                from ${dorisTargetTable}
+                order by id
+            """, """
+                select *
+                from ${sparkReferenceTable}
+                order by id
+            """
+
+            
upgradeV3DorisOperationInsert(tableNameForFormat("v2v3_doris_upd_case1", 
format))
+            
upgradeV3DorisOperationDelete(tableNameForFormat("v2v3_doris_upd_case2", 
format))
+            
upgradeV3DorisOperationUpdate(tableNameForFormat("v2v3_doris_upd_case3", 
format))
+            
upgradeV3DorisOperationRewrite(tableNameForFormat("v2v3_doris_upd_case4", 
format))
+        }
+
+    } finally {
+        sql """drop catalog if exists ${catalogName}"""
+    }
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
new file mode 100644
index 00000000000..7276fadba76
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Iceberg test is disabled")
+        return
+    }
+
+    String catalogName = "test_iceberg_v3_row_lineage_query_insert"
+    String dbName = "test_row_lineage_query_insert_db"
+    String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String endpoint = "http://${externalEnvIp}:${minioPort}";
+
+    def formats = ["parquet", "orc"]
+
+    def collectDescColumns = { rows ->
+        return rows.collect { row -> row[0].toString().toLowerCase() }
+    }
+
+    def schemaContainsField = { schemaRows, fieldName ->
+        String target = fieldName.toLowerCase()
+        return schemaRows.any { row -> 
row.toString().toLowerCase().contains(target) }
+    }
+
+    def fileSchemaRows = { filePath, format ->
+        return sql("""
+            desc function s3(
+                "uri" = "${filePath}",
+                "format" = "${format}",
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "${endpoint}",
+                "s3.region" = "us-east-1"
+            )
+        """)
+    }
+
+    def assertCurrentFilesDoNotContainRowLineageColumns = { tableName, format, 
messagePrefix ->
+        def files = sql("""select file_path, lower(file_format) from 
${tableName}\$files order by file_path""")
+        log.info("${messagePrefix}: checking ${files.size()} current data 
files for ${tableName}: ${files}")
+        assertTrue(files.size() > 0, "Current data files should exist for 
${tableName}")
+        files.each { row ->
+            assertEquals(format, row[1].toString())
+            assertTrue(row[0].toString().endsWith(format == "parquet" ? 
".parquet" : ".orc"),
+                    "${messagePrefix} should write ${format} files for 
${tableName}, file=${row[0]}")
+            def schemaRows = fileSchemaRows(row[0].toString(), format)
+            log.info("${messagePrefix}: ${format} schema for ${tableName}, 
file=${row[0]} -> ${schemaRows}")
+            assertTrue(!schemaContainsField(schemaRows, "_row_id"),
+                    "${messagePrefix} should not physically write _row_id, 
schema=${schemaRows}")
+            assertTrue(!schemaContainsField(schemaRows, 
"_last_updated_sequence_number"),
+                    "${messagePrefix} should not physically write 
_last_updated_sequence_number, schema=${schemaRows}")
+        }
+    }
+
+    def assertRowLineageHiddenColumns = { tableName, visibleColumnCount ->
+        sql("""set show_hidden_columns = false""")
+        def descDefault = sql("""desc ${tableName}""")
+        def defaultColumns = collectDescColumns(descDefault)
+        log.info("Checking hidden-column default visibility for ${tableName}: 
desc=${descDefault}")
+        assertTrue(!defaultColumns.contains("_row_id"),
+                "DESC default should hide _row_id for ${tableName}, got 
${defaultColumns}")
+        assertTrue(!defaultColumns.contains("_last_updated_sequence_number"),
+                "DESC default should hide _last_updated_sequence_number for 
${tableName}, got ${defaultColumns}")
+
+        def selectVisible = sql("""select * from ${tableName} order by id""")
+        log.info("Checking visible SELECT * layout for ${tableName}: 
rowCount=${selectVisible.size()}, firstRow=${selectVisible ? selectVisible[0] : 
'EMPTY'}")
+        assertTrue(selectVisible.size() > 0, "SELECT * should return rows for 
${tableName}")
+        assertEquals(visibleColumnCount, selectVisible[0].size())
+
+        sql("""set show_hidden_columns = true""")
+        def descHidden = sql("""desc ${tableName}""")
+        def hiddenColumns = collectDescColumns(descHidden)
+        log.info("Checking hidden-column enabled visibility for ${tableName}: 
desc=${descHidden}")
+        assertTrue(hiddenColumns.contains("_row_id"),
+                "DESC with show_hidden_columns=true should expose _row_id for 
${tableName}, got ${hiddenColumns}")
+        assertTrue(hiddenColumns.contains("_last_updated_sequence_number"),
+                "DESC with show_hidden_columns=true should expose 
_last_updated_sequence_number for ${tableName}, got ${hiddenColumns}")
+
+        def selectHidden = sql("""select * from ${tableName} order by id""")
+        log.info("Checking hidden SELECT * layout for ${tableName}: 
rowCount=${selectHidden.size()}, firstRow=${selectHidden ? selectHidden[0] : 
'EMPTY'}")
+        assertTrue(selectHidden.size() > 0, "SELECT * with hidden columns 
should return rows for ${tableName}")
+        assertEquals(visibleColumnCount + 2 + 1, selectHidden[0].size()) // 
_row_id + _last_updated_sequence_number + __DORIS_ICEBERG_ROWID_COL__
+
+        sql("""set show_hidden_columns = false""")
+    }
+
+    def assertExplicitRowLineageReadable = { tableName, expectedIds ->
+        def rowLineageRows = sql("""
+            select id, _row_id, _last_updated_sequence_number
+            from ${tableName}
+            order by id
+        """)
+        log.info("Checking explicit row lineage projection for ${tableName}: 
rows=${rowLineageRows}")
+        assertEquals(expectedIds.size(), rowLineageRows.size())
+        for (int i = 0; i < expectedIds.size(); i++) {
+            assertEquals(expectedIds[i], 
rowLineageRows[i][0].toString().toInteger())
+            assertTrue(rowLineageRows[i][1] != null,
+                    "_row_id should be non-null for ${tableName}, 
row=${rowLineageRows[i]}")
+            assertTrue(rowLineageRows[i][2] != null,
+                    "_last_updated_sequence_number should be non-null for 
${tableName}, row=${rowLineageRows[i]}")
+        }
+
+        long firstRowId = rowLineageRows[0][1].toString().toLong()
+        long secondRowId = rowLineageRows[1][1].toString().toLong()
+        assertTrue(firstRowId < secondRowId,
+                "Row lineage ids should increase with row position for 
${tableName}, rows=${rowLineageRows}")
+
+        def byRowId = sql("""select id from ${tableName} where _row_id = 
${firstRowId} order by id""")
+        log.info("Checking single _row_id predicate for ${tableName}: 
rowId=${firstRowId}, result=${byRowId}")
+        assertEquals(1, byRowId.size())
+        assertEquals(expectedIds[0], byRowId[0][0].toString().toInteger())
+
+        def combinedPredicate = sql("""
+            select id
+            from ${tableName}
+            where id >= ${expectedIds[1]} and _row_id in 
(${rowLineageRows[1][1]}, ${rowLineageRows[2][1]})
+            order by id
+        """)
+        log.info("Checking combined business + _row_id predicate for 
${tableName}: result=${combinedPredicate}")
+        assertEquals(2, combinedPredicate.size())
+        assertEquals(expectedIds[1], 
combinedPredicate[0][0].toString().toInteger())
+        assertEquals(expectedIds[2], 
combinedPredicate[1][0].toString().toInteger())
+    }
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """
+        create catalog if not exists ${catalogName} properties (
+            "type" = "iceberg",
+            "iceberg.catalog.type" = "rest",
+            "uri" = "http://${externalEnvIp}:${restPort}";,
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "${endpoint}",
+            "s3.region" = "us-east-1"
+        )
+    """
+
+    sql """switch ${catalogName}"""
+    sql """create database if not exists ${dbName}"""
+    sql """use ${dbName}"""
+    sql """set enable_fallback_to_original_planner = false"""
+    sql """set show_hidden_columns = false"""
+
+    try {
+        formats.each { format ->
+            String unpartitionedTable = 
"test_row_lineage_query_insert_unpartitioned_${format}"
+            String partitionedTable = 
"test_row_lineage_query_insert_partitioned_${format}"
+            log.info("Run row lineage query/insert test with format ${format}")
+
+            try {
+                sql """drop table if exists ${unpartitionedTable}"""
+                sql """
+                    create table ${unpartitionedTable} (
+                        id int,
+                        name string,
+                        age int
+                    ) engine=iceberg
+                    properties (
+                        "format-version" = "3",
+                        "write.format.default" = "${format}"
+                    )
+                """
+
+                sql """
+                    insert into ${unpartitionedTable} values(1, 'Alice', 25);
+                """
+                sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30) 
"""
+                sql """ insert into ${unpartitionedTable} values(3, 'Charlie', 
35) """
+
+                log.info("Inserted initial rows into ${unpartitionedTable}")
+
+                // Assert baseline:
+                // 1. DESC and SELECT * hide row lineage columns by default.
+                // 2. show_hidden_columns=true exposes both hidden columns in 
DESC and SELECT *.
+                // 3. Explicit SELECT on row lineage columns returns non-null 
values.
+                assertRowLineageHiddenColumns(unpartitionedTable, 3)
+                assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3])
+
+                test {
+                    sql """insert into ${unpartitionedTable}(_row_id, id, 
name, age) values (1, 9, 'BadRow', 99)"""
+                    exception "Cannot specify row lineage column '_row_id' in 
INSERT statement"
+                }
+
+                test {
+                    sql """
+                        insert into 
${unpartitionedTable}(_last_updated_sequence_number, id, name, age)
+                        values (1, 10, 'BadSeq', 100)
+                    """
+                    exception "Cannot specify row lineage column 
'_last_updated_sequence_number' in INSERT statement"
+                }
+
+                sql """insert into ${unpartitionedTable}(id, name, age) values 
(4, 'Doris', 40)"""
+                def unpartitionedCount = sql """select count(*) from 
${unpartitionedTable}"""
+                log.info("Checking row count after regular INSERT for 
${unpartitionedTable}: result=${unpartitionedCount}")
+                assertEquals(4, 
unpartitionedCount[0][0].toString().toInteger())
+
+                assertCurrentFilesDoNotContainRowLineageColumns(
+                        unpartitionedTable,
+                        format,
+                        "Unpartitioned normal INSERT")
+
+                sql """drop table if exists ${partitionedTable}"""
+                sql """
+                    create table ${partitionedTable} (
+                        id int,
+                        name string,
+                        age int,
+                        dt date
+                    ) engine=iceberg
+                    partition by list (day(dt)) ()
+                    properties (
+                        "format-version" = "3",
+                        "write.format.default" = "${format}"
+                    )
+                """
+
+                sql """ insert into ${partitionedTable} values(11, 'Penny', 
21, '2024-01-01')"""
+                sql """ insert into ${partitionedTable} values(12, 'Quinn', 
22, '2024-01-02')"""
+                sql """ insert into ${partitionedTable} values(13, 'Rita', 23, 
'2024-01-03')"""        
+                
+                log.info("Inserted initial rows into ${partitionedTable}")
+
+                // Assert baseline:
+                // 1. Partitioned tables follow the same row lineage semantics 
as unpartitioned tables.
+                // 2. Explicit SELECT on _row_id remains readable under 
partition predicates.
+                // 3. Regular INSERT still rejects hidden columns and does not 
write them physically.
+                assertRowLineageHiddenColumns(partitionedTable, 4)
+
+                def partitionLineageRows = sql """
+                    select id, _row_id, _last_updated_sequence_number
+                    from ${partitionedTable}
+                    where dt >= '2024-01-01'
+                    order by id
+                """
+                log.info("Checking partitioned row lineage projection for 
${partitionedTable}: rows=${partitionLineageRows}")
+                assertEquals(3, partitionLineageRows.size())
+                partitionLineageRows.each { row ->
+                    assertTrue(row[1] != null, "_row_id should be non-null for 
partitioned table row=${row}")
+                    assertTrue(row[2] != null, "_last_updated_sequence_number 
should be non-null for partitioned table row=${row}")
+                }
+
+                def exactPartitionPredicate = sql """
+                    select id
+                    from ${partitionedTable}
+                    where dt = '2024-01-02' and _row_id = 
${partitionLineageRows[1][1]}
+                """
+                log.info("Checking exact partition + _row_id predicate for 
${partitionedTable}: result=${exactPartitionPredicate}")
+                assertEquals(1, exactPartitionPredicate.size())
+                assertEquals(12, 
exactPartitionPredicate[0][0].toString().toInteger())
+
+                test {
+                    sql """
+                        insert into ${partitionedTable}(_row_id, id, name, 
age, dt)
+                        values (1, 14, 'BadPartitionRow', 24, '2024-01-04')
+                    """
+                    exception "Cannot specify row lineage column '_row_id' in 
INSERT statement"
+                }
+
+                test {
+                    sql """
+                        insert into 
${partitionedTable}(_last_updated_sequence_number, id, name, age, dt)
+                        values (1, 15, 'BadPartitionSeq', 25, '2024-01-05')
+                    """
+                    exception "Cannot specify row lineage column 
'_last_updated_sequence_number' in INSERT statement"
+                }
+
+                sql """insert into ${partitionedTable}(id, name, age, dt) 
values (14, 'Sara', 24, '2024-01-04')"""
+                def partitionedCount = sql """select count(*) from 
${partitionedTable}"""
+                log.info("Checking row count after regular INSERT for 
${partitionedTable}: result=${partitionedCount}")
+                assertEquals(4, partitionedCount[0][0].toString().toInteger())
+
+                assertCurrentFilesDoNotContainRowLineageColumns(
+                        partitionedTable,
+                        format,
+                        "Partitioned normal INSERT")
+            } finally {
+                sql """drop table if exists ${partitionedTable}"""
+                sql """drop table if exists ${unpartitionedTable}"""
+            }
+        }
+    } finally {
+        sql """set show_hidden_columns = false"""
+        sql """drop database if exists ${dbName} force"""
+        sql """drop catalog if exists ${catalogName}"""
+    }
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
new file mode 100644
index 00000000000..4bce7387f86
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v3_row_lineage_update_delete_merge", 
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("Iceberg test is disabled")
+        return
+    }
+
+    String catalogName = "test_iceberg_v3_row_lineage_update_delete_merge"
+    String dbName = "test_row_lineage_update_delete_merge_db"
+    String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String endpoint = "http://${externalEnvIp}:${minioPort}";
+
+    def formats = ["parquet", "orc"]
+
+    def schemaContainsField = { schemaRows, fieldName ->
+        String target = fieldName.toLowerCase()
+        return schemaRows.any { row -> 
row.toString().toLowerCase().contains(target) }
+    }
+
+    def fileSchemaRows = { filePath, format ->
+        return sql("""
+            desc function s3(
+                "uri" = "${filePath}",
+                "format" = "${format}",
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "${endpoint}",
+                "s3.region" = "us-east-1"
+            )
+        """)
+    }
+
+    def assertDeleteFilesArePuffin = { tableName ->
+        def deleteFiles = sql("""
+            select file_path, lower(file_format)
+            from ${tableName}\$delete_files
+            order by file_path
+        """)
+        log.info("Checking delete files for ${tableName}: ${deleteFiles}")
+        assertTrue(deleteFiles.size() > 0, "V3 table ${tableName} should 
produce delete files")
+        deleteFiles.each { row ->
+            assertTrue(row[0].toString().endsWith(".puffin"),
+                    "V3 delete file should be Puffin: ${row}")
+            assertEquals("puffin", row[1].toString())
+        }
+    }
+
+    def assertAtLeastOneCurrentDataFileHasRowLineageColumns = { tableName, 
format ->
+        def currentFiles = sql("""select file_path, lower(file_format) from 
${tableName}\$data_files order by file_path""")
+        log.info("Checking current data files for physical row lineage columns 
in ${tableName}: ${currentFiles}")
+        assertTrue(currentFiles.size() > 0, "Current data files should exist 
for ${tableName}")
+
+        boolean found = false
+        currentFiles.each { row ->
+            assertEquals(format, row[1].toString())
+            assertTrue(row[0].toString().endsWith(format == "parquet" ? 
".parquet" : ".orc"),
+                    "Current data file should match ${format} for 
${tableName}, file=${row[0]}")
+            def schemaRows = fileSchemaRows(row[0].toString(), format)
+            log.info("${format} schema for ${tableName}, file=${row[0]} -> 
${schemaRows}")
+            if (schemaContainsField(schemaRows, "_row_id")
+                    && schemaContainsField(schemaRows, 
"_last_updated_sequence_number")) {
+                found = true
+            }
+        }
+        assertTrue(found, "At least one current data file should physically 
contain row lineage columns for ${tableName}")
+    }
+
+    def assertExplicitRowLineageNonNull = { tableName, expectedRowCount ->
+        def rows = sql("""
+            select id, _row_id, _last_updated_sequence_number
+            from ${tableName}
+            order by id
+        """)
+        log.info("Checking explicit row lineage projection for ${tableName}: 
rows=${rows}")
+        assertEquals(expectedRowCount, rows.size())
+        rows.each { row ->
+            assertTrue(row[1] != null, "_row_id should be non-null for 
${tableName}, row=${row}")
+            assertTrue(row[2] != null, "_last_updated_sequence_number should 
be non-null for ${tableName}, row=${row}")
+        }
+    }
+
+    def lineageMap = { tableName ->
+        def rows = sql("""
+            select id, _row_id, _last_updated_sequence_number
+            from ${tableName}
+            order by id
+        """)
+        Map<Integer, List<String>> result = [:]
+        rows.each { row ->
+            result[row[0].toString().toInteger()] = [row[1].toString(), 
row[2].toString()]
+        }
+        log.info("Built lineage map for ${tableName}: ${result}")
+        return result
+    }
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """
+        create catalog if not exists ${catalogName} properties (
+            "type" = "iceberg",
+            "iceberg.catalog.type" = "rest",
+            "uri" = "http://${externalEnvIp}:${restPort}";,
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "${endpoint}",
+            "s3.region" = "us-east-1"
+        )
+    """
+
+    sql """switch ${catalogName}"""
+    sql """create database if not exists ${dbName}"""
+    sql """use ${dbName}"""
+    sql """set enable_fallback_to_original_planner = false"""
+    sql """set show_hidden_columns = false"""
+
+    try {
+        formats.each { format ->
+            String updateDeleteTable = 
"test_row_lineage_v3_update_delete_${format}"
+            String mergeTable = "test_row_lineage_v3_merge_${format}"
+            log.info("Run row lineage update/delete/merge test with format 
${format}")
+
+            try {
+                sql """drop table if exists ${updateDeleteTable}"""
+                sql """
+                    create table ${updateDeleteTable} (
+                        id int,
+                        name string,
+                        age int
+                    ) engine=iceberg
+                    properties (
+                        "format-version" = "3",
+                        "write.format.default" = "${format}"
+                    )
+                """
+
+                sql """insert into ${updateDeleteTable} values (1, 'Alice', 
25) """ 
+                sql """insert into ${updateDeleteTable} values (2, 'Bob', 30) 
""" 
+                sql """insert into ${updateDeleteTable} values (3, 'Charlie', 
35)""" 
+
+                def updateDeleteLineageBefore = lineageMap(updateDeleteTable)
+                log.info("Lineage before UPDATE/DELETE on 
${updateDeleteTable}: ${updateDeleteLineageBefore}")
+                sql """update ${updateDeleteTable} set name = 'Alice_u', age = 
26 where id = 1"""
+                sql """delete from ${updateDeleteTable} where id = 2"""
+
+                // Assert baseline:
+                // 1. UPDATE keeps rows readable and applies the new values.
+                // 2. DELETE removes the target row.
+                // 3. V3 delete files use Puffin deletion vectors instead of 
delete_pos parquet/orc files.
+                // 4. Explicit row lineage reads remain non-null after DML.
+                def updateDeleteRows = sql """select * from 
${updateDeleteTable} order by id"""
+                log.info("Checking table rows after UPDATE/DELETE on 
${updateDeleteTable}: ${updateDeleteRows}")
+                assertEquals(2, updateDeleteRows.size())
+                assertEquals(1, updateDeleteRows[0][0].toString().toInteger())
+                assertEquals("Alice_u", updateDeleteRows[0][1])
+                assertEquals(26, updateDeleteRows[0][2].toString().toInteger())
+                assertEquals(3, updateDeleteRows[1][0].toString().toInteger())
+                assertEquals("Charlie", updateDeleteRows[1][1])
+                assertEquals(35, updateDeleteRows[1][2].toString().toInteger())
+
+                assertExplicitRowLineageNonNull(updateDeleteTable, 2)
+                def updateDeleteLineageAfter = lineageMap(updateDeleteTable)
+                log.info("Lineage after UPDATE/DELETE on ${updateDeleteTable}: 
${updateDeleteLineageAfter}")
+                assertEquals(updateDeleteLineageBefore[1][0], 
updateDeleteLineageAfter[1][0])
+                assertTrue(updateDeleteLineageBefore[1][1] != 
updateDeleteLineageAfter[1][1],
+                        "UPDATE should change _last_updated_sequence_number 
for id=1")
+                assertTrue(updateDeleteLineageAfter[1][1].toLong() > 
updateDeleteLineageBefore[1][1].toLong(),
+                        "UPDATE should advance _last_updated_sequence_number 
for id=1")
+                assertEquals(updateDeleteLineageBefore[3][0], 
updateDeleteLineageAfter[3][0])
+                assertEquals(updateDeleteLineageBefore[3][1], 
updateDeleteLineageAfter[3][1])
+                assertTrue(!updateDeleteLineageAfter.containsKey(2), "Deleted 
row id=2 should not remain after DELETE")
+                assertDeleteFilesArePuffin(updateDeleteTable)
+                
assertAtLeastOneCurrentDataFileHasRowLineageColumns(updateDeleteTable, format)
+
+                def minRowIdAfterUpdate = sql """
+                    select min(_row_id)
+                    from ${updateDeleteTable}
+                """
+                def rowIdFilterResult = sql """
+                    select count(*)
+                    from ${updateDeleteTable}
+                    where _row_id = ${minRowIdAfterUpdate[0][0]}
+                """
+                log.info("Checking _row_id filter after UPDATE/DELETE on 
${updateDeleteTable}: minRowId=${minRowIdAfterUpdate}, 
result=${rowIdFilterResult}")
+                assertEquals(1, rowIdFilterResult[0][0].toString().toInteger())
+
+                sql """drop table if exists ${mergeTable}"""
+                sql """
+                    create table ${mergeTable} (
+                        id int,
+                        name string,
+                        age int,
+                        dt date
+                    ) engine=iceberg
+                    partition by list (day(dt)) ()
+                    properties (
+                        "format-version" = "3",
+                        "write.format.default" = "${format}"
+                    )
+                """
+
+                sql """ insert into ${mergeTable} values (1, 'Penny', 21, 
'2024-01-01') """
+                sql """ insert into ${mergeTable} values (2, 'Quinn', 22, 
'2024-01-02') """
+                sql """ insert into ${mergeTable} values (3, 'Rita', 23, 
'2024-01-03') """
+
+                def mergeLineageBefore = lineageMap(mergeTable)
+                log.info("Lineage before MERGE on ${mergeTable}: 
${mergeLineageBefore}")
+                sql """
+                    merge into ${mergeTable} t
+                    using (
+                        select 1 as id, 'Penny_u' as name, 31 as age, date 
'2024-01-01' as dt, 'U' as flag
+                        union all
+                        select 2, 'Quinn', 22, date '2024-01-02', 'D'
+                        union all
+                        select 4, 'Sara', 24, date '2024-01-04', 'I'
+                    ) s
+                    on t.id = s.id
+                    when matched and s.flag = 'D' then delete
+                    when matched then update set
+                        name = s.name,
+                        age = s.age
+                    when not matched then insert (id, name, age, dt)
+                    values (s.id, s.name, s.age, s.dt)
+                """
+
+                // Assert baseline:
+                // 1. MERGE applies DELETE, UPDATE, and INSERT actions in one 
statement.
+                // 2. The partitioned MERGE still writes Puffin deletion 
vectors.
+                // 3. At least one current data file written by MERGE contains 
physical row lineage columns.
+                def mergeRows = sql """select * from ${mergeTable} order by 
id"""
+                log.info("Checking table rows after MERGE on ${mergeTable}: 
${mergeRows}")
+                assertEquals(3, mergeRows.size())
+                assertEquals(1, mergeRows[0][0].toString().toInteger())
+                assertEquals("Penny_u", mergeRows[0][1])
+                assertEquals(31, mergeRows[0][2].toString().toInteger())
+                assertEquals(3, mergeRows[1][0].toString().toInteger())
+                assertEquals("Rita", mergeRows[1][1])
+                assertEquals(23, mergeRows[1][2].toString().toInteger())
+                assertEquals(4, mergeRows[2][0].toString().toInteger())
+                assertEquals("Sara", mergeRows[2][1])
+                assertEquals(24, mergeRows[2][2].toString().toInteger())
+
+                assertExplicitRowLineageNonNull(mergeTable, 3)
+                def mergeLineageAfter = lineageMap(mergeTable)
+                log.info("Lineage after MERGE on ${mergeTable}: 
${mergeLineageAfter}")
+                assertEquals(mergeLineageBefore[1][0], mergeLineageAfter[1][0])
+                assertTrue(mergeLineageBefore[1][1] != mergeLineageAfter[1][1],
+                        "MERGE UPDATE should change 
_last_updated_sequence_number for id=1")
+                assertTrue(mergeLineageAfter[1][1].toLong() > 
mergeLineageBefore[1][1].toLong(),
+                        "MERGE UPDATE should advance 
_last_updated_sequence_number for id=1")
+                assertEquals(mergeLineageBefore[3][0], mergeLineageAfter[3][0])
+                assertEquals(mergeLineageBefore[3][1], mergeLineageAfter[3][1])
+                assertTrue(!mergeLineageAfter.containsKey(2), "MERGE DELETE 
should remove id=2")
+                assertDeleteFilesArePuffin(mergeTable)
+                
assertAtLeastOneCurrentDataFileHasRowLineageColumns(mergeTable, format)
+
+                def insertedRowLineage = sql """
+                    select _row_id, _last_updated_sequence_number
+                    from ${mergeTable}
+                    where id = 4
+                """
+                log.info("Checking inserted MERGE row lineage for 
${mergeTable}: ${insertedRowLineage}")
+                assertEquals(1, insertedRowLineage.size())
+                assertTrue(insertedRowLineage[0][0] != null, "Inserted MERGE 
row should get generated _row_id")
+                assertTrue(insertedRowLineage[0][1] != null, "Inserted MERGE 
row should get generated _last_updated_sequence_number")
+            } finally {
+                sql """drop table if exists ${mergeTable}"""
+                sql """drop table if exists ${updateDeleteTable}"""
+            }
+        }
+    } finally {
+        sql """drop database if exists ${dbName} force"""
+        sql """drop catalog if exists ${catalogName}"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to