This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a9ebcb80f05 branch-3.0: [fix](orc) fix core dump when the iceberg-orc reader reads an ORC file missing the iceberg.id attribute (#49051) (#54167)
a9ebcb80f05 is described below
commit a9ebcb80f0555713fc69b5afdd6811edb95f3655
Author: daidai <[email protected]>
AuthorDate: Tue Aug 12 11:09:02 2025 +0800
branch-3.0: [fix](orc) fix core dump when the iceberg-orc reader reads an ORC file missing the iceberg.id attribute (#49051) (#54167)
### What problem does this PR solve?
Cherry-pick of #49051, but it only fixes the following crash:
```
terminate called after throwing an instance of 'std::range_error'
  what():  Key not found: iceberg.id
*** Query id: 6a93d7cdc9f44370-a40b07934a14c81b ***
*** is nereids: 1 ***
*** tablet id: 0 ***
*** Aborted at 1753842428 (unix time) try "date -d @1753842428" if you are using GNU date ***
*** Current BE git commitID: 910c4249c5 ***
*** SIGABRT unknown detail explain (@0x5a46f) received by PID 369775 (TID 371694 OR 0x7fad067ef640) from PID 369775; stack trace: ***
terminate called recursively
terminate called recursively
terminate called recursively
terminate called recursively
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_release/doris/be/src/common/signal_handler.h:421
1# 0x00007FB12263EBF0 in /lib64/libc.so.6
2# __pthread_kill_implementation in /lib64/libc.so.6
3# gsignal in /lib64/libc.so.6
4# abort in /lib64/libc.so.6
5# __gnu_cxx::__verbose_terminate_handler() [clone .cold] at ../../../../libstdc++-v3/libsupc++/vterminate.cc:75
6# __cxxabiv1::__terminate(void (*)()) at ../../../../libstdc++-v3/libsupc++/eh_terminate.cc:48
7# 0x000055C047B28EC1 in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
8# 0x000055C047B29014 in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
9# orc::TypeImpl::getAttributeValue(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) const in /opt/apache-doris-3.0.6.2-bin-x64/be/lib/doris_be
10# doris::vectorized::OrcReader::get_schema_col_name_attribute(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >*, std::vector<unsigned long, std::allocator<unsigned long> >*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) at /home/zcp/repo_center/doris_release/doris/be/src/vec/exec/format/orc/vorc_reader.cpp:332
11# doris::vectorized::IcebergOrcReader::_gen_col_name_maps(doris::vectorized::OrcReader*) at
```
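
The crash happens because `orc::TypeImpl::getAttributeValue` throws `std::range_error` when asked for an attribute the type does not carry: ORC files written without Iceberg field-id attributes (for example, files not produced by an Iceberg writer) have no `iceberg.id` key, and the old `get_schema_col_name_attribute` called `getAttributeValue` unconditionally. The patch probes with `hasAttributeKey` first and reports the absence through an output flag instead of aborting. A minimal sketch of that defensive pattern against the public ORC C++ API (the helper name and simplified signature here are illustrative, not the Doris code):

```cpp
#include <orc/Type.hh>

#include <string>
#include <vector>

// Collect an integer attribute (e.g. "iceberg.id") from every top-level field.
// Returns false instead of letting getAttributeValue() throw std::range_error
// when some field lacks the attribute, i.e. the file predates field ids.
bool collect_field_attribute(const orc::Type& root_type, const std::string& key,
                             std::vector<int32_t>* values) {
    for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
        const orc::Type* subtype = root_type.getSubtype(i);
        if (!subtype->hasAttributeKey(key)) {
            return false; // caller falls back to name-based column matching
        }
        values->emplace_back(std::stoi(subtype->getAttributeValue(key)));
    }
    return true;
}
```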
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 28 ++-
be/src/vec/exec/format/orc/vorc_reader.h | 11 +-
be/src/vec/exec/format/parquet/schema_desc.h | 4 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 190 +++++----------------
be/src/vec/exec/format/table/iceberg_reader.h | 29 +---
.../vec/exec/format/table/table_format_reader.cpp | 103 +++++++++++
be/src/vec/exec/format/table/table_format_reader.h | 41 +++++
.../format/table/transactional_hive_reader.cpp | 10 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 4 +-
.../create_preinstalled_scripts/iceberg/run08.sql | 48 ++++++
.../iceberg/iceberg_schema_change2.out | Bin 0 -> 673 bytes
.../iceberg/iceberg_schema_change2.groovy | 66 +++++++
12 files changed, 347 insertions(+), 187 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index acd979678fb..88c073866db 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -250,6 +250,10 @@ void OrcReader::_init_profile() {
}
Status OrcReader::_create_file_reader() {
+ if (_reader != nullptr) {
+ return Status::OK();
+ }
+
if (_file_input_stream == nullptr) {
        _file_description.mtime =
                _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
@@ -289,6 +293,7 @@ Status OrcReader::_create_file_reader() {
Status OrcReader::init_reader(
        const std::vector<std::string>* column_names,
+        const std::vector<std::string>& missing_column_names,
        const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
        const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
        const RowDescriptor* row_descriptor,
@@ -296,6 +301,7 @@ Status OrcReader::init_reader(
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
        const bool hive_use_column_names) {
    _column_names = column_names;
+    _missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end());
    _colname_to_value_range = colname_to_value_range;
    _lazy_read_ctx.conjuncts = conjuncts;
    _is_acid = is_acid;
@@ -332,14 +338,21 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
}
Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                               std::vector<uint64_t>* col_attributes,
-                                               std::string attribute) {
+                                               std::vector<int32_t>* col_attributes,
+                                               const std::string& attribute,
+                                               bool* exist_attribute) {
    RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
+    *exist_attribute = true;
+    const auto& root_type = _reader->getType();
    for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
        col_names->emplace_back(get_field_name_lower_case(&root_type, i));
+
+        if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) {
+            *exist_attribute = false;
+            return Status::OK();
+        }
        col_attributes->emplace_back(
-                std::stol(root_type.getSubtype(i)->getAttributeValue(attribute)));
+                std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute)));
    }
    return Status::OK();
}
@@ -355,8 +368,15 @@ Status OrcReader::_init_read_columns() {
// TODO, should be removed in 2.2 or later
    _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) &&
                               _scan_params.__isset.slot_name_to_schema_pos;
+
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
+
+ if (_missing_column_names_set.contains(col_name)) {
+ _missing_cols.emplace_back(col_name);
+ continue;
+ }
+
if (_is_hive1_orc_or_use_idx) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 551c000dca3..77cc22b54ed 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -144,6 +144,7 @@ public:
    //If you want to read the file by index instead of column name, set hive_use_column_names to false.
    Status init_reader(
            const std::vector<std::string>* column_names,
+            const std::vector<std::string>& missing_column_names,
            const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
            const VExprContextSPtrs& conjuncts, bool is_acid,
            const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
@@ -184,8 +185,8 @@ public:
std::vector<TypeDescriptor>* col_types) override;
    Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                         std::vector<uint64_t>* col_attributes,
-                                         std::string attribute);
+                                         std::vector<int32_t>* col_attributes,
+                                         const std::string& attribute, bool* exist_attribute);
    void set_table_col_to_file_col(
            std::unordered_map<std::string, std::string> table_col_to_file_col) {
        _table_col_to_file_col = table_col_to_file_col;
@@ -566,6 +567,10 @@ private:
int64_t _range_size;
const std::string& _ctz;
const std::vector<std::string>* _column_names;
+    // _missing_column_names_set: used in iceberg/hudi/paimon for columns that were
+    // dropped and then added back (drop column a, then add column a). The old column
+    // data in the file must not be read in this case.
+    std::set<std::string> _missing_column_names_set;
+
int32_t _offset_days = 0;
cctz::time_zone _time_zone;
@@ -590,7 +595,7 @@ private:
OrcProfile _orc_profile;
std::unique_ptr<orc::ColumnVectorBatch> _batch;
- std::unique_ptr<orc::Reader> _reader;
+ std::unique_ptr<orc::Reader> _reader = nullptr;
std::unique_ptr<orc::RowReader> _row_reader;
std::unique_ptr<ORCFilterImpl> _orc_filter;
orc::RowReaderOptions _row_reader_options;
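
The new `_missing_column_names_set` feeds `_init_read_columns`: any requested column found in the set is routed to `_missing_cols` (materialized as NULL/default) instead of being resolved against the file, which is what keeps a re-added column from picking up stale data written under the old field id. A standalone sketch of that short circuit, with simplified stand-ins for the reader state rather than the actual Doris members:

```cpp
#include <iostream>
#include <set>
#include <string>
#include <vector>

int main() {
    // Columns requested by the query, and columns the table-format layer
    // flagged as dropped-then-re-added, whose old file data must be skipped.
    const std::vector<std::string> column_names = {"id", "name", "age"};
    const std::set<std::string> missing_column_names_set = {"age"};

    std::vector<std::string> read_cols;    // resolved against the file
    std::vector<std::string> missing_cols; // filled with NULL/default values
    for (const auto& col_name : column_names) {
        if (missing_column_names_set.count(col_name) != 0) {
            missing_cols.emplace_back(col_name);
            continue;
        }
        read_cols.emplace_back(col_name);
    }
    std::cout << read_cols.size() << " read, " << missing_cols.size() << " missing\n";
}
```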
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 2593da837c3..aed0da07000 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -71,7 +71,7 @@ private:
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
    // Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
- std::unordered_map<int, std::string> _field_id_name_mapping;
+ std::map<int, std::string> _field_id_name_mapping;
    void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
                              FieldSchema* physical_field);
@@ -135,6 +135,8 @@ public:
    bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }
+    std::map<int32, std::string> get_field_id_name_map() { return _field_id_name_mapping; }
+
const doris::Slice get_column_name_from_field_id(int32_t id) const;
};
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 7dea5d99617..725ecc4dd35 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -120,32 +120,9 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
}
RETURN_IF_ERROR(_expand_block_if_need(block));
-    // To support iceberg schema evolution. We change the column name in block to
-    // make it match with the column name in parquet file before reading data. and
-    // Set the name back to table column name before return this block.
-    if (_has_schema_change) {
-        for (int i = 0; i < block->columns(); i++) {
-            ColumnWithTypeAndName& col = block->get_by_position(i);
-            auto iter = _table_col_to_file_col.find(col.name);
-            if (iter != _table_col_to_file_col.end()) {
-                col.name = iter->second;
-            }
-        }
-        block->initialize_index_by_name();
-    }
-
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block));
    RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
-    // Set the name back to table column name before return this block.
-    if (_has_schema_change) {
-        for (int i = 0; i < block->columns(); i++) {
-            ColumnWithTypeAndName& col = block->get_by_position(i);
-            auto iter = _file_col_to_table_col.find(col.name);
-            if (iter != _file_col_to_table_col.end()) {
-                col.name = iter->second;
-            }
-        }
-        block->initialize_index_by_name();
-    }
+    RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block));
if (_equality_delete_impl != nullptr) {
RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
@@ -228,8 +205,9 @@ Status IcebergTableReader::_equality_delete_base(
                                                 not_in_file_col_names, nullptr, {}, nullptr,
                                                 nullptr, nullptr, nullptr, nullptr, false));
    } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
-        RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false,
-                                                {}, {}, nullptr, nullptr));
+        RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names,
+                                                not_in_file_col_names, nullptr, {}, false, {},
+                                                {}, nullptr, nullptr));
    } else {
        return Status::InternalError("Unsupported format of delete file");
    }
@@ -439,60 +417,6 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
}
}
-/*
- * Generate _all_required_col_names and _not_in_file_col_names.
- *
- * _all_required_col_names is all the columns required by user sql.
- * If the column name has been modified after the data file was written,
- * put the old name in data file to _all_required_col_names.
- *
- * _not_in_file_col_names is all the columns required by user sql but not in the data file.
- * e.g. New columns added after this data file was written.
- * The columns added with names used by old dropped columns should consider as a missing column,
- * which should be in _not_in_file_col_names.
- */
-void IcebergTableReader::_gen_file_col_names() {
-    _all_required_col_names.clear();
-    _not_in_file_col_names.clear();
-    for (int i = 0; i < _file_col_names.size(); ++i) {
-        auto name = _file_col_names[i];
-        auto iter = _table_col_to_file_col.find(name);
-        if (iter == _table_col_to_file_col.end()) {
-            // If the user creates the iceberg table, directly append the parquet file that already exists,
-            // there is no 'iceberg.schema' field in the footer of parquet, the '_table_col_to_file_col' may be empty.
-            // Because we are ignoring case, so, it is converted to lowercase here
-            auto name_low = to_lower(name);
-            _all_required_col_names.emplace_back(name_low);
-            if (_has_iceberg_schema) {
-                _not_in_file_col_names.emplace_back(name);
-            } else {
-                _table_col_to_file_col.emplace(name, name_low);
-                _file_col_to_table_col.emplace(name_low, name);
-                if (name != name_low) {
-                    _has_schema_change = true;
-                }
-            }
-        } else {
-            _all_required_col_names.emplace_back(iter->second);
-        }
-    }
-}
-
-/*
- * Generate _new_colname_to_value_range, by replacing the column name in
- * _colname_to_value_range with column name in data file.
- */
-void IcebergTableReader::_gen_new_colname_to_value_range() {
-    for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) {
-        auto iter = _table_col_to_file_col.find(it->first);
-        if (iter == _table_col_to_file_col.end()) {
-            _new_colname_to_value_range.emplace(it->first, it->second);
-        } else {
-            _new_colname_to_value_range.emplace(iter->second, it->second);
-        }
-    }
-}
-
-
void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
                                                         size_t read_rows,
                                                         bool file_path_column_dictionary_coded) {
@@ -538,13 +462,9 @@ Status IcebergParquetReader::init_reader(
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
    _file_format = Fileformat::PARQUET;
    ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
-    RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
+
    parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
    parquet_reader->iceberg_sanitize(_all_required_col_names);
    RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
@@ -611,18 +531,14 @@ Status IcebergOrcReader::init_reader(
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
_file_format = Fileformat::ORC;
auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
-    _col_id_name_map = col_id_name_map;
-    _file_col_names = file_col_names;
-    _colname_to_value_range = colname_to_value_range;
-
-    RETURN_IF_ERROR(_gen_col_name_maps(orc_reader));
-    _gen_file_col_names();
-    _gen_new_colname_to_value_range();
+    RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
+                                                              colname_to_value_range));
    orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
    RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
-    return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
-                                   conjuncts, false, tuple_descriptor, row_descriptor,
-                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+    return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
+                                   &_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
+                                   row_descriptor, not_single_slot_filter_conjuncts,
+                                   slot_id_to_filter_conjuncts);
}
Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
@@ -630,8 +546,9 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
    OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
                                READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx);
    std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
-    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, &colname_to_value_range,
-                                                  {}, false, {}, {}, nullptr, nullptr));
+    RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {},
+                                                  &colname_to_value_range, {}, false, {}, {},
+                                                  nullptr, nullptr));
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
@@ -652,61 +569,36 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
return Status::OK();
}
-/*
- * To support schema evolution, Iceberg write the column id to column name map to
- * parquet file key_value_metadata.
- * This function is to compare the table schema from FE (_col_id_name_map) with
- * the schema in key_value_metadata for the current parquet file and generate two maps
- * for future use:
- * 1. table column name to parquet column name.
- * 2. parquet column name to table column name.
- * For example, parquet file has a column 'col1',
- * after this file was written, iceberg changed the column name to 'col1_new'.
- * The two maps would contain:
- * 1. col1_new -> col1
- * 2. col1 -> col1_new
- */
-Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
+// To support schema evolution, Iceberg writes the column-id-to-column-name map to the parquet file key_value_metadata.
+Status IcebergParquetReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
+
    if (field_desc.has_parquet_field_id()) {
-        for (const auto& pair : _col_id_name_map) {
-            auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
-            if (name_slice.get_size() == 0) {
-                _has_schema_change = true;
-            } else {
-                auto name_string = name_slice.to_string();
-                _table_col_to_file_col.emplace(pair.second, name_string);
-                _file_col_to_table_col.emplace(name_string, pair.second);
-                if (name_string != pair.second) {
-                    _has_schema_change = true;
-                }
-            }
-        }
+        file_col_id_to_name = field_desc.get_field_id_name_map();
+    } else {
+        // Early Iceberg versions do not write any schema information to the Parquet file.
+        exist_schema = false;
    }
+
    return Status::OK();
}
-Status IcebergOrcReader::_gen_col_name_maps(OrcReader* orc_reader) {
+// To support schema evolution, Iceberg writes the column id to the orc file type attributes.
+Status IcebergOrcReader::get_file_col_id_to_name(
+        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+
    std::vector<std::string> col_names;
-    std::vector<uint64_t> col_ids;
-    RETURN_IF_ERROR(
-            orc_reader->get_schema_col_name_attribute(&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE));
-    _has_iceberg_schema = true;
-    _table_col_to_file_col.clear();
-    _file_col_to_table_col.clear();
-    for (size_t i = 0; i < col_ids.size(); i++) {
-        auto col_id = col_ids[i];
-        auto& file_col_name = col_names[i];
-
-        if (_col_id_name_map.find(col_id) == _col_id_name_map.end()) {
-            _has_schema_change = true;
-            continue;
-        }
-        auto& table_col_name = _col_id_name_map[col_id];
-        _table_col_to_file_col.emplace(table_col_name, file_col_name);
-        _file_col_to_table_col.emplace(file_col_name, table_col_name);
-        if (table_col_name != file_col_name) {
-            _has_schema_change = true;
-        }
+    std::vector<int32_t> col_ids;
+    RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
+            &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
+    if (!exist_schema) {
+        return Status::OK();
+    }
+    for (auto i = 0; i < col_names.size(); i++) {
+        file_col_id_to_name.emplace(col_ids[i], std::move(col_names[i]));
    }
    return Status::OK();
}
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index b057cb0657a..61964042386 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -67,7 +67,7 @@ class GenericReader;
class ShardedKVCache;
class VExprContext;
-class IcebergTableReader : public TableFormatReader {
+class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHelper {
public:
struct PositionDeleteRange {
std::vector<std::string> data_file_path;
@@ -118,9 +118,6 @@ protected:
PositionDeleteRange _get_range(const ColumnString& file_path_column);
- void _gen_file_col_names();
-
- void _gen_new_colname_to_value_range();
    static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
Status _position_delete_base(const std::string data_file_path,
@@ -144,28 +141,12 @@ protected:
ShardedKVCache* _kv_cache;
IcebergProfile _iceberg_profile;
std::vector<int64_t> _iceberg_delete_rows;
-    // col names from _file_slot_descs
-    std::vector<std::string> _file_col_names;
-    // file column name to table column name map. For iceberg schema evolution.
-    std::unordered_map<std::string, std::string> _file_col_to_table_col;
-    // table column name to file column name map. For iceberg schema evolution.
-    std::unordered_map<std::string, std::string> _table_col_to_file_col;
-    const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
-    // copy from _colname_to_value_range with new column name that is in parquet/orc file, to support schema evolution.
-    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
-    // column id to name map. Collect from FE slot descriptor.
-    std::unordered_map<int, std::string> _col_id_name_map;
-    // col names in the parquet,orc file
-    std::vector<std::string> _all_required_col_names;
-    // col names in table but not in parquet,orc file
-    std::vector<std::string> _not_in_file_col_names;
+
// equality delete should read the primary columns
std::vector<std::string> _expand_col_names;
std::vector<ColumnWithTypeAndName> _expand_columns;
io::IOContext* _io_ctx;
- bool _has_schema_change = false;
- bool _has_iceberg_schema = false;
// the table level row count for optimizing query like:
// select count(*) from table;
@@ -220,7 +201,8 @@ public:
parquet_reader->set_delete_rows(&_iceberg_delete_rows);
}
-    Status _gen_col_name_maps(const FieldDescriptor& field_desc);
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
protected:
std::unique_ptr<GenericReader> _create_equality_reader(
@@ -258,7 +240,8 @@ public:
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
-    Status _gen_col_name_maps(OrcReader* orc_reader);
+    Status get_file_col_id_to_name(bool& exist_schema,
+                                   std::map<int32_t, std::string>& file_col_id_to_name) final;
protected:
std::unique_ptr<GenericReader> _create_equality_reader(
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp
index ea8111d81b3..8676287ffe6 100644
--- a/be/src/vec/exec/format/table/table_format_reader.cpp
+++ b/be/src/vec/exec/format/table/table_format_reader.cpp
@@ -22,4 +22,107 @@ namespace doris::vectorized {
TableFormatReader::TableFormatReader(std::unique_ptr<GenericReader> file_format_reader)
        : _file_format_reader(std::move(file_format_reader)) {}
+Status TableSchemaChangeHelper::init_schema_info(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<int32_t, std::string>& table_id_to_name,
+        const std::unordered_map<std::string, ColumnValueRangeType>*
+                table_col_name_to_value_range) {
+    bool exist_schema = true;
+    std::map<int32_t, std::string> file_id_to_name;
+    RETURN_IF_ERROR(get_file_col_id_to_name(exist_schema, file_id_to_name));
+    if (!exist_schema) {
+        file_id_to_name.clear();
+        for (const auto& [table_col_id, table_col_name] : table_id_to_name) {
+            file_id_to_name.emplace(table_col_id, table_col_name);
+        }
+    }
+
+    /** Compare the table schema from FE (table_id_to_name) with the current file
+     * schema (file_id_to_name) and generate two maps for future use:
+     * 1. table column name to file column name.
+     * 2. file column name to table column name.
+     * For example, the file has a column 'col1'; after this file was written,
+     * iceberg changed the column name to 'col1_new'. The two maps would contain:
+     * 1. col1_new -> col1
+     * 2. col1 -> col1_new
+     */
+    for (const auto& [file_col_id, file_col_name] : file_id_to_name) {
+        if (table_id_to_name.find(file_col_id) == table_id_to_name.end()) {
+            continue;
+        }
+
+        auto& table_col_name = table_id_to_name.at(file_col_id);
+        _table_col_to_file_col.emplace(table_col_name, file_col_name);
+        _file_col_to_table_col.emplace(file_col_name, table_col_name);
+        if (table_col_name != file_col_name) {
+            _has_schema_change = true;
+        }
+    }
+
+    /** Generate _all_required_col_names and _not_in_file_col_names.
+     *
+     * _all_required_col_names holds all the columns required by the user sql.
+     * If a column name was modified after the data file was written,
+     * the old name from the data file is put into _all_required_col_names.
+     *
+     * _not_in_file_col_names holds the columns required by the user sql but absent
+     * from the data file, e.g. columns added after this data file was written.
+     * A column re-added with the name of an old dropped column is considered a
+     * missing column and also belongs in _not_in_file_col_names.
+     */
+    _all_required_col_names.clear();
+    _not_in_file_col_names.clear();
+    for (auto table_col_name : read_table_col_names) {
+        auto iter = _table_col_to_file_col.find(table_col_name);
+        if (iter == _table_col_to_file_col.end()) {
+            _all_required_col_names.emplace_back(table_col_name);
+            _not_in_file_col_names.emplace_back(table_col_name);
+        } else {
+            _all_required_col_names.emplace_back(iter->second);
+        }
+    }
+
+    /** Generate _new_colname_to_value_range by replacing the column names in
+     * _colname_to_value_range with the column names in the data file.
+     */
+    for (auto& it : *table_col_name_to_value_range) {
+        auto iter = _table_col_to_file_col.find(it.first);
+        if (iter == _table_col_to_file_col.end()) {
+            _new_colname_to_value_range.emplace(it.first, it.second);
+        } else {
+            _new_colname_to_value_range.emplace(iter->second, it.second);
+        }
+    }
+    return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_before(Block* block) const {
+ if (_has_schema_change) {
+ for (int i = 0; i < block->columns(); i++) {
+ ColumnWithTypeAndName& col = block->get_by_position(i);
+ auto iter = _table_col_to_file_col.find(col.name);
+ if (iter != _table_col_to_file_col.end()) {
+ col.name = iter->second;
+ }
+ }
+ block->initialize_index_by_name();
+ }
+ return Status::OK();
+}
+
+Status TableSchemaChangeHelper::get_next_block_after(Block* block) const {
+ if (_has_schema_change) {
+ for (int i = 0; i < block->columns(); i++) {
+ ColumnWithTypeAndName& col = block->get_by_position(i);
+ auto iter = _file_col_to_table_col.find(col.name);
+ if (iter != _file_col_to_table_col.end()) {
+ col.name = iter->second;
+ }
+ }
+ block->initialize_index_by_name();
+ }
+ return Status::OK();
+}
+
} // namespace doris::vectorized
\ No newline at end of file
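
To make the id-based matching in `init_schema_info` concrete, consider the drop-then-re-add case from the new regression test: an old data file carries `age` under field id 3, while the table's re-added `age` has a fresh id 4, so the two never pair up and the column is treated as missing. A self-contained sketch under those assumed ids (toy maps, not the Doris types):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>

int main() {
    // Schema of a file written before `age` was dropped: age has field id 3.
    const std::map<int32_t, std::string> file_id_to_name = {
            {1, "id"}, {2, "name"}, {3, "age"}};
    // Current table schema from FE: the re-added `age` got a new field id 4.
    const std::unordered_map<int32_t, std::string> table_id_to_name = {
            {1, "id"}, {2, "name"}, {4, "age"}};

    std::unordered_map<std::string, std::string> table_col_to_file_col;
    for (const auto& [file_col_id, file_col_name] : file_id_to_name) {
        auto it = table_id_to_name.find(file_col_id);
        if (it == table_id_to_name.end()) {
            continue; // id 3 only exists in the file: the dropped column
        }
        table_col_to_file_col.emplace(it->second, file_col_name);
    }
    // `age` (id 4) found no file-side match, so it lands in
    // _not_in_file_col_names and is read as a missing (NULL) column.
    std::cout << "age mapped: " << table_col_to_file_col.count("age") << "\n"; // 0
}
```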
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 5a102a7665e..2f6b8742bae 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -25,6 +25,7 @@
#include <unordered_set>
#include "common/status.h"
+#include "exec/olap_common.h"
#include "vec/exec/format/generic_reader.h"
namespace doris {
@@ -78,4 +79,44 @@ protected:
std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
};
+class TableSchemaChangeHelper {
+public:
+    /** Get the mapping from the unique id of each column in the current file to the
+     * file column name. Iceberg/Hudi/Paimon usually maintain field ids to support
+     * schema changes. If this information cannot be obtained (older versions may not
+     * have written it), set `exist_schema` to `false`.
+     */
+    virtual Status get_file_col_id_to_name(bool& exist_schema,
+                                           std::map<int, std::string>& file_col_id_to_name) = 0;
+
+    virtual ~TableSchemaChangeHelper() = default;
+
+protected:
+    /** table_id_to_name : table column unique id to table column name map */
+    Status init_schema_info(const std::vector<std::string>& read_table_col_names,
+                            const std::unordered_map<int32_t, std::string>& table_id_to_name,
+                            const std::unordered_map<std::string, ColumnValueRangeType>*
+                                    table_col_name_to_value_range);
+
+    /** To support schema evolution, change the column names in the block to match
+     * the column names in the file before reading data, and set them back to the
+     * table column names before returning the block.
+     */
+    Status get_next_block_before(Block* block) const;
+
+    /** Set the names back to the table column names before returning the block. */
+    Status get_next_block_after(Block* block) const;
+
+    // copy of _colname_to_value_range keyed by the column names in the parquet/orc file
+    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
+    // all the columns required by the user sql
+    std::vector<std::string> _all_required_col_names;
+    // column names in the table but not in the parquet/orc file
+    std::vector<std::string> _not_in_file_col_names;
+    bool _has_schema_change = false;
+    // file column name to table column name map
+    std::unordered_map<std::string, std::string> _file_col_to_table_col;
+    // table column name to file column name map
+    std::unordered_map<std::string, std::string> _table_col_to_file_col;
+};
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 8be11f6773a..dc13dd51aaf 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -66,8 +66,8 @@ Status TransactionalHiveReader::init_reader(
    _col_names.insert(_col_names.end(),
                      TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                      TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
    Status status = orc_reader->init_reader(
-            &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor,
-            not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+            &_col_names, {}, colname_to_value_range, conjuncts, true, tuple_descriptor,
+            row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
    return status;
}
@@ -129,9 +129,9 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
    OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
                            _state->timezone(), _io_ctx, false);
-    RETURN_IF_ERROR(
-            delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE,
-                                      nullptr, {}, false, nullptr, nullptr, nullptr, nullptr));
+    RETURN_IF_ERROR(delete_reader.init_reader(
+            &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, {}, nullptr, {}, false,
+            nullptr, nullptr, nullptr, nullptr));
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a22777672b6..7cbe6116b63 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -881,7 +881,7 @@ Status VFileScanner::_get_next_reader() {
            } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "paimon") {
                init_status = orc_reader->init_reader(
-                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
                std::unique_ptr<PaimonOrcReader> paimon_reader =
@@ -897,7 +897,7 @@ Status VFileScanner::_get_next_reader() {
                    hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names;
                }
                init_status = orc_reader->init_reader(
-                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+                        &_file_col_names, {}, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
                        hive_orc_use_column_names);
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
new file mode 100644
index 00000000000..1a3d844ef60
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run08.sql
@@ -0,0 +1,48 @@
+
+use demo.test_db;
+
+CREATE TABLE sc_drop_add_orc (
+ id BIGINT,
+ name STRING,
+ age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='orc');
+
+INSERT INTO sc_drop_add_orc VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_orc VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_orc DROP COLUMN age;
+
+INSERT INTO sc_drop_add_orc (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_orc (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_orc ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_orc VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_orc VALUES (6, 'Frank', 35);
+
+
+
+CREATE TABLE sc_drop_add_parquet (
+ id BIGINT,
+ name STRING,
+ age INT
+)
+USING iceberg
+PARTITIONED BY (id)
+TBLPROPERTIES ('format'='parquet');
+
+INSERT INTO sc_drop_add_parquet VALUES (1, 'Alice', 25);
+INSERT INTO sc_drop_add_parquet VALUES (2, 'Bob', 30);
+
+ALTER TABLE sc_drop_add_parquet DROP COLUMN age;
+
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (3, 'Charlie');
+INSERT INTO sc_drop_add_parquet (id, name) VALUES (4, 'David');
+
+ALTER TABLE sc_drop_add_parquet ADD COLUMN age INT;
+
+INSERT INTO sc_drop_add_parquet VALUES (5, 'Eve', 28);
+INSERT INTO sc_drop_add_parquet VALUES (6, 'Frank', 35);
\ No newline at end of file
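
Given Iceberg's field-id semantics, rows 1-4 of each table should surface `age` as NULL after the re-add (the pre-drop values for rows 1-2 belong to the old field id, and rows 3-4 were written without the column), while rows 5-6 should keep 28 and 35; the `age is NULL` / `age is not NULL` queries in the Groovy suite below pin down exactly that behavior.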
diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out
new file mode 100644
index 00000000000..d68cde9a50e
Binary files /dev/null and b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change2.out differ
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
new file mode 100644
index 00000000000..295d14b246e
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change2.groovy
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change2",
"p0,external,doris,external_docker,external_docker_doris") {
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String catalog_name = "iceberg_schema_change2"
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """ use test_db;"""
+
+ qt_parquet_1 """ select * from sc_drop_add_parquet order by id; """
+    qt_parquet_2 """ select * from sc_drop_add_parquet where age is NULL order by id; """
+    qt_parquet_3 """ select * from sc_drop_add_parquet where age is not NULL order by id; """
+    qt_parquet_4 """ select * from sc_drop_add_parquet where age > 28 order by id; """
+    qt_parquet_5 """ select * from sc_drop_add_parquet where age >= 28 order by id; """
+    qt_parquet_6 """ select id, name from sc_drop_add_parquet where age >= 28 order by id; """
+    qt_parquet_7 """ select id, age from sc_drop_add_parquet where name="Eve" order by id; """
+
+
+
+ qt_orc_1 """ select * from sc_drop_add_orc order by id; """
+    qt_orc_2 """ select * from sc_drop_add_orc where age is NULL order by id; """
+    qt_orc_3 """ select * from sc_drop_add_orc where age is not NULL order by id; """
+    qt_orc_4 """ select * from sc_drop_add_orc where age > 28 order by id; """
+    qt_orc_5 """ select * from sc_drop_add_orc where age >= 28 order by id; """
+    qt_orc_6 """ select id, name from sc_drop_add_orc where age >= 28 order by id; """
+    qt_orc_7 """ select id, age from sc_drop_add_orc where name="Eve" order by id; """
+
+}
\ No newline at end of file