This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a9ebcb80f05 branch-3.0: [fix](orc) fix core dump when the iceberg-orc reader reads an ORC file missing the iceberg.id attribute (#49051) (#54167)
a9ebcb80f05 is described below

commit a9ebcb80f0555713fc69b5afdd6811edb95f3655
Author: daidai <[email protected]>
AuthorDate: Tue Aug 12 11:09:02 2025 +0800

    branch-3.0: [fix](orc) fix core dump when the iceberg-orc reader reads an ORC file missing the iceberg.id attribute (#49051) (#54167)
    
    ### What problem does this PR solve?
    pick #49051,
    but only the fix for the following crash:
    ```
    terminate called after throwing an instance of 'std::range_error'
      what():  Key not found: iceberg.id
    *** Query id: 6a93d7cdc9f44370-a40b07934a14c81b ***
    *** is nereids: 1 ***
    *** tablet id: 0 ***
    *** Aborted at 1753842428 (unix time) try "date -d @1753842428" if you are using GNU date ***
    *** Current BE git commitID: 910c4249c5 ***
    *** SIGABRT unknown detail explain (@0x5a46f) received by PID 369775 (TID 371694 OR 0x7fad067ef640) from PID 369775; stack trace: ***
    terminate called recursively
    terminate called recursively
    terminate called recursively
    terminate called recursively
     0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_release/doris/be/src/common/signal_handler.h:421
     1# 0x00007FB12263EBF0 in /lib64/libc.so.6
     2# __pthread_kill_implementation in /lib64/libc.so.6
     3# gsignal in /lib64/libc.so.6
     4# abort in /lib64/libc.so.6
     5# __gnu_cxx::__verbose_terminate_handler() [clone .cold] at ../../../../libstdc++-v3/libsupc++/vterminate.cc:75
     6# __cxxabiv1::__terminate(void (*)()) at ../../../../libstdc++-v3/libsupc++/eh_terminate.cc:48
     7# 0x000055C047B28EC1 in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
     8# 0x000055C047B29014 in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
     9# orc::TypeImpl::getAttributeValue(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) const in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
    10# doris::vectorized::OrcReader::get_schema_col_name_attribute(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >*, std::vector<unsigned long, std::allocator<unsigned long> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) at /home/zcp/repo_center/doris_release/doris/be/src/vec/exec/format/orc/vorc_reader.cpp:332
    11# doris::vectorized::IcebergOrcReader::_gen_col_name_maps(doris::vectorized::OrcReader*) at
    ```
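    
    For context: orc::Type::getAttributeValue() throws std::range_error when the
    requested attribute key is absent, and the old code called it unconditionally
    for `iceberg.id`. Below is a minimal sketch of the guarded lookup pattern this
    patch adds (simplified from the vorc_reader.cpp hunk further down; the
    free-standing function name is illustrative, not the committed API):
    
    ```cpp
    #include <orc/Type.hh>  // orc::Type: hasAttributeKey() / getAttributeValue()
    
    #include <cstdint>
    #include <string>
    #include <vector>
    
    // Collect the "iceberg.id" attribute of every top-level column. Returns false
    // (instead of letting getAttributeValue() throw std::range_error) when the
    // file was written without Iceberg field IDs, so the caller can fall back.
    bool collect_iceberg_ids(const orc::Type& root, std::vector<int32_t>* ids) {
        for (uint64_t i = 0; i < root.getSubtypeCount(); ++i) {
            const orc::Type* field = root.getSubtype(i);
            if (!field->hasAttributeKey("iceberg.id")) {
                return false;  // no schema info in this file
            }
            ids->emplace_back(std::stoi(field->getAttributeValue("iceberg.id")));
        }
        return true;
    }
    ```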
---
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  28 ++-
 be/src/vec/exec/format/orc/vorc_reader.h           |  11 +-
 be/src/vec/exec/format/parquet/schema_desc.h       |   4 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 190 +++++----------------
 be/src/vec/exec/format/table/iceberg_reader.h      |  29 +---
 .../vec/exec/format/table/table_format_reader.cpp  | 103 +++++++++++
 be/src/vec/exec/format/table/table_format_reader.h |  41 +++++
 .../format/table/transactional_hive_reader.cpp     |  10 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   4 +-
 .../create_preinstalled_scripts/iceberg/run08.sql  |  48 ++++++
 .../iceberg/iceberg_schema_change2.out             | Bin 0 -> 673 bytes
 .../iceberg/iceberg_schema_change2.groovy          |  66 +++++++
 12 files changed, 347 insertions(+), 187 deletions(-)
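
At a high level, the per-format name-mapping logic that used to live in
IcebergTableReader (_gen_col_name_maps / _gen_file_col_names /
_gen_new_colname_to_value_range) is centralized in the new TableSchemaChangeHelper.
A condensed sketch of that flow follows; the struct and function names here are
hypothetical simplifications, and the committed implementation is
init_schema_info() in table_format_reader.cpp below:

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical condensed form of TableSchemaChangeHelper::init_schema_info():
// match table and file columns by field ID, record renames, and split the
// requested columns into "read from file" vs. "missing from file".
struct SchemaMaps {
    std::unordered_map<std::string, std::string> table_to_file;
    std::unordered_map<std::string, std::string> file_to_table;
    std::vector<std::string> all_required;  // names to request from the file reader
    std::vector<std::string> not_in_file;   // required by the query, absent in file
    bool has_schema_change = false;
};

SchemaMaps build_schema_maps(const std::vector<std::string>& read_cols,
                             const std::unordered_map<int32_t, std::string>& table_id_to_name,
                             const std::map<int32_t, std::string>& file_id_to_name) {
    SchemaMaps m;
    // 1. Match columns by field ID; a differing name means a rename happened.
    for (const auto& [id, file_name] : file_id_to_name) {
        auto it = table_id_to_name.find(id);
        if (it == table_id_to_name.end()) {
            continue;  // the column was dropped from the table
        }
        m.table_to_file.emplace(it->second, file_name);
        m.file_to_table.emplace(file_name, it->second);
        if (it->second != file_name) {
            m.has_schema_change = true;
        }
    }
    // 2. Requested columns without a file-side match (added after the file was
    //    written, including drop-then-re-add with the same name) are missing.
    for (const auto& col : read_cols) {
        auto it = m.table_to_file.find(col);
        if (it == m.table_to_file.end()) {
            m.all_required.emplace_back(col);
            m.not_in_file.emplace_back(col);
        } else {
            m.all_required.emplace_back(it->second);
        }
    }
    return m;
}
```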

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index acd979678fb..88c073866db 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -250,6 +250,10 @@ void OrcReader::_init_profile() {
 }
 
 Status OrcReader::_create_file_reader() {
+    if (_reader != nullptr) {
+        return Status::OK();
+    }
+
     if (_file_input_stream == nullptr) {
         _file_description.mtime =
                 _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
@@ -289,6 +293,7 @@ Status OrcReader::_create_file_reader() {
 
 Status OrcReader::init_reader(
         const std::vector<std::string>* column_names,
+        const std::vector<std::string>& missing_column_names,
         const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
         const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
         const RowDescriptor* row_descriptor,
@@ -296,6 +301,7 @@ Status OrcReader::init_reader(
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
         const bool hive_use_column_names) {
     _column_names = column_names;
+    _missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end());
     _colname_to_value_range = colname_to_value_range;
     _lazy_read_ctx.conjuncts = conjuncts;
     _is_acid = is_acid;
@@ -332,14 +338,21 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 }
 
 Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                                std::vector<uint64_t>* col_attributes,
-                                                std::string attribute) {
+                                                std::vector<int32_t>* col_attributes,
+                                                const std::string& attribute,
+                                                bool* exist_attribute) {
     RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
+    *exist_attribute = true;
+    const auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(get_field_name_lower_case(&root_type, i));
+
+        if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) {
+            *exist_attribute = false;
+            return Status::OK();
+        }
         col_attributes->emplace_back(
-                std::stol(root_type.getSubtype(i)->getAttributeValue(attribute)));
+                std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute)));
     }
     return Status::OK();
 }
@@ -355,8 +368,15 @@ Status OrcReader::_init_read_columns() {
     // TODO, should be removed in 2.2 or later
     _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) &&
                                _scan_params.__isset.slot_name_to_schema_pos;
+
     for (size_t i = 0; i < _column_names->size(); ++i) {
         auto& col_name = (*_column_names)[i];
+
+        if (_missing_column_names_set.contains(col_name)) {
+            _missing_cols.emplace_back(col_name);
+            continue;
+        }
+
         if (_is_hive1_orc_or_use_idx) {
             auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
             if (iter != _scan_params.slot_name_to_schema_pos.end()) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 551c000dca3..77cc22b54ed 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -144,6 +144,7 @@ public:
     //If you want to read the file by index instead of column name, set hive_use_column_names to false.
     Status init_reader(
             const std::vector<std::string>* column_names,
+            const std::vector<std::string>& missing_column_names,
             const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             const VExprContextSPtrs& conjuncts, bool is_acid,
             const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
@@ -184,8 +185,8 @@ public:
                              std::vector<TypeDescriptor>* col_types) override;
 
     Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                         std::vector<uint64_t>* col_attributes,
-                                         std::string attribute);
+                                         std::vector<int32_t>* col_attributes,
+                                         const std::string& attribute, bool* exist_attribute);
     void set_table_col_to_file_col(
             std::unordered_map<std::string, std::string> table_col_to_file_col) {
         _table_col_to_file_col = table_col_to_file_col;
@@ -566,6 +567,10 @@ private:
     int64_t _range_size;
     const std::string& _ctz;
     const std::vector<std::string>* _column_names;
+    // _missing_column_names_set: used by iceberg/hudi/paimon for columns that were dropped
+    // and then added back (drop column a, then add column a); such columns must not be read
+    // from the old data file.
+    std::set<std::string> _missing_column_names_set;
+
     int32_t _offset_days = 0;
     cctz::time_zone _time_zone;
 
@@ -590,7 +595,7 @@ private:
     OrcProfile _orc_profile;
 
     std::unique_ptr<orc::ColumnVectorBatch> _batch;
-    std::unique_ptr<orc::Reader> _reader;
+    std::unique_ptr<orc::Reader> _reader = nullptr;
     std::unique_ptr<orc::RowReader> _row_reader;
     std::unique_ptr<ORCFilterImpl> _orc_filter;
     orc::RowReaderOptions _row_reader_options;
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 2593da837c3..aed0da07000 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -71,7 +71,7 @@ private:
     std::unordered_map<std::string, const FieldSchema*> _name_to_field;
     // Used in from_thrift, marking the next schema position that should be parsed
     size_t _next_schema_pos;
-    std::unordered_map<int, std::string> _field_id_name_mapping;
+    std::map<int, std::string> _field_id_name_mapping;
 
     void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
                               FieldSchema* physical_field);
@@ -135,6 +135,8 @@ public:
 
     bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }
 
+    std::map<int32, std::string> get_field_id_name_map() { return _field_id_name_mapping; }
+
     const doris::Slice get_column_name_from_field_id(int32_t id) const;
 };
 
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 7dea5d99617..725ecc4dd35 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -120,32 +120,9 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
     }
     RETURN_IF_ERROR(_expand_block_if_need(block));
 
-    // To support iceberg schema evolution. We change the column name in block to
-    // make it match with the column name in parquet file before reading data. and
-    // Set the name back to table column name before return this block.
-    if (_has_schema_change) {
-        for (int i = 0; i < block->columns(); i++) {
-            ColumnWithTypeAndName& col = block->get_by_position(i);
-            auto iter = _table_col_to_file_col.find(col.name);
-            if (iter != _table_col_to_file_col.end()) {
-                col.name = iter->second;
-            }
-        }
-        block->initialize_index_by_name();
-    }
-
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block));
     RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
-    // Set the name back to table column name before return this block.
-    if (_has_schema_change) {
-        for (int i = 0; i < block->columns(); i++) {
-            ColumnWithTypeAndName& col = block->get_by_position(i);
-            auto iter = _file_col_to_table_col.find(col.name);
-            if (iter != _file_col_to_table_col.end()) {
-                col.name = iter->second;
-            }
-        }
-        block->initialize_index_by_name();
-    }
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block));
 
     if (_equality_delete_impl != nullptr) {
         RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
@@ -228,8 +205,9 @@ Status IcebergTableReader::_equality_delete_base(
                                                         not_in_file_col_names, nullptr, {}, nullptr,
                                                         nullptr, nullptr, nullptr, nullptr, false));
         } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
-            RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false,
-                                                    {}, {}, nullptr, nullptr));
+            RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names,
+                                                    not_in_file_col_names, nullptr, {}, false, {},
+                                                    {}, nullptr, nullptr));
         } else {
             return Status::InternalError("Unsupported format of delete file");
         }
@@ -439,60 +417,6 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
     }
 }
 
-/*
- * Generate _all_required_col_names and _not_in_file_col_names.
- *
- * _all_required_col_names is all the columns required by user sql.
- * If the column name has been modified after the data file was written,
- * put the old name in data file to _all_required_col_names.
- *
- * _not_in_file_col_names is all the columns required by user sql but not in the data file.
- * e.g. New columns added after this data file was written.
- * The columns added with names used by old dropped columns should consider as a missing column,
- * which should be in _not_in_file_col_names.
- */
-void IcebergTableReader::_gen_file_col_names() {
-    _all_required_col_names.clear();
-    _not_in_file_col_names.clear();
-    for (int i = 0; i < _file_col_names.size(); ++i) {
-        auto name = _file_col_names[i];
-        auto iter = _table_col_to_file_col.find(name);
-        if (iter == _table_col_to_file_col.end()) {
-            // If the user creates the iceberg table, directly append the parquet file that already exists,
-            // there is no 'iceberg.schema' field in the footer of parquet, the '_table_col_to_file_col' may be empty.
-            // Because we are ignoring case, so, it is converted to lowercase here
-            auto name_low = to_lower(name);
-            _all_required_col_names.emplace_back(name_low);
-            if (_has_iceberg_schema) {
-                _not_in_file_col_names.emplace_back(name);
-            } else {
-                _table_col_to_file_col.emplace(name, name_low);
-                _file_col_to_table_col.emplace(name_low, name);
-                if (name != name_low) {
-                    _has_schema_change = true;
-                }
-            }
-        } else {
-            _all_required_col_names.emplace_back(iter->second);
-        }
-    }
-}
-
-/*
- * Generate _new_colname_to_value_range, by replacing the column name in
- * _colname_to_value_range with column name in data file.
- */
-void IcebergTableReader::_gen_new_colname_to_value_range() {
-    for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) {
-        auto iter = _table_col_to_file_col.find(it->first);
-        if (iter == _table_col_to_file_col.end()) {
-            _new_colname_to_value_range.emplace(it->first, it->second);
-        } else {
-            _new_colname_to_value_range.emplace(iter->second, it->second);
-        }
-    }
-}
-
 void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
                                                          size_t read_rows,
                                                          bool file_path_column_dictionary_coded) {
@@ -538,13 +462,9 @@ Status IcebergParquetReader::init_reader(
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
     _file_format = Fileformat::PARQUET;
     ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
-    RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
+
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
     parquet_reader->iceberg_sanitize(_all_required_col_names);
     RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
@@ -611,18 +531,14 @@ Status IcebergOrcReader::init_reader(
         const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
     _file_format = Fileformat::ORC;
     auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-
-    RETURN_IF_ERROR(_gen_col_name_maps(orc_reader));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
     orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
     RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
-    return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
-                                   conjuncts, false, tuple_descriptor, row_descriptor,
-                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+    return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
+                                   &_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
+                                   row_descriptor, not_single_slot_filter_conjuncts,
+                                   slot_id_to_filter_conjuncts);
 }
 
 Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
@@ -630,8 +546,9 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
     OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
                                 READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx);
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
-    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, &colname_to_value_range,
-                                                  {}, false, {}, {}, nullptr, nullptr));
+    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {},
+                                                  &colname_to_value_range, {}, false, {}, {},
+                                                  nullptr, nullptr));
 
     std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
             partition_columns;
@@ -652,61 +569,36 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
     return Status::OK();
 }
 
-/*
- * To support schema evolution, Iceberg write the column id to column name map to
- * parquet file key_value_metadata.
- * This function is to compare the table schema from FE (_col_id_name_map) with
- * the schema in key_value_metadata for the current parquet file and generate two maps
- * for future use:
- * 1. table column name to parquet column name.
- * 2. parquet column name to table column name.
- * For example, parquet file has a column 'col1',
- * after this file was written, iceberg changed the column name to 'col1_new'.
- * The two maps would contain:
- * 1. col1_new -> col1
- * 2. col1 -> col1_new
- */
-Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
+// To support schema evolution, Iceberg writes the column id to column name map into the
+// parquet file key_value_metadata.
+Status IcebergParquetReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
+
     if (field_desc.has_parquet_field_id()) {
-        for (const auto& pair : _col_id_name_map) {
-            auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
-            if (name_slice.get_size() == 0) {
-                _has_schema_change = true;
-            } else {
-                auto name_string = name_slice.to_string();
-                _table_col_to_file_col.emplace(pair.second, name_string);
-                _file_col_to_table_col.emplace(name_string, pair.second);
-                if (name_string != pair.second) {
-                    _has_schema_change = true;
-                }
-            }
-        }
+        file_col_id_to_name = field_desc.get_field_id_name_map();
+    } else {
+        // Early iceberg versions did not write any schema information to the Parquet file.
+        exist_schema = false;
     }
+
     return Status::OK();
 }
 
-Status IcebergOrcReader::_gen_col_name_maps(OrcReader* orc_reader) {
+// To support schema evolution, Iceberg writes the column id into the ORC file's type attributes.
+Status IcebergOrcReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
     std::vector<std::string> col_names;
-    std::vector<uint64_t> col_ids;
-    RETURN_IF_ERROR(
-            orc_reader->get_schema_col_name_attribute(&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE));
-    _has_iceberg_schema = true;
-    _table_col_to_file_col.clear();
-    _file_col_to_table_col.clear();
-    for (size_t i = 0; i < col_ids.size(); i++) {
-        auto col_id = col_ids[i];
-        auto& file_col_name = col_names[i];
-
-        if (_col_id_name_map.find(col_id) == _col_id_name_map.end()) {
-            _has_schema_change = true;
-            continue;
-        }
-        auto& table_col_name = _col_id_name_map[col_id];
-        _table_col_to_file_col.emplace(table_col_name, file_col_name);
-        _file_col_to_table_col.emplace(file_col_name, table_col_name);
-        if (table_col_name != file_col_name) {
-            _has_schema_change = true;
-        }
+    std::vector<int32_t> col_ids;
+    RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
+            &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
+    if (!exist_schema) {
+        return Status::OK();
+    }
+    for (auto i = 0; i < col_names.size(); i++) {
+        file_col_id_to_name.emplace(col_ids[i], std::move(col_names[i]));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index b057cb0657a..61964042386 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -67,7 +67,7 @@ class GenericReader;
 class ShardedKVCache;
 class VExprContext;
 
-class IcebergTableReader : public TableFormatReader {
+class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper {
 public:
     struct PositionDeleteRange {
         std::vector<std::string> data_file_path;
@@ -118,9 +118,6 @@ protected:
 
     PositionDeleteRange _get_range(const ColumnString& file_path_column);
 
-    void _gen_file_col_names();
-
-    void _gen_new_colname_to_value_range();
     static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
 
     Status _position_delete_base(const std::string data_file_path,
@@ -144,28 +141,12 @@ protected:
     ShardedKVCache* _kv_cache;
     IcebergProfile _iceberg_profile;
     std::vector<int64_t> _iceberg_delete_rows;
-    // col names from _file_slot_descs
-    std::vector<std::string> _file_col_names;
-    // file column name to table column name map. For iceberg schema evolution.
-    std::unordered_map<std::string, std::string> _file_col_to_table_col;
-    // table column name to file column name map. For iceberg schema evolution.
-    std::unordered_map<std::string, std::string> _table_col_to_file_col;
-    const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
-    // copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution.
-    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
-    // column id to name map. Collect from FE slot descriptor.
-    std::unordered_map<int, std::string> _col_id_name_map;
-    // col names in the parquet,orc file
-    std::vector<std::string> _all_required_col_names;
-    // col names in table but not in parquet,orc file
-    std::vector<std::string> _not_in_file_col_names;
+
     // equality delete should read the primary columns
     std::vector<std::string> _expand_col_names;
     std::vector<ColumnWithTypeAndName> _expand_columns;
 
     io::IOContext* _io_ctx;
-    bool _has_schema_change = false;
-    bool _has_iceberg_schema = false;
 
     // the table level row count for optimizing query like:
     // select count(*) from table;
@@ -220,7 +201,8 @@ public:
         parquet_reader->set_delete_rows(&_iceberg_delete_rows);
     }
 
-    Status _gen_col_name_maps(const FieldDescriptor& field_desc);
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
@@ -258,7 +240,8 @@ public:
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
             const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
 
-    Status _gen_col_name_maps(OrcReader* orc_reader);
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp
index ea8111d81b3..8676287ffe6 100644
--- a/be/src/vec/exec/format/table/table_format_reader.cpp
+++ b/be/src/vec/exec/format/table/table_format_reader.cpp
@@ -22,4 +22,107 @@ namespace doris::vectorized {
 TableFormatReader::TableFormatReader(std::unique_ptr<GenericReader> file_format_reader)
         : _file_format_reader(std::move(file_format_reader)) {}
 
+Status TableSchemaChangeHelper::init_schema_info(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<int32_t, std::string>& table_id_to_name,
+        const std::unordered_map<std::string, ColumnValueRangeType>*
+                table_col_name_to_value_range) {
+    bool exist_schema = true;
+    std::map<int32_t, std::string> file_id_to_name;
+    RETURN_IF_ERROR(get_file_col_id_to_name(exist_schema, file_id_to_name));
+    if (!exist_schema) {
+        file_id_to_name.clear();
+        for (const auto& [table_col_id, table_col_name] : table_id_to_name) {
+            file_id_to_name.emplace(table_col_id, table_col_name);
+        }
+    }
+
+    /** This is to compare the table schema from FE (table_id_to_name) with
+     * the current file schema (file_id_to_name), and generate two maps for future use:
+     * 1. table column name to file column name.
+     * 2. file column name to table column name.
+     * For example, the file has a column 'col1';
+     * after this file was written, iceberg changed the column name to 'col1_new'.
+     * The two maps would contain:
+     * 1. col1_new -> col1
+     * 2. col1 -> col1_new
+     */
+    for (const auto& [file_col_id, file_col_name] : file_id_to_name) {
+        if (table_id_to_name.find(file_col_id) == table_id_to_name.end()) {
+            continue;
+        }
+
+        auto& table_col_name = table_id_to_name.at(file_col_id);
+        _table_col_to_file_col.emplace(table_col_name, file_col_name);
+        _file_col_to_table_col.emplace(file_col_name, table_col_name);
+        if (table_col_name != file_col_name) {
+            _has_schema_change = true;
+        }
+    }
+
+    /** Generate _all_required_col_names and _not_in_file_col_names.
+     *
+     * _all_required_col_names is all the columns required by user sql.
+     * If the column name has been modified after the data file was written,
+     * put the old name in data file to _all_required_col_names.
+     *
+     * _not_in_file_col_names is all the columns required by user sql but not in the data file,
+     * e.g. new columns added after this data file was written.
+     * Columns added with names previously used by old dropped columns should be considered missing,
+     * which should be in _not_in_file_col_names.
+     */
+    _all_required_col_names.clear();
+    _not_in_file_col_names.clear();
+    for (const auto& table_col_name : read_table_col_names) {
+        auto iter = _table_col_to_file_col.find(table_col_name);
+        if (iter == _table_col_to_file_col.end()) {
+            _all_required_col_names.emplace_back(table_col_name);
+            _not_in_file_col_names.emplace_back(table_col_name);
+        } else {
+            _all_required_col_names.emplace_back(iter->second);
+        }
+    }
+
+    /** Generate _new_colname_to_value_range, by replacing the column name in
+     * _colname_to_value_range with the column name in the data file.
+     */
+    for (auto& it : *table_col_name_to_value_range) {
+        auto iter = _table_col_to_file_col.find(it.first);
+        if (iter == _table_col_to_file_col.end()) {
+            _new_colname_to_value_range.emplace(it.first, it.second);
+        } else {
+            _new_colname_to_value_range.emplace(iter->second, it.second);
+        }
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_before(Block* block) const {
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _table_col_to_file_col.find(col.name);
+            if (iter != _table_col_to_file_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_after(Block* block) const {
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _file_col_to_table_col.find(col.name);
+            if (iter != _file_col_to_table_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 5a102a7665e..2f6b8742bae 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -25,6 +25,7 @@
 #include <unordered_set>
 
 #include "common/status.h"
+#include "exec/olap_common.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -78,4 +79,44 @@ protected:
     std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
 };
 
+class TableSchemaChangeHelper {
+public:
+    /** Get the mapping from the unique ID of each column in the current file to the file column name.
+     * Iceberg/Hudi/Paimon usually maintain field IDs to support schema changes. If this
+     * information cannot be obtained (maybe an old version did not write it), set `exist_schema` = `false`.
+     */
+    virtual Status get_file_col_id_to_name(bool& exist_schema,
+                                           std::map<int, std::string>& file_col_id_to_name) = 0;
+
+    virtual ~TableSchemaChangeHelper() = default;
+
+protected:
+    /** table_id_to_name: table column unique id to table column name map */
+    Status init_schema_info(const std::vector<std::string>& read_table_col_names,
+                            const std::unordered_map<int32_t, std::string>& table_id_to_name,
+                            const std::unordered_map<std::string, ColumnValueRangeType>*
+                                    table_col_name_to_value_range);
+
+    /** To support schema evolution, we change the column names in the block to
+     * match the column names in the file before reading data, and
+     * set them back to the table column names before returning the block.
+     */
+    Status get_next_block_before(Block* block) const;
+
+    /** Set the names back to the table column names before returning the block. */
+    Status get_next_block_after(Block* block) const;
+
+    // copy of _colname_to_value_range with the new column names that are in the parquet/orc file
+    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
+    // all the columns required by user sql.
+    std::vector<std::string> _all_required_col_names;
+    // col names in the table but not in the parquet/orc file
+    std::vector<std::string> _not_in_file_col_names;
+    bool _has_schema_change = false;
+    // file column name to table column name map
+    std::unordered_map<std::string, std::string> _file_col_to_table_col;
+    // table column name to file column name map.
+    std::unordered_map<std::string, std::string> _table_col_to_file_col;
+};
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 8be11f6773a..dc13dd51aaf 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -66,8 +66,8 @@ Status TransactionalHiveReader::init_reader(
     _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                       TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
     Status status = orc_reader->init_reader(
-            &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor,
-            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+            &_col_names, {}, colname_to_value_range, conjuncts, true, tuple_descriptor,
+            row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
     return status;
 }
 
@@ -129,9 +129,9 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
         OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
                                 _state->timezone(), _io_ctx, false);
 
-        RETURN_IF_ERROR(
-                delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE,
-                                          nullptr, {}, false, nullptr, nullptr, nullptr, nullptr));
+        RETURN_IF_ERROR(delete_reader.init_reader(
+                &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, {}, nullptr, {}, false,
+                nullptr, nullptr, nullptr, nullptr));
 
         std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
                 partition_columns;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a22777672b6..7cbe6116b63 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -881,7 +881,7 @@ Status VFileScanner::_get_next_reader() {
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "paimon") {
                 init_status = orc_reader->init_reader(
-                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                         _real_tuple_desc, _default_val_row_desc.get(),
                         &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                 std::unique_ptr<PaimonOrcReader> paimon_reader =
@@ -897,7 +897,7 @@ Status VFileScanner::_get_next_reader() {
                     hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
                 }
                 init_status = orc_reader->init_reader(
-                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                         _real_tuple_desc, _default_val_row_desc.get(),
                         &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
                         hive_orc_use_column_names);
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
new file mode 100644
index 00000000000..1a3d844ef60
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
@@ -0,0 +1,48 @@
+
+use demo.test_db;
+
+CREATE TABLE sc_drop_add_orc (
+    id BIGINT,
+    name STRING,
+    age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='orc');
+
+INSERT INTO sc_drop_add_orc VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_orc VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_orc DROP COLUMN age;
+
+INSERT INTO sc_drop_add_orc (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_orc (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_orc ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_orc VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_orc VALUES (6, 'Frank', 35);
+
+
+
+CREATE TABLE sc_drop_add_parquet (
+    id BIGINT,
+    name STRING,
+    age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='parquet');
+
+INSERT INTO sc_drop_add_parquet VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_parquet VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_parquet DROP COLUMN age;
+
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_parquet ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_parquet VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_parquet VALUES (6, 'Frank', 35);
\ No newline at end of file
diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
new file mode 100644
index 00000000000..d68cde9a50e
Binary files /dev/null and b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out differ
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
new file mode 100644
index 00000000000..295d14b246e
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change2", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "iceberg_schema_change2"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;""" 
+
+    qt_parquet_1 """ select * from sc_drop_add_parquet order by id; """
+    qt_parquet_2 """ select * from sc_drop_add_parquet where age is NULL order by id; """
+    qt_parquet_3 """ select * from sc_drop_add_parquet where age is not NULL order by id; """
+    qt_parquet_4 """ select * from sc_drop_add_parquet where age > 28 order by id; """
+    qt_parquet_5 """ select * from sc_drop_add_parquet where age >= 28 order by id; """
+    qt_parquet_6 """ select id, name from sc_drop_add_parquet where age >= 28 order by id; """
+    qt_parquet_7 """ select id, age from sc_drop_add_parquet where name="Eve" order by id; """
+
+
+
+    qt_orc_1 """ select * from sc_drop_add_orc order by id; """
+    qt_orc_2 """ select * from sc_drop_add_orc where age is NULL order by id; """
+    qt_orc_3 """ select * from sc_drop_add_orc where age is not NULL order by id; """
+    qt_orc_4 """ select * from sc_drop_add_orc where age > 28 order by id; """
+    qt_orc_5 """ select * from sc_drop_add_orc where age >= 28 order by id; """
+    qt_orc_6 """ select id, name from sc_drop_add_orc where age >= 28 order by id; """
+    qt_orc_7 """ select id, age from sc_drop_add_orc where name="Eve" order by id; """
+
+}
\ No newline at end of file

