This is an automated email from the ASF dual-hosted git repository.
kakachen pushed a commit to branch data_lake_reader_refactoring
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/data_lake_reader_refactoring
by this push:
new 4541ae1b562 fix v3 (#62305)
4541ae1b562 is described below
commit 4541ae1b562325668243cd6b2a1f5ec50a711331
Author: daidai <[email protected]>
AuthorDate: Fri Apr 10 10:56:42 2026 +0800
fix v3 (#62305)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/exec/scan/file_scanner.cpp | 134 +++------
be/src/exec/scan/file_scanner.h | 6 +-
be/src/format/orc/vorc_reader.cpp | 102 ++-----
be/src/format/orc/vorc_reader.h | 33 ++-
be/src/format/parquet/vparquet_group_reader.cpp | 105 ++-----
be/src/format/parquet/vparquet_group_reader.h | 9 -
be/src/format/parquet/vparquet_reader.cpp | 97 ++++---
be/src/format/parquet/vparquet_reader.h | 29 +-
be/src/format/table/hive_reader.cpp | 20 ++
be/src/format/table/iceberg_reader.cpp | 92 +++++--
be/src/format/table/iceberg_reader.h | 16 --
be/src/format/table/iceberg_reader_mixin.h | 77 ++++++
be/src/format/table/table_format_reader.h | 53 ++++
be/src/format/table/transactional_hive_reader.cpp | 13 +-
.../apache/doris/datasource/FileQueryScanNode.java | 3 -
.../doris/datasource/hive/source/HiveScanNode.java | 10 +
.../datasource/iceberg/source/IcebergScanNode.java | 15 +
.../doris/datasource/tvf/source/TVFScanNode.java | 11 +
...ceberg_v3_row_lineage_rewrite_data_files.groovy | 244 +++++++++++++++++
...est_iceberg_v2_to_v3_doris_spark_compare.groovy | 223 +++++++++++++++
...test_iceberg_v3_row_lineage_query_insert.groovy | 304 +++++++++++++++++++++
...eberg_v3_row_lineage_update_delete_merge.groovy | 292 ++++++++++++++++++++
22 files changed, 1490 insertions(+), 398 deletions(-)
diff --git a/be/src/exec/scan/file_scanner.cpp
b/be/src/exec/scan/file_scanner.cpp
index 841d29bf183..300d103b54a 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -868,14 +868,13 @@ void FileScanner::_truncate_char_or_varchar_column(Block*
block, int idx, int le
Block::erase_useless_column(block, num_columns_without_result);
}
-Status FileScanner::_create_row_id_column_iterator() {
+std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
FileScanner::_create_row_id_column_iterator() {
auto& id_file_map = _state->get_id_file_map();
auto file_id = id_file_map->get_file_mapping_id(
std::make_shared<FileMapping>(((FileScanLocalState*)_local_state)->parent_id(),
_current_range,
_should_enable_file_meta_cache()));
- _row_id_column_iterator_pair.first =
std::make_shared<RowIdColumnIteratorV2>(
- IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id);
- return Status::OK();
+ return std::make_shared<RowIdColumnIteratorV2>(IdManager::ID_VERSION,
+
BackendOptions::get_backend_id(), file_id);
}
void FileScanner::_fill_base_init_context(ReaderInitContext* ctx) {
@@ -1236,28 +1235,10 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
std::unique_ptr<IcebergParquetReader> iceberg_reader =
IcebergParquetReader::create_unique(
_kv_cache, _profile, *_params, range,
_state->query_options().batch_size,
&_state->timezone_obj(), _io_ctx.get(), _state,
file_meta_cache_ptr);
-
- // Transfer properties
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
- if (_row_lineage_columns.row_id_column_idx != -1 ||
- _row_lineage_columns.last_updated_sequence_number_column_idx !=
-1) {
- auto row_lineage_columns = std::make_shared<RowLineageColumns>();
- row_lineage_columns->row_id_column_idx =
_row_lineage_columns.row_id_column_idx;
- row_lineage_columns->last_updated_sequence_number_column_idx =
-
_row_lineage_columns.last_updated_sequence_number_column_idx;
- const auto& iceberg_params =
range.table_format_params.iceberg_params;
- row_lineage_columns->first_row_id =
- iceberg_params.__isset.first_row_id ?
iceberg_params.first_row_id : -1;
- row_lineage_columns->last_updated_sequence_number =
- iceberg_params.__isset.last_updated_sequence_number
- ? iceberg_params.last_updated_sequence_number
- : -1;
-
iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
- }
-
+ iceberg_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&pctx);
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
@@ -1266,10 +1247,6 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
auto paimon_reader = PaimonParquetReader::create_unique(
_profile, *_params, range, _state->query_options().batch_size,
&_state->timezone_obj(), _kv_cache, _io_ctx.get(), _state,
file_meta_cache_ptr);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
paimon_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
init_status =
static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
_cur_reader = std::move(paimon_reader);
} else if (range.__isset.table_format_params &&
@@ -1278,10 +1255,6 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
auto hudi_reader = HudiParquetReader::create_unique(
_profile, *_params, range, _state->query_options().batch_size,
&_state->timezone_obj(), _io_ctx.get(), _state,
file_meta_cache_ptr);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
hudi_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
init_status =
static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
_cur_reader = std::move(hudi_reader);
} else if (range.table_format_params.table_format_type == "hive") {
@@ -1289,10 +1262,10 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
_profile, *_params, range, _state->query_options().batch_size,
&_state->timezone_obj(), _io_ctx.get(), _state,
&_is_file_slot, file_meta_cache_ptr,
_state->query_options().enable_parquet_lazy_mat);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
hive_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
+ hive_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(hive_reader.get())->init_reader(&pctx);
_cur_reader = std::move(hive_reader);
} else if (range.table_format_params.table_format_type == "tvf") {
@@ -1302,10 +1275,10 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
&_state->timezone_obj(), _io_ctx.get(), _state,
file_meta_cache_ptr,
_state->query_options().enable_parquet_lazy_mat);
}
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
parquet_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
+ parquet_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
_cur_reader = std::move(parquet_reader);
} else if (_is_load) {
@@ -1340,10 +1313,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
auto tran_orc_reader = TransactionalHiveReader::create_unique(
_profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
tran_orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
+ tran_orc_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(tran_orc_reader.get())->init_reader(&octx);
_cur_reader = std::move(tran_orc_reader);
@@ -1353,27 +1326,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
std::unique_ptr<IcebergOrcReader> iceberg_reader =
IcebergOrcReader::create_unique(
_kv_cache, _profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
-
- // Transfer properties
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
- if (_row_lineage_columns.row_id_column_idx != -1 ||
- _row_lineage_columns.last_updated_sequence_number_column_idx !=
-1) {
- auto row_lineage_columns = std::make_shared<RowLineageColumns>();
- row_lineage_columns->row_id_column_idx =
_row_lineage_columns.row_id_column_idx;
- row_lineage_columns->last_updated_sequence_number_column_idx =
-
_row_lineage_columns.last_updated_sequence_number_column_idx;
- const auto& iceberg_params =
range.table_format_params.iceberg_params;
- row_lineage_columns->first_row_id =
- iceberg_params.__isset.first_row_id ?
iceberg_params.first_row_id : -1;
- row_lineage_columns->last_updated_sequence_number =
- iceberg_params.__isset.last_updated_sequence_number
- ? iceberg_params.last_updated_sequence_number
- : -1;
-
iceberg_reader->set_row_lineage_columns(std::move(row_lineage_columns));
- }
+ iceberg_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(iceberg_reader.get())->init_reader(&octx);
_cur_reader = std::move(iceberg_reader);
@@ -1383,10 +1339,6 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
auto paimon_reader = PaimonOrcReader::create_unique(
_profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _kv_cache, _io_ctx.get(),
file_meta_cache_ptr);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
paimon_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
init_status =
static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);
_cur_reader = std::move(paimon_reader);
@@ -1396,10 +1348,6 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
auto hudi_reader = HudiOrcReader::create_unique(
_profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
hudi_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
init_status =
static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);
_cur_reader = std::move(hudi_reader);
@@ -1409,10 +1357,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
_profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), &_is_file_slot,
file_meta_cache_ptr,
_state->query_options().enable_orc_lazy_mat);
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
hive_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
+ hive_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(hive_reader.get())->init_reader(&octx);
_cur_reader = std::move(hive_reader);
@@ -1424,10 +1372,10 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
_state->timezone(), _io_ctx.get(), file_meta_cache_ptr,
_state->query_options().enable_orc_lazy_mat);
}
- if (_row_id_column_iterator_pair.second != -1) {
- RETURN_IF_ERROR(_create_row_id_column_iterator());
-
orc_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- }
+ orc_reader->set_create_row_id_column_iterator_func(
+ [this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
+ return _create_row_id_column_iterator();
+ });
init_status =
static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
_cur_reader = std::move(orc_reader);
} else if (_is_load) {
@@ -1683,26 +1631,6 @@ Status FileScanner::_init_expr_ctxes() {
col_desc.category = ColumnCategory::PARTITION_KEY;
}
- if (it->second->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
- _row_id_column_iterator_pair.second =
_default_val_row_desc->get_column_id(slot_id);
- continue;
- }
-
- bool is_row_lineage_col = false;
- if (it->second->col_name() == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
- _row_lineage_columns.row_id_column_idx =
_default_val_row_desc->get_column_id(slot_id);
- is_row_lineage_col = true;
- }
-
- if (it->second->col_name() ==
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
- _row_lineage_columns.last_updated_sequence_number_column_idx =
- _default_val_row_desc->get_column_id(slot_id);
- is_row_lineage_col = true;
- }
- if (is_row_lineage_col) {
- col_desc.category = ColumnCategory::SYNTHESIZED;
- }
-
// Derive is_file_slot from category
bool is_file_slot = (col_desc.category == ColumnCategory::REGULAR ||
col_desc.category == ColumnCategory::GENERATED);
diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index 865872f1166..f3362380599 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -232,10 +232,6 @@ private:
// otherwise, point to _output_tuple_desc
const TupleDescriptor* _real_tuple_desc = nullptr;
- std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int>
_row_id_column_iterator_pair = {nullptr,
-
-1};
- // for iceberg row lineage
- RowLineageColumns _row_lineage_columns;
int64_t _last_bytes_read_from_local = 0;
int64_t _last_bytes_read_from_remote = 0;
@@ -282,7 +278,7 @@ private:
std::unique_ptr<OrcReader> orc_reader = nullptr);
Status _init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
std::unique_ptr<ParquetReader> parquet_reader
= nullptr);
- Status _create_row_id_column_iterator();
+ std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
_create_row_id_column_iterator();
TFileFormatType::type _get_current_format_type() {
// for compatibility, if format_type is not set in range, use the
format type of params
diff --git a/be/src/format/orc/vorc_reader.cpp
b/be/src/format/orc/vorc_reader.cpp
index c23f27bf495..1ecc88e22c7 100644
--- a/be/src/format/orc/vorc_reader.cpp
+++ b/be/src/format/orc/vorc_reader.cpp
@@ -467,20 +467,6 @@ Status OrcReader::_do_init_reader(ReaderInitContext*
base_ctx) {
// and standalone path (_fill_missing_cols empty, _table_info_node_ptr may
be null).
_init_file_column_mapping();
- // Register row-position-based synthesized column handler.
- // _row_id_column_iterator_pair and _row_lineage_columns are set before
init_reader
- // by FileScanner. This must be outside has_column_descs() guard because
standalone
- // readers (e.g., orc_read_lines tests) also use row_id columns.
- if (_row_id_column_iterator_pair.first != nullptr ||
- (_row_lineage_columns != nullptr &&
- (_row_lineage_columns->need_row_ids() ||
- _row_lineage_columns->has_last_updated_sequence_number_column()))) {
- register_synthesized_column_handler(
- BeConsts::ROWID_COL, [this](Block* block, size_t rows) ->
Status {
- return _fill_row_id_columns(block,
_row_reader->getRowNumber());
- });
- }
-
// ---- Inlined set_fill_columns logic (partition/missing/synthesized
classification) ----
// 1. Collect predicate columns from conjuncts for lazy materialization
@@ -542,6 +528,16 @@ Status OrcReader::on_before_init_reader(ReaderInitContext*
ctx) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
+ } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+ desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this, &desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block, rows);
+ });
+ continue;
}
}
@@ -905,6 +901,10 @@ bool OrcReader::_check_slot_can_push_down(const VExprSPtr&
expr) {
return false;
}
+ if (disable_column_opt(slot_ref->expr_name())) {
+ return false;
+ }
+
// Directly use _get_orc_predicate_type since we only need the type
auto [valid, predicate_type] = _get_orc_predicate_type(slot_ref);
if (valid) {
@@ -1273,17 +1273,6 @@ void OrcReader::_classify_columns_for_lazy_read(
TransactionalHive::READ_ROW_COLUMN_NAMES.end());
}
- auto check_iceberg_row_lineage_column_idx = [&](const auto& col_name) ->
int {
- if (_row_lineage_columns != nullptr) {
- if (col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
- return _row_lineage_columns->row_id_column_idx;
- } else if (col_name ==
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
- return
_row_lineage_columns->last_updated_sequence_number_column_idx;
- }
- }
- return -1;
- };
-
for (auto& read_table_col : _read_table_cols) {
_lazy_read_ctx.all_read_columns.emplace_back(read_table_col);
if (!predicate_table_columns.empty()) {
@@ -1301,7 +1290,7 @@ void OrcReader::_classify_columns_for_lazy_read(
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
_lazy_read_ctx.predicate_orc_columns.emplace_back(
_table_info_node_ptr->children_file_column_name(iter->first));
- if (check_iceberg_row_lineage_column_idx(read_table_col) !=
-1) {
+ if (disable_column_opt(read_table_col)) {
// Todo : enable lazy mat where filter iceberg row lineage
column.
_enable_lazy_mat = false;
}
@@ -1331,7 +1320,7 @@ void OrcReader::_classify_columns_for_lazy_read(
}
}
_lazy_read_ctx.predicate_missing_columns.emplace(kv.first,
kv.second);
- if (check_iceberg_row_lineage_column_idx(kv.first) != -1) {
+ if (disable_column_opt(kv.first)) {
_enable_lazy_mat = false;
}
}
@@ -1481,52 +1470,6 @@ Status OrcReader::_init_orc_row_reader() {
return Status::OK();
}
-Status OrcReader::_fill_row_id_columns(Block* block, int64_t start_row) {
- size_t fill_size = _batch->numElements;
- if (_row_id_column_iterator_pair.first != nullptr) {
-
RETURN_IF_ERROR(_row_id_column_iterator_pair.first->seek_to_ordinal(start_row));
- auto col = block->get_by_position(_row_id_column_iterator_pair.second)
- .column->assume_mutable();
-
RETURN_IF_ERROR(_row_id_column_iterator_pair.first->next_batch(&fill_size,
col));
- }
-
- if (_row_lineage_columns != nullptr &&
_row_lineage_columns->need_row_ids() &&
- _row_lineage_columns->first_row_id >= 0) {
- auto col =
block->get_by_position(_row_lineage_columns->row_id_column_idx)
- .column->assume_mutable();
- auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
- auto& null_map = nullable_column->get_null_map_data();
- auto& data =
-
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
- for (size_t i = 0; i < fill_size; ++i) {
- if (null_map[i] != 0) {
- null_map[i] = 0;
- data[i] = _row_lineage_columns->first_row_id + start_row +
static_cast<int64_t>(i);
- }
- }
- }
-
- if (_row_lineage_columns != nullptr &&
- _row_lineage_columns->has_last_updated_sequence_number_column() &&
- _row_lineage_columns->last_updated_sequence_number >= 0) {
- auto col = block->get_by_position(
-
_row_lineage_columns->last_updated_sequence_number_column_idx)
- .column->assume_mutable();
- auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
- auto& null_map = nullable_column->get_null_map_data();
- auto& data =
-
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
- for (size_t i = 0; i < fill_size; ++i) {
- if (null_map[i] != 0) {
- null_map[i] = 0;
- data[i] = _row_lineage_columns->last_updated_sequence_number;
- }
- }
- }
-
- return Status::OK();
-}
-
void OrcReader::_init_system_properties() {
if (_scan_range.__isset.file_type) {
// for compatibility
@@ -2455,9 +2398,8 @@ Status OrcReader::_get_next_block_impl(Block* block,
size_t* read_rows, bool* eo
_current_batch_row_positions[i] =
static_cast<rowid_t>(start_row + static_cast<int64_t>(i));
}
- if (has_synthesized_column_handlers()) {
- RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
- }
+ RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
+ RETURN_IF_ERROR(fill_generated_columns(block, block->rows()));
if (block->rows() == 0) {
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2606,9 +2548,8 @@ Status OrcReader::_get_next_block_impl(Block* block,
size_t* read_rows, bool* eo
_current_batch_row_positions[i] =
static_cast<rowid_t>(start_row + static_cast<int64_t>(i));
}
- if (has_synthesized_column_handlers()) {
- RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
- }
+ RETURN_IF_ERROR(fill_synthesized_columns(block, block->rows()));
+ RETURN_IF_ERROR(fill_generated_columns(block, block->rows()));
if (block->rows() == 0) {
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2932,7 +2873,8 @@ Status OrcReader::fill_dict_filter_column_names(
int i = 0;
for (const auto& predicate_col_name : predicate_col_names) {
int slot_id = predicate_col_slot_ids[i];
- if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) {
+ if (!_disable_dict_filter && !disable_column_opt(predicate_col_name) &&
+ _can_filter_by_dict(slot_id)) {
_dict_filter_cols.emplace_back(predicate_col_name, slot_id);
column_names.emplace_back(
_table_info_node_ptr->children_file_column_name(predicate_col_name));
diff --git a/be/src/format/orc/vorc_reader.h b/be/src/format/orc/vorc_reader.h
index d4b38e15d17..44fd0e8df52 100644
--- a/be/src/format/orc/vorc_reader.h
+++ b/be/src/format/orc/vorc_reader.h
@@ -212,13 +212,25 @@ public:
static std::string get_field_name_lower_case(const orc::Type* orc_type,
int pos);
- void set_row_id_column_iterator(
- const
std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>&
- iterator_pair) {
- _row_id_column_iterator_pair = iterator_pair;
+ void set_create_row_id_column_iterator_func(
+
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
create_func) {
+ _create_topn_row_id_column_iterator = create_func;
}
- void set_row_lineage_columns(std::shared_ptr<RowLineageColumns>
row_lineage_columns) {
- _row_lineage_columns = std::move(row_lineage_columns);
+
+ Status fill_topn_row_id(
+ std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
_row_id_column_iterator,
+ std::string col_name, Block* block, size_t rows) {
+ int col_pos = block->get_position_by_name(col_name);
+ DCHECK(col_pos >= 0);
+ if (col_pos < 0) {
+ return Status::InternalError("Column {} not found in block",
col_name);
+ }
+ auto col = block->get_by_position(col_pos).column->assume_mutable();
+ const auto& row_ids = this->current_batch_row_positions();
+ RETURN_IF_ERROR(
+ _row_id_column_iterator->read_by_rowids(row_ids.data(),
row_ids.size(), col));
+
+ return Status::OK();
}
static bool inline is_hive1_col_name(const orc::Type* orc_type_ptr) {
@@ -687,8 +699,6 @@ private:
return true;
}
- Status _fill_row_id_columns(Block* block, int64_t start_row);
-
bool _seek_to_read_one_line() {
if (_read_by_rows) {
if (_row_ids.empty()) {
@@ -795,6 +805,9 @@ protected:
// in on_before_init_reader.
LazyReadContext _lazy_read_ctx;
+ std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
+ _create_topn_row_id_column_iterator;
+
private:
std::unique_ptr<IColumn::Filter> _filter;
const AcidRowIDSet* _delete_rows = nullptr;
@@ -826,10 +839,6 @@ private:
int64_t _orc_once_max_read_bytes = 8L * 1024L * 1024L;
int64_t _orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
- std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>
- _row_id_column_iterator_pair = {nullptr, -1};
- std::shared_ptr<RowLineageColumns> _row_lineage_columns;
-
std::vector<rowid_t> _current_batch_row_positions;
// Through this node, you can find the file column based on the table
column.
diff --git a/be/src/format/parquet/vparquet_group_reader.cpp
b/be/src/format/parquet/vparquet_group_reader.cpp
index 0efeea70c10..a5b89207a24 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -157,8 +157,8 @@ Status RowGroupReader::init(
for (size_t i = 0; i < predicate_col_names.size(); ++i) {
const std::string& predicate_col_name = predicate_col_names[i];
int slot_id = predicate_col_slot_ids[i];
- if (predicate_col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID ||
- predicate_col_name ==
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
+
+ if (_table_format_reader->disable_column_opt(predicate_col_name)) {
// row lineage column can not dict filter.
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
_slot_id_to_filter_conjuncts->end()) {
@@ -334,7 +334,7 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block,
*read_rows));
-
+ RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block,
*read_rows));
Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts,
block, block->columns());
*read_rows = block->rows();
return st;
@@ -353,10 +353,12 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, *read_rows, _lazy_read_ctx.missing_col_names));
- if (_table_format_reader->has_synthesized_column_handlers()) {
+ if (_table_format_reader->has_synthesized_column_handlers() ||
+ _table_format_reader->has_generated_column_handlers()) {
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block,
*read_rows));
+ RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block,
*read_rows));
#ifndef NDEBUG
for (auto col : *block) {
@@ -635,11 +637,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
block, pre_read_rows,
_lazy_read_ctx.predicate_partition_col_names));
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, pre_read_rows,
_lazy_read_ctx.predicate_missing_col_names));
- if (_table_format_reader->has_synthesized_column_handlers()) {
+ if (_table_format_reader->has_synthesized_column_handlers() ||
+ _table_format_reader->has_generated_column_handlers()) {
RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block,
pre_read_rows));
-
+ RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block,
pre_read_rows));
RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
#ifndef NDEBUG
@@ -655,13 +658,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
#endif
bool can_filter_all = false;
- bool resize_first_column = _lazy_read_ctx.resize_first_column;
- if (resize_first_column &&
_table_format_reader->has_synthesized_column_handlers()) {
- int row_id_idx =
block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
- if (row_id_idx == 0) {
- resize_first_column = false;
- }
- }
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
@@ -720,20 +716,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
.column->assume_mutable()
->clear();
}
- if (_row_id_column_iterator_pair.first != nullptr) {
- block->get_by_position(_row_id_column_iterator_pair.second)
- .column->assume_mutable()
- ->clear();
- }
- if (_table_format_reader->has_synthesized_column_handlers()) {
- int row_id_idx =
-
block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
- if (row_id_idx >= 0) {
- block->get_by_position(static_cast<size_t>(row_id_idx))
- .column->assume_mutable()
- ->clear();
- }
- }
+
RETURN_IF_ERROR(_table_format_reader->clear_synthesized_columns(block));
+
RETURN_IF_ERROR(_table_format_reader->clear_generated_columns(block));
Block::erase_useless_column(block, origin_column_num);
}
@@ -792,17 +776,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
if (filter_map.has_filter()) {
- std::vector<uint32_t> predicate_columns =
_lazy_read_ctx.all_predicate_col_ids;
- if (_table_format_reader->has_synthesized_column_handlers()) {
- int row_id_idx =
block->get_position_by_name(doris::BeConsts::ICEBERG_ROWID_COL);
- if (row_id_idx >= 0 &&
- std::find(predicate_columns.begin(),
predicate_columns.end(),
- static_cast<uint32_t>(row_id_idx)) ==
predicate_columns.end()) {
-
predicate_columns.push_back(static_cast<uint32_t>(row_id_idx));
- }
- }
- RETURN_IF_CATCH_EXCEPTION(
- Block::filter_block_internal(block, predicate_columns,
result_filter));
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
+ block, _lazy_read_ctx.all_predicate_col_ids,
result_filter));
Block::erase_useless_column(block, origin_column_num);
} else {
@@ -899,8 +874,8 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size,
size_t* read_rows, b
_position_delete_ctx.current_row_id = end_row_id;
*batch_eof = _position_delete_ctx.current_row_id ==
_position_delete_ctx.last_row_id;
- if (_row_id_column_iterator_pair.first != nullptr ||
- _table_format_reader->has_synthesized_column_handlers()) {
+ if (_table_format_reader->has_synthesized_column_handlers() ||
+ _table_format_reader->has_generated_column_handlers()) {
*modify_row_ids = true;
_current_batch_row_ids.clear();
_current_batch_row_ids.resize(*read_rows);
@@ -924,7 +899,8 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size,
size_t* read_rows, b
_remaining_rows = 0;
*batch_eof = true;
}
- if (_table_format_reader->has_synthesized_column_handlers()) {
+ if (_table_format_reader->has_synthesized_column_handlers() ||
+ _table_format_reader->has_generated_column_handlers()) {
*modify_row_ids = true;
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
@@ -961,53 +937,6 @@ Status RowGroupReader::_get_current_batch_row_id(size_t
read_rows) {
return Status::OK();
}
-Status RowGroupReader::fill_topn_row_id(Block* block, size_t read_rows) {
- if (_row_id_column_iterator_pair.first != nullptr) {
- // _get_current_batch_row_id must be called before
fill_synthesized_columns
- auto col = block->get_by_position(_row_id_column_iterator_pair.second)
- .column->assume_mutable();
- RETURN_IF_ERROR(_row_id_column_iterator_pair.first->read_by_rowids(
- _current_batch_row_ids.data(), _current_batch_row_ids.size(),
col));
- }
-
- if (_row_lineage_columns != nullptr &&
_row_lineage_columns->need_row_ids() &&
- _row_lineage_columns->first_row_id >= 0) {
- auto col =
block->get_by_position(_row_lineage_columns->row_id_column_idx)
- .column->assume_mutable();
- auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
- auto& null_map = nullable_column->get_null_map_data();
- auto& data =
-
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
- for (size_t i = 0; i < read_rows; ++i) {
- if (null_map[i] != 0) {
- null_map[i] = 0;
- data[i] = _row_lineage_columns->first_row_id +
- static_cast<int64_t>(_current_batch_row_ids[i]);
- }
- }
- }
-
- if (_row_lineage_columns != nullptr &&
- _row_lineage_columns->has_last_updated_sequence_number_column() &&
- _row_lineage_columns->last_updated_sequence_number >= 0) {
- auto col = block->get_by_position(
-
_row_lineage_columns->last_updated_sequence_number_column_idx)
- .column->assume_mutable();
- auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
- auto& null_map = nullable_column->get_null_map_data();
- auto& data =
-
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
- for (size_t i = 0; i < read_rows; ++i) {
- if (null_map[i] != 0) {
- null_map[i] = 0;
- data[i] = _row_lineage_columns->last_updated_sequence_number;
- }
- }
- }
-
- return Status::OK();
-}
-
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
if (!_position_delete_ctx.has_filter) {
_pos_delete_filter_ptr.reset(nullptr);
diff --git a/be/src/format/parquet/vparquet_group_reader.h
b/be/src/format/parquet/vparquet_group_reader.h
index dfb593f1ae7..edc6cb39117 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -187,7 +187,6 @@ public:
ParquetColumnReader::ColumnStatistics merged_column_statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
- Status fill_topn_row_id(Block* block, size_t read_rows);
int64_t get_remaining_rows() { return _remaining_rows; }
@@ -197,12 +196,6 @@ public:
const std::vector<bool>& cache,
int64_t first_row,
int64_t base_granule = 0);
- void set_row_id_column_iterator(
- const
std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>&
- iterator_pair) {
- _row_id_column_iterator_pair = iterator_pair;
- }
-
void set_table_format_reader(TableFormatReader* reader) {
_table_format_reader = reader; }
// RowPositionProvider interface
@@ -304,8 +297,6 @@ private:
bool _is_row_group_filtered = false;
RowGroupIndex _current_row_group_idx {0, 0, 0};
- std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>
- _row_id_column_iterator_pair = {nullptr, -1};
std::vector<rowid_t> _current_batch_row_ids;
std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx =
nullptr;
diff --git a/be/src/format/parquet/vparquet_reader.cpp
b/be/src/format/parquet/vparquet_reader.cpp
index 12832d06388..39b6c4ec5e0 100644
--- a/be/src/format/parquet/vparquet_reader.cpp
+++ b/be/src/format/parquet/vparquet_reader.cpp
@@ -403,6 +403,16 @@ Status
ParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
+ } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+ desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this, &desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block, rows);
+ });
+ continue;
}
}
@@ -473,23 +483,6 @@ Status ParquetReader::_do_init_reader(ReaderInitContext*
base_ctx) {
// and standalone path (_fill_missing_cols empty, _table_info_node_ptr may
be null).
_init_read_columns(base_ctx->column_names);
- // Register row-position-based synthesized column handler.
- // _row_id_column_iterator_pair and _row_lineage_columns are set before
init_reader
- // by FileScanner. This must be outside has_column_descs() guard because
standalone
- // readers also need synthesized column handlers.
- if (_row_id_column_iterator_pair.first != nullptr ||
- (_row_lineage_columns != nullptr &&
- (_row_lineage_columns->need_row_ids() ||
- _row_lineage_columns->has_last_updated_sequence_number_column()))) {
- register_synthesized_column_handler(
- BeConsts::ROWID_COL, [this](Block* block, size_t rows) ->
Status {
- if (_current_group_reader) {
- return _current_group_reader->fill_topn_row_id(block,
rows);
- }
- return Status::OK();
- });
- }
-
// build column predicates for column lazy read
if (ctx->conjuncts != nullptr) {
_lazy_read_ctx.conjuncts = *ctx->conjuncts;
@@ -610,6 +603,9 @@ void
ParquetReader::_collect_predicate_columns_from_conjuncts(
auto and_pred = AndBlockColumnPredicate::create_unique();
for (const auto& entry : _lazy_read_ctx.slot_id_to_predicates) {
for (const auto& pred : entry.second) {
+ if (disable_column_opt(pred->col_name())) {
+ continue;
+ }
if (!_exists_in_file(pred->col_name()) ||
!_type_matches(pred->column_id())) {
continue;
}
@@ -624,23 +620,47 @@ void
ParquetReader::_collect_predicate_columns_from_conjuncts(
}
void ParquetReader::_classify_columns_for_lazy_read(
- const std::unordered_map<std::string, std::pair<uint32_t, int>>&
predicate_columns,
+ const std::unordered_map<std::string, std::pair<uint32_t, int>>&
+ predicate_conjuncts_columns,
const std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns) {
const FieldDescriptor& schema = _file_metadata->schema();
+ auto predicate_columns = predicate_conjuncts_columns;
- auto check_iceberg_row_lineage_column_idx = [&](const auto& col_name) ->
int {
- if (_row_lineage_columns != nullptr) {
- if (col_name == IcebergTableReader::ROW_LINEAGE_ROW_ID) {
- return _row_lineage_columns->row_id_column_idx;
- } else if (col_name ==
IcebergTableReader::ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
- return
_row_lineage_columns->last_updated_sequence_number_column_idx;
+ for (const auto& [col_name, _] : _generated_col_handlers) {
+ int slot_id = -1;
+ for (auto slot : _tuple_descriptor->slots()) {
+ if (slot->col_name() == col_name) {
+ slot_id = slot->id();
+ break;
}
}
- return -1;
- };
+ DCHECK(slot_id != -1) << "slot id should not be -1 for generated
column: " << col_name;
+ auto column_index = _row_descriptor->get_column_id(slot_id);
+ if (column_index == 0) {
+ _lazy_read_ctx.resize_first_column = false;
+ }
+        // Assume generated columns are only used for predicate pushdown.
+ predicate_columns.emplace(col_name, std::make_pair(column_index,
slot_id));
+ }
+ for (const auto& [col_name, _] : _synthesized_col_handlers) {
+ int slot_id = -1;
+ for (auto slot : _tuple_descriptor->slots()) {
+ if (slot->col_name() == col_name) {
+ slot_id = slot->id();
+ break;
+ }
+ }
+ DCHECK(slot_id != -1) << "slot id should not be -1 for synthesized
column: " << col_name;
+ auto column_index = _row_descriptor->get_column_id(slot_id);
+ if (column_index == 0) {
+ _lazy_read_ctx.resize_first_column = false;
+ }
+        // Synthesized columns always have their data filled in the first read phase.
+ _lazy_read_ctx.all_predicate_col_ids.emplace_back(column_index);
+ }
for (auto& read_table_col : _read_table_columns) {
_lazy_read_ctx.all_read_columns.emplace_back(read_table_col);
@@ -653,21 +673,7 @@ void ParquetReader::_classify_columns_for_lazy_read(
if (predicate_columns.size() > 0) {
auto iter = predicate_columns.find(read_table_col);
if (iter == predicate_columns.end()) {
- if (auto row_lineage_idx =
check_iceberg_row_lineage_column_idx(read_table_col);
- row_lineage_idx != -1) {
-
_lazy_read_ctx.predicate_columns.first.emplace_back(read_table_col);
- // row lineage column can not dict filter.
- int slot_id = 0;
- for (auto slot : _tuple_descriptor->slots()) {
- if (slot->col_name_lower_case() == read_table_col) {
- slot_id = slot->id();
- }
- }
-
_lazy_read_ctx.predicate_columns.second.emplace_back(slot_id);
-
_lazy_read_ctx.all_predicate_col_ids.emplace_back(row_lineage_idx);
- } else {
-
_lazy_read_ctx.lazy_read_columns.emplace_back(read_table_col);
- }
+ _lazy_read_ctx.lazy_read_columns.emplace_back(read_table_col);
} else {
_lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
@@ -675,9 +681,6 @@ void ParquetReader::_classify_columns_for_lazy_read(
}
}
}
- if (_row_id_column_iterator_pair.first != nullptr) {
-
_lazy_read_ctx.all_predicate_col_ids.emplace_back(_row_id_column_iterator_pair.second);
- }
for (auto& kv : partition_columns) {
auto iter = predicate_columns.find(kv.first);
@@ -701,10 +704,6 @@ void ParquetReader::_classify_columns_for_lazy_read(
}
_lazy_read_ctx.predicate_missing_columns.emplace(kv.first,
kv.second);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first);
- } else if (auto row_lineage_idx =
check_iceberg_row_lineage_column_idx(kv.first);
- row_lineage_idx != -1) {
- _lazy_read_ctx.predicate_missing_columns.emplace(kv.first,
kv.second);
- _lazy_read_ctx.all_predicate_col_ids.emplace_back(row_lineage_idx);
} else {
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
}
@@ -949,8 +948,6 @@ Status ParquetReader::_next_row_group_reader() {
_row_group_eof = false;
_current_group_reader->set_current_row_group_idx(_current_row_group_index);
-
_current_group_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
- _current_group_reader->set_row_lineage_columns(_row_lineage_columns);
_current_group_reader->set_col_name_to_block_idx(_col_name_to_block_idx);
if (_condition_cache_ctx) {
_current_group_reader->set_condition_cache_context(_condition_cache_ctx);
diff --git a/be/src/format/parquet/vparquet_reader.h
b/be/src/format/parquet/vparquet_reader.h
index 676e76b34a1..f34c846cf00 100644
--- a/be/src/format/parquet/vparquet_reader.h
+++ b/be/src/format/parquet/vparquet_reader.h
@@ -173,9 +173,9 @@ public:
Status get_file_metadata_schema(const FieldDescriptor** ptr);
- void set_row_id_column_iterator(
- std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int>
iterator_pair) {
- _row_id_column_iterator_pair = iterator_pair;
+ void set_create_row_id_column_iterator_func(
+
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
create_func) {
+ _create_topn_row_id_column_iterator = create_func;
}
/// Access current batch row positions (delegates to RowGroupReader).
@@ -184,8 +184,20 @@ public:
return _current_group_reader->current_batch_row_positions();
}
- void set_row_lineage_columns(std::shared_ptr<RowLineageColumns>
row_lineage_columns) {
- _row_lineage_columns = std::move(row_lineage_columns);
+ Status fill_topn_row_id(
+ std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
_row_id_column_iterator,
+ std::string col_name, Block* block, size_t rows) {
+ int col_pos = block->get_position_by_name(col_name);
+ DCHECK(col_pos >= 0);
+ if (col_pos < 0) {
+ return Status::InternalError("Column {} not found in block",
col_name);
+ }
+ auto col = block->get_by_position(col_pos).column->assume_mutable();
+ const auto& row_ids = this->current_batch_row_positions();
+ RETURN_IF_ERROR(
+ _row_id_column_iterator->read_by_rowids(row_ids.data(),
row_ids.size(), col));
+
+ return Status::OK();
}
bool count_read_rows() override { return true; }
@@ -414,10 +426,6 @@ private:
const std::unordered_map<int, VExprContextSPtrs>*
_slot_id_to_filter_conjuncts = nullptr;
std::unordered_map<tparquet::Type::type, bool> _ignored_stats;
- std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int>
_row_id_column_iterator_pair = {nullptr,
-
-1};
- std::shared_ptr<RowLineageColumns> _row_lineage_columns;
-
protected:
// Used for column lazy read. Protected so Iceberg/Paimon subclasses can
// register synthesized columns in on_before_init_reader.
@@ -425,6 +433,9 @@ protected:
bool _filter_groups = true;
size_t get_batch_size() const { return _batch_size; }
+ std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
+ _create_topn_row_id_column_iterator;
+
private:
std::set<uint64_t> _column_ids;
std::set<uint64_t> _filter_column_ids;
diff --git a/be/src/format/table/hive_reader.cpp
b/be/src/format/table/hive_reader.cpp
index fc6dfbc025c..c6ab0126e9d 100644
--- a/be/src/format/table/hive_reader.cpp
+++ b/be/src/format/table/hive_reader.cpp
@@ -37,6 +37,16 @@ Status
HiveOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
+ } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+ desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this, &desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block, rows);
+ });
+ continue;
}
}
@@ -214,6 +224,16 @@ Status
HiveParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
ctx->column_names.push_back(desc.name);
+ } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+ desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this, &desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block, rows);
+ });
+ continue;
}
}
diff --git a/be/src/format/table/iceberg_reader.cpp
b/be/src/format/table/iceberg_reader.cpp
index b9afea2fb2a..680ec463103 100644
--- a/be/src/format/table/iceberg_reader.cpp
+++ b/be/src/format/table/iceberg_reader.cpp
@@ -165,16 +165,24 @@ Status
IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
// Single pass: classify columns, detect $row_id, handle partition
fallback.
bool has_partition_from_path = false;
for (auto& desc : *ctx->column_descs) {
- if (desc.category == ColumnCategory::SYNTHESIZED &&
- desc.name == BeConsts::ICEBERG_ROWID_COL) {
- _need_row_id_column = true;
-
this->register_synthesized_column_handler(BeConsts::ICEBERG_ROWID_COL,
- [this](Block* block,
size_t rows) -> Status {
- return
_fill_iceberg_row_id(block, rows);
- });
- continue;
- }
- if (desc.category == ColumnCategory::REGULAR) {
+ if (desc.category == ColumnCategory::SYNTHESIZED) {
+ if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
+ this->register_synthesized_column_handler(
+ BeConsts::ICEBERG_ROWID_COL, [this](Block* block,
size_t rows) -> Status {
+ return _fill_iceberg_row_id(block, rows);
+ });
+ continue;
+ } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this,
&desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block,
rows);
+ });
+ continue;
+ }
+ } else if (desc.category == ColumnCategory::REGULAR) {
// Partition fallback: if column is a partition key and NOT in the
file
// (checked via field ID matching in table_info_node), read from
path instead.
if (partition_col_names.contains(desc.name) &&
@@ -187,7 +195,23 @@ Status
IcebergParquetReader::on_before_init_reader(ReaderInitContext* ctx) {
}
ctx->column_names.push_back(desc.name);
} else if (desc.category == ColumnCategory::GENERATED) {
- ctx->column_names.push_back(desc.name);
+ _init_row_lineage_columns();
+ if (desc.name == ROW_LINEAGE_ROW_ID) {
+ ctx->column_names.push_back(desc.name);
+ this->register_generated_column_handler(
+ ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows)
-> Status {
+ return _fill_row_lineage_row_id(block, rows);
+ });
+ continue;
+ } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
+ ctx->column_names.push_back(desc.name);
+ this->register_generated_column_handler(
+ ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
+ [this](Block* block, size_t rows) -> Status {
+ return
_fill_row_lineage_last_updated_sequence_number(block, rows);
+ });
+ continue;
+ }
}
}
@@ -436,16 +460,24 @@ Status
IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
// Single pass: classify columns, detect $row_id, handle partition
fallback.
bool has_partition_from_path = false;
for (auto& desc : *ctx->column_descs) {
- if (desc.category == ColumnCategory::SYNTHESIZED &&
- desc.name == BeConsts::ICEBERG_ROWID_COL) {
- _need_row_id_column = true;
-
this->register_synthesized_column_handler(BeConsts::ICEBERG_ROWID_COL,
- [this](Block* block,
size_t rows) -> Status {
- return
_fill_iceberg_row_id(block, rows);
- });
- continue;
- }
- if (desc.category == ColumnCategory::REGULAR) {
+ if (desc.category == ColumnCategory::SYNTHESIZED) {
+ if (desc.name == BeConsts::ICEBERG_ROWID_COL) {
+ this->register_synthesized_column_handler(
+ BeConsts::ICEBERG_ROWID_COL, [this](Block* block,
size_t rows) -> Status {
+ return _fill_iceberg_row_id(block, rows);
+ });
+ continue;
+ } else if (desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this,
&desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block,
rows);
+ });
+ continue;
+ }
+ } else if (desc.category == ColumnCategory::REGULAR) {
// Partition fallback: if column is a partition key and NOT in the
file
// (checked via field ID matching in table_info_node), read from
path instead.
if (partition_col_names.contains(desc.name) &&
@@ -458,7 +490,23 @@ Status
IcebergOrcReader::on_before_init_reader(ReaderInitContext* ctx) {
}
ctx->column_names.push_back(desc.name);
} else if (desc.category == ColumnCategory::GENERATED) {
- ctx->column_names.push_back(desc.name);
+ _init_row_lineage_columns();
+ if (desc.name == ROW_LINEAGE_ROW_ID) {
+ ctx->column_names.push_back(desc.name);
+ this->register_generated_column_handler(
+ ROW_LINEAGE_ROW_ID, [this](Block* block, size_t rows)
-> Status {
+ return _fill_row_lineage_row_id(block, rows);
+ });
+ continue;
+ } else if (desc.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
+ ctx->column_names.push_back(desc.name);
+ this->register_generated_column_handler(
+ ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER,
+ [this](Block* block, size_t rows) -> Status {
+ return
_fill_row_lineage_last_updated_sequence_number(block, rows);
+ });
+ continue;
+ }
}
}
diff --git a/be/src/format/table/iceberg_reader.h
b/be/src/format/table/iceberg_reader.h
index d21c661f207..39d38097657 100644
--- a/be/src/format/table/iceberg_reader.h
+++ b/be/src/format/table/iceberg_reader.h
@@ -61,23 +61,7 @@ class GenericReader;
class ShardedKVCache;
class VExprContext;
-struct RowLineageColumns {
- int row_id_column_idx = -1;
- int last_updated_sequence_number_column_idx = -1;
- int64_t first_row_id = -1;
- int64_t last_updated_sequence_number = -1;
-
- bool need_row_ids() const { return row_id_column_idx >= 0; }
- bool has_last_updated_sequence_number_column() const {
- return last_updated_sequence_number_column_idx >= 0;
- }
-};
-
struct IcebergTableReader {
- static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
- static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
- "_last_updated_sequence_number";
-
static bool _is_fully_dictionary_encoded(const tparquet::ColumnMetaData&
column_metadata);
};
diff --git a/be/src/format/table/iceberg_reader_mixin.h
b/be/src/format/table/iceberg_reader_mixin.h
index 57d3ec36ebf..a4855501612 100644
--- a/be/src/format/table/iceberg_reader_mixin.h
+++ b/be/src/format/table/iceberg_reader_mixin.h
@@ -104,6 +104,11 @@ public:
return BaseReader::_do_get_next_block(block, read_rows, eof);
}
+ void set_create_row_id_column_iterator_func(
+
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
create_func) {
+ _create_topn_row_id_column_iterator = create_func;
+ }
+
protected:
// ---- Hook implementations ----
@@ -141,6 +146,66 @@ protected:
return Status::OK();
}
+ void _init_row_lineage_columns() {
+ const auto& table_desc =
this->get_scan_range().table_format_params.iceberg_params;
+ if (table_desc.__isset.first_row_id) {
+ _row_lineage_columns.first_row_id = table_desc.first_row_id;
+ }
+ if (table_desc.__isset.last_updated_sequence_number) {
+ _row_lineage_columns.last_updated_sequence_number =
+ table_desc.last_updated_sequence_number;
+ }
+ }
+
+ Status _fill_row_lineage_row_id(Block* block, size_t rows) {
+ int col_pos = block->get_position_by_name(ROW_LINEAGE_ROW_ID);
+ DCHECK(col_pos >= 0);
+ if (col_pos < 0) {
+ return Status::InternalError("Row lineage column {} not found in
block",
+ ROW_LINEAGE_ROW_ID);
+ }
+
+ if (_row_lineage_columns.first_row_id >= 0) {
+ auto col =
block->get_by_position(col_pos).column->assume_mutable();
+ auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
+ auto& null_map = nullable_column->get_null_map_data();
+ auto& data =
+
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
+ const auto& row_ids = this->current_batch_row_positions();
+ for (size_t i = 0; i < rows; ++i) {
+ if (null_map[i] != 0) {
+ null_map[i] = 0;
+ data[i] = _row_lineage_columns.first_row_id +
static_cast<int64_t>(row_ids[i]);
+ }
+ }
+ }
+ return Status::OK();
+ }
+
+ Status _fill_row_lineage_last_updated_sequence_number(Block* block, size_t
rows) {
+ int col_pos =
block->get_position_by_name(ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
+ DCHECK(col_pos >= 0);
+ if (col_pos < 0) {
+ return Status::InternalError("Row lineage column {} not found in
block",
+ ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER);
+ }
+
+ if (_row_lineage_columns.last_updated_sequence_number >= 0) {
+ auto col =
block->get_by_position(col_pos).column->assume_mutable();
+ auto* nullable_column = assert_cast<ColumnNullable*>(col.get());
+ auto& null_map = nullable_column->get_null_map_data();
+ auto& data =
+
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
+ for (size_t i = 0; i < rows; ++i) {
+ if (null_map[i] != 0) {
+ null_map[i] = 0;
+ data[i] =
_row_lineage_columns.last_updated_sequence_number;
+ }
+ }
+ }
+ return Status::OK();
+ }
+
// Called after reading a block: apply equality delete filter + shrink
block
Status on_after_read_block(Block* block, size_t* read_rows) override {
if (!_equality_delete_impls.empty()) {
@@ -304,6 +369,18 @@ protected:
// File column names used during init
std::vector<std::string> _file_col_names;
+
+ std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
+ _create_topn_row_id_column_iterator;
+
+ static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
+ static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
+ "_last_updated_sequence_number";
+ struct RowLineageColumns {
+ int64_t first_row_id = -1;
+ int64_t last_updated_sequence_number = -1;
+ };
+ RowLineageColumns _row_lineage_columns;
};
// ============================================================================
diff --git a/be/src/format/table/table_format_reader.h
b/be/src/format/table/table_format_reader.h
index d8894209442..fe55f53ad5c 100644
--- a/be/src/format/table/table_format_reader.h
+++ b/be/src/format/table/table_format_reader.h
@@ -142,6 +142,53 @@ public:
return Status::OK();
}
+ Status clear_synthesized_columns(Block* block) {
+ for (auto& [name, handler] : _synthesized_col_handlers) {
+ int col_pos = block->get_position_by_name(name);
+ if (col_pos < 0) {
+ continue;
+ }
+
block->get_by_position(static_cast<size_t>(col_pos)).column->assume_mutable()->clear();
+ }
+ return Status::OK();
+ }
+
+ using GeneratedColumnHandler = std::function<Status(Block* block, size_t
rows)>;
+
+    // Disable lazy read, dictionary filtering, and min-max filtering for the given column.
+ virtual bool disable_column_opt(std::string col_name) {
+ // generated columns may have complex expressions that are not
compatible with lazy read, dict filter, or min-max filter.
+ for (auto& [name, handler] : _generated_col_handlers) {
+ if (name == col_name) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void register_generated_column_handler(const std::string& col_name,
+ GeneratedColumnHandler handler) {
+ _generated_col_handlers.emplace_back(col_name, std::move(handler));
+ }
+
+ Status fill_generated_columns(Block* block, size_t rows) {
+ for (auto& [name, handler] : _generated_col_handlers) {
+ RETURN_IF_ERROR(handler(block, rows));
+ }
+ return Status::OK();
+ }
+
+ Status clear_generated_columns(Block* block) {
+ for (auto& [name, handler] : _generated_col_handlers) {
+ int col_pos = block->get_position_by_name(name);
+ if (col_pos < 0) {
+ continue;
+ }
+
block->get_by_position(static_cast<size_t>(col_pos)).column->assume_mutable()->clear();
+ }
+ return Status::OK();
+ }
+
/// Unified fill for partition + missing + synthesized columns.
/// Called automatically by on_after_read_block for simple readers.
/// Parquet/ORC call individual on_fill_* methods per-batch internally.
@@ -157,11 +204,14 @@ public:
}
RETURN_IF_ERROR(on_fill_missing_columns(block, rows, miss_col_names));
RETURN_IF_ERROR(fill_synthesized_columns(block, rows));
+ RETURN_IF_ERROR(fill_generated_columns(block, rows));
return Status::OK();
}
bool has_synthesized_column_handlers() const { return
!_synthesized_col_handlers.empty(); }
+ bool has_generated_column_handlers() const { return
!_generated_col_handlers.empty(); }
+
/// Fill generated columns. Default is no-op.
virtual Status on_fill_generated_columns(Block* block, size_t rows,
const std::vector<std::string>&
cols) {
@@ -199,6 +249,9 @@ protected:
// ---- Synthesized column handlers ----
std::vector<std::pair<std::string, SynthesizedColumnHandler>>
_synthesized_col_handlers;
+
+ // ---- Generated column handlers ----
+ std::vector<std::pair<std::string, GeneratedColumnHandler>>
_generated_col_handlers;
};
#include "common/compile_check_end.h"
diff --git a/be/src/format/table/transactional_hive_reader.cpp
b/be/src/format/table/transactional_hive_reader.cpp
index 83c543448dd..3deacb3257c 100644
--- a/be/src/format/table/transactional_hive_reader.cpp
+++ b/be/src/format/table/transactional_hive_reader.cpp
@@ -63,6 +63,16 @@ Status
TransactionalHiveReader::on_before_init_reader(ReaderInitContext* ctx) {
if (desc.category == ColumnCategory::REGULAR ||
desc.category == ColumnCategory::GENERATED) {
_col_names.push_back(desc.name);
+ } else if (desc.category == ColumnCategory::SYNTHESIZED &&
+ desc.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto topn_row_id_column_iter =
_create_topn_row_id_column_iterator();
+ this->register_synthesized_column_handler(
+ desc.name,
+ [iter = std::move(topn_row_id_column_iter), this, &desc](
+ Block* block, size_t rows) -> Status {
+ return fill_topn_row_id(iter, desc.name, block, rows);
+ });
+ continue;
}
}
@@ -100,7 +110,8 @@ Status
TransactionalHiveReader::on_before_init_reader(ReaderInitContext* ctx) {
if
(std::count(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end(), slot_name) > 0) {
- return Status::InternalError("xxxx");
+ return Status::InternalError("Column {} conflicts with ACID
metadata column",
+ slot_name);
}
if (row_names_map.contains(slot_name)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index e1e437e78db..bdcd309677e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -202,9 +202,6 @@ public abstract class FileQueryScanNode extends
FileScanNode {
* Subclasses override this for format-specific classification.
*/
protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String>
partitionKeys) {
- if
(Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slot.getColumn().getName())) {
- return TColumnCategory.SYNTHESIZED;
- }
if (partitionKeys.contains(slot.getColumn().getName())) {
return TColumnCategory.PARTITION_KEY;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 3707b4bb238..9a8aec052f9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.hive.source;
+import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@@ -52,6 +53,7 @@ import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TColumnCategory;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
@@ -345,6 +347,14 @@ public class HiveScanNode extends FileQueryScanNode {
}
}
+ @Override
+ protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String>
partitionKeys) {
+ if (slot.getColumn().getName().startsWith(Column.GLOBAL_ROWID_COL)) {
+ return TColumnCategory.SYNTHESIZED;
+ }
+ return super.classifyColumn(slot, partitionKeys);
+ }
+
private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches,
boolean isBatchMode) {
if (sessionVariable.getFileSplitSize() > 0) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 121dc0ad152..adc2507e249 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -54,6 +54,7 @@ import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TColumnCategory;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
@@ -877,6 +878,20 @@ public class IcebergScanNode extends FileQueryScanNode {
return split;
}
+ @Override
+ protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String>
partitionKeys) {
+ if
(Column.ICEBERG_ROWID_COL.equalsIgnoreCase(slot.getColumn().getName())) {
+ return TColumnCategory.SYNTHESIZED;
+ }
+ if (slot.getColumn().getName().startsWith(Column.GLOBAL_ROWID_COL)) {
+ return TColumnCategory.SYNTHESIZED;
+ }
+ if (IcebergUtils.isIcebergRowLineageColumn(slot.getColumn())) {
+ return TColumnCategory.GENERATED;
+ }
+ return super.classifyColumn(slot, partitionKeys);
+ }
+
private List<Split> doGetSplits(int numBackends) throws UserException {
if (isSystemTable) {
return doGetSystemTableSplits();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 16037395dc3..ce22e96c80b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -17,7 +17,9 @@
package org.apache.doris.datasource.tvf.source;
+import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.TableIf;
@@ -39,6 +41,7 @@ import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TColumnCategory;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
@@ -169,6 +172,14 @@ public class TVFScanNode extends FileQueryScanNode {
return splits;
}
+ @Override
+ protected TColumnCategory classifyColumn(SlotDescriptor slot, List<String>
partitionKeys) {
+ if (slot.getColumn().getName().startsWith(Column.GLOBAL_ROWID_COL)) {
+ return TColumnCategory.SYNTHESIZED;
+ }
+ return super.classifyColumn(slot, partitionKeys);
+ }
+
private long determineTargetFileSplitSize(List<TBrokerFileStatus>
fileStatuses) {
if (sessionVariable.getFileSplitSize() > 0) {
return sessionVariable.getFileSplitSize();
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_v3_row_lineage_rewrite_data_files.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_v3_row_lineage_rewrite_data_files.groovy
new file mode 100644
index 00000000000..438276c6946
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_v3_row_lineage_rewrite_data_files.groovy
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v3_row_lineage_rewrite_data_files",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("Iceberg test is disabled")
+ return
+ }
+
+ String catalogName = "test_iceberg_v3_row_lineage_rewrite_data_files"
+ String dbName = "test_row_lineage_rewrite_db"
+ String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String endpoint = "http://${externalEnvIp}:${minioPort}"
+
+ def formats = ["parquet", "orc"]
+
+ def schemaContainsField = { schemaRows, fieldName ->
+ String target = fieldName.toLowerCase()
+ return schemaRows.any { row ->
row.toString().toLowerCase().contains(target) }
+ }
+
+ def fileSchemaRows = { filePath, format ->
+ return sql("""
+ desc function s3(
+ "uri" = "${filePath}",
+ "format" = "${format}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "us-east-1"
+ )
+ """)
+ }
+
+ def assertCurrentFilesContainRowLineageColumns = { tableName, format ->
+ def files = sql("""select file_path, lower(file_format) from
${tableName}\$files order by file_path""")
+ log.info("Checking rewritten files for physical row lineage columns in
${tableName}: ${files}")
+ assertTrue(files.size() > 0, "Current files should exist for
${tableName}")
+ files.each { row ->
+ assertEquals(format, row[1].toString())
+ assertTrue(row[0].toString().endsWith(format == "parquet" ?
".parquet" : ".orc"),
+ "Current data file should match ${format} for
${tableName}, file=${row[0]}")
+ def schemaRows = fileSchemaRows(row[0].toString(), format)
+ log.info("Rewritten ${format} schema for ${tableName},
file=${row[0]} -> ${schemaRows}")
+ assertTrue(schemaContainsField(schemaRows, "_row_id"),
+ "Rewritten file should physically contain _row_id for
${tableName}, schema=${schemaRows}")
+ assertTrue(schemaContainsField(schemaRows,
"_last_updated_sequence_number"),
+ "Rewritten file should physically contain
_last_updated_sequence_number for ${tableName}, schema=${schemaRows}")
+ }
+ }
+
+ def assertCurrentFilesDoNotContainRowLineageColumns = { tableName, format
->
+ def files = sql("""select file_path, lower(file_format) from
${tableName}\$files order by file_path""")
+ log.info("Checking regular INSERT files for absence of physical row
lineage columns in ${tableName}: ${files}")
+ assertTrue(files.size() > 0, "Current files should exist for
${tableName}")
+ files.each { row ->
+ assertEquals(format, row[1].toString())
+ assertTrue(row[0].toString().endsWith(format == "parquet" ?
".parquet" : ".orc"),
+ "Current data file should match ${format} for
${tableName}, file=${row[0]}")
+ def schemaRows = fileSchemaRows(row[0].toString(), format)
+ log.info("Regular INSERT ${format} schema for ${tableName},
file=${row[0]} -> ${schemaRows}")
+ assertTrue(!schemaContainsField(schemaRows, "_row_id"),
+ "Normal INSERT file should not contain _row_id for
${tableName}, schema=${schemaRows}")
+ assertTrue(!schemaContainsField(schemaRows,
"_last_updated_sequence_number"),
+ "Normal INSERT file should not contain
_last_updated_sequence_number for ${tableName}, schema=${schemaRows}")
+ }
+ }
+
+ def lineageMap = { tableName ->
+ def rows = sql("""
+ select id, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """)
+ Map<Integer, List<String>> result = [:]
+ rows.each { row ->
+ result[row[0].toString().toInteger()] = [row[1].toString(),
row[2].toString()]
+ }
+ log.info("Built lineage map for ${tableName}: ${result}")
+ return result
+ }
+
+ def assertLineageMapEquals = { expected, actual, tableName ->
+ log.info("Comparing lineage maps for ${tableName}:
expected=${expected}, actual=${actual}")
+ assertEquals(expected.size(), actual.size())
+ expected.each { key, value ->
+ assertTrue(actual.containsKey(key), "Missing id=${key} after
rewrite for ${tableName}")
+ assertEquals(value[0], actual[key][0])
+ assertEquals(value[1], actual[key][1])
+ }
+ }
+
+ def runRewriteAndAssert = { tableName, format, expectedCount ->
+ def filesBefore = sql("""select file_path from ${tableName}\$files
order by file_path""")
+ def snapshotsBefore = sql("""select snapshot_id from
${tableName}\$snapshots order by committed_at""")
+ log.info("Checking rewrite preconditions for ${tableName}:
filesBefore=${filesBefore}, snapshotsBefore=${snapshotsBefore}")
+ assertTrue(filesBefore.size() >= 2,
+ "Rewrite test requires at least 2 input files for
${tableName}, but got ${filesBefore.size()}")
+
+ def visibleBefore = sql("""select * from ${tableName} order by id""")
+ def rowLineageBefore = lineageMap(tableName)
+ log.info("Visible rows before rewrite for ${tableName}:
${visibleBefore}")
+
+ assertCurrentFilesDoNotContainRowLineageColumns(tableName, format)
+
+ def rewriteResult = sql("""
+ alter table ${catalogName}.${dbName}.${tableName}
+ execute rewrite_data_files(
+ "target-file-size-bytes" = "10485760",
+ "min-input-files" = "1"
+ )
+ """)
+ log.info("rewrite_data_files result for ${tableName}:
${rewriteResult}")
+ assertTrue(rewriteResult.size() > 0, "rewrite_data_files should return
summary rows for ${tableName}")
+ int rewrittenFiles = rewriteResult[0][0] as int
+ assertTrue(rewrittenFiles > 0, "rewrite_data_files should rewrite at
least one file for ${tableName}")
+
+ def visibleAfter = sql("""select * from ${tableName} order by id""")
+ log.info("Visible rows after rewrite for ${tableName}:
${visibleAfter}")
+ assertEquals(visibleBefore, visibleAfter)
+
+ def rowLineageAfter = lineageMap(tableName)
+ assertLineageMapEquals(rowLineageBefore, rowLineageAfter, tableName)
+
+ def countAfter = sql("""select count(*) from ${tableName}""")
+ log.info("Checking row count after rewrite for ${tableName}:
${countAfter}")
+ assertEquals(expectedCount, countAfter[0][0].toString().toInteger())
+
+ def snapshotsAfter = sql("""select snapshot_id from
${tableName}\$snapshots order by committed_at""")
+ log.info("Snapshots after rewrite for ${tableName}: ${snapshotsAfter}")
+ assertTrue(snapshotsAfter.size() > snapshotsBefore.size(),
+ "rewrite_data_files should create a new snapshot for
${tableName}")
+
+ assertCurrentFilesContainRowLineageColumns(tableName, format)
+
+ def sampleRowId = rowLineageAfter.entrySet().iterator().next().value[0]
+ def sampleQuery = sql("""select count(*) from ${tableName} where
_row_id = ${sampleRowId}""")
+ log.info("Checking sample _row_id predicate after rewrite for
${tableName}: sampleRowId=${sampleRowId}, result=${sampleQuery}")
+ assertEquals(1, sampleQuery[0][0].toString().toInteger())
+ }
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """
+ create catalog if not exists ${catalogName} properties (
+ "type" = "iceberg",
+ "iceberg.catalog.type" = "rest",
+ "uri" = "http://${externalEnvIp}:${restPort}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "us-east-1"
+ )
+ """
+
+ sql """switch ${catalogName}"""
+ sql """create database if not exists ${dbName}"""
+ sql """use ${dbName}"""
+ sql """set enable_fallback_to_original_planner = false"""
+ sql """set show_hidden_columns = false"""
+
+ try {
+ formats.each { format ->
+ String rewriteTable =
"test_row_lineage_rewrite_unpartitioned_${format}"
+ String rewritePartitionTable =
"test_row_lineage_rewrite_partitioned_${format}"
+ log.info("Run rewrite_data_files row lineage test with format
${format}")
+
+ try {
+ sql """drop table if exists ${rewriteTable}"""
+ sql """
+ create table ${rewriteTable} (
+ id int,
+ name string,
+ score int
+ ) engine=iceberg
+ properties (
+ "format-version" = "3",
+ "write.format.default" = "${format}"
+ )
+ """
+
+ sql """insert into ${rewriteTable} values (1, 'A', 10), (2,
'B', 20)"""
+ sql """insert into ${rewriteTable} values (3, 'C', 30), (4,
'D', 40)"""
+ sql """insert into ${rewriteTable} values (5, 'E', 50), (6,
'F', 60)"""
+ log.info("Inserted three batches into ${rewriteTable} to
prepare rewrite_data_files input files")
+
+ // Assert baseline:
+ // 1. Data files from regular INSERT do not physically contain
the two row lineage columns.
+ // 2. After rewrite_data_files, every current data file should
contain both row lineage columns.
+ // 3. Visible query results stay unchanged before and after
rewrite.
+ // 4. _row_id and _last_updated_sequence_number stay stable
for every row across rewrite.
+ runRewriteAndAssert(rewriteTable, format, 6)
+
+ sql """drop table if exists ${rewritePartitionTable}"""
+ sql """
+ create table ${rewritePartitionTable} (
+ id int,
+ name string,
+ score int,
+ dt date
+ ) engine=iceberg
+ partition by list (day(dt)) ()
+ properties (
+ "format-version" = "3",
+ "write.format.default" = "${format}"
+ )
+ """
+
+ sql """insert into ${rewritePartitionTable} values (11, 'P1',
10, '2024-01-01'), (12, 'P2', 20, '2024-01-01')"""
+ sql """insert into ${rewritePartitionTable} values (13, 'P3',
30, '2024-01-01'), (14, 'P4', 40, '2024-02-01')"""
+ sql """insert into ${rewritePartitionTable} values (15, 'P5',
50, '2024-02-01'), (16, 'P6', 60, '2024-01-01')"""
+ log.info("Inserted three partitioned batches into
${rewritePartitionTable} to prepare rewrite_data_files input files")
+
+ // Assert baseline:
+ // 1. Partitioned tables also write row lineage columns
physically only during rewrite.
+ // 2. Business data and row lineage values stay stable before
and after rewrite.
+ // 3. _row_id predicate queries remain available after rewrite.
+ runRewriteAndAssert(rewritePartitionTable, format, 6)
+ } finally {
+ sql """drop table if exists ${rewritePartitionTable}"""
+ sql """drop table if exists ${rewriteTable}"""
+ }
+ }
+ } finally {
+ sql """drop database if exists ${dbName} force"""
+ sql """drop catalog if exists ${catalogName}"""
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v2_to_v3_doris_spark_compare.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v2_to_v3_doris_spark_compare.groovy
new file mode 100644
index 00000000000..df6d1bbea20
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v2_to_v3_doris_spark_compare.groovy
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v2_to_v3_doris_spark_compare",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ def enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("Iceberg test is disabled")
+ return
+ }
+
+ def catalogName = "test_iceberg_v2_to_v3_doris_spark_compare"
+ def dbName = "test_v2_to_v3_doris_spark_compare_db"
+ def restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ def minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ def externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ def formats = ["parquet", "orc"]
+
+ def tableNameForFormat = { baseName, format ->
+ return format == "parquet" ? baseName : "${baseName}_orc"
+ }
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """
+ create catalog if not exists ${catalogName} properties (
+ "type" = "iceberg",
+ "iceberg.catalog.type" = "rest",
+ "uri" = "http://${externalEnvIp}:${restPort}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minioPort}",
+ "s3.region" = "us-east-1"
+ )
+ """
+
+ sql """switch ${catalogName}"""
+ sql """use ${dbName}"""
+ sql """set enable_fallback_to_original_planner = false"""
+
+ try {
+ def assertV2RowsAreNullAfterUpgrade = { tableName ->
+ def rows = sql """
+ select id, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """
+ assertEquals(2, rows.size())
+ rows.each { row ->
+ assertTrue(row[1] == null,
+ "_row_id should be null for v2 rows after upgrade in
${tableName}, row=${row}")
+ assertTrue(row[2] == null,
+ "_last_updated_sequence_number should be null for v2
rows after upgrade in ${tableName}, row=${row}")
+ }
+ }
+
+ def assertV23RowsNotNullAfterUpd = { tableName ->
+ def rows = sql """
+ select id, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """
+ rows.each { row ->
+ assertTrue(row[1] != null,
+ "_row_id should be non-null after Doris operator for
${tableName}")
+ assertTrue(row[2] != null,
+ "_last_updated_sequence_number should be non-null after
Doris operator for ${tableName}")
+
+ }
+ }
+
+ def upgradeV3DorisOperationInsert = { tableName ->
+ assertV2RowsAreNullAfterUpgrade(tableName)
+
+ sql """
+ insert into ${tableName} values
+ (4, 'post_v3_i', 400, date '2024-01-04')
+ """
+
+ def rows = sql """
+ select id, tag, score, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """
+ assertEquals(3, rows.size())
+ assertEquals(4, rows[2][0].toString().toInteger())
+ assertEquals("post_v3_i", rows[2][1])
+ assertV23RowsNotNullAfterUpd(tableName)
+ }
+
+ def upgradeV3DorisOperationDelete = { tableName ->
+ assertV2RowsAreNullAfterUpgrade(tableName)
+
+ sql """
+ delete from ${tableName}
+ where id = 3
+ """
+
+ def rows = sql """
+ select id, tag, score
+ from ${tableName}
+ order by id
+ """
+ assertEquals(1, rows.size())
+ assertEquals(1, rows[0][0].toString().toInteger())
+ assertV23RowsNotNullAfterUpd(tableName)
+
+ }
+
+ def upgradeV3DorisOperationUpdate = { tableName ->
+ assertV2RowsAreNullAfterUpgrade(tableName)
+
+ sql """
+ update ${tableName}
+ set tag = 'post_v3_u', score = score + 20
+ where id = 1
+ """
+
+ def rows = sql """
+ select id, tag, score
+ from ${tableName}
+ order by id
+ """
+ assertEquals(2, rows.size())
+ assertEquals(1, rows[0][0].toString().toInteger())
+ assertEquals("post_v3_u", rows[0][1])
+ assertV23RowsNotNullAfterUpd(tableName)
+ }
+
+ def upgradeV3DorisOperationRewrite = { tableName ->
+ assertV2RowsAreNullAfterUpgrade(tableName)
+
+ def rewriteResult = sql("""
+ alter table ${catalogName}.${dbName}.${tableName}
+ execute rewrite_data_files(
+ "target-file-size-bytes" = "10485760",
+ "min-input-files" = "1"
+ )
+ """)
+ assertTrue(rewriteResult.size() > 0,
+ "rewrite_data_files should return summary rows for
${tableName}")
+
+ def rowCount = sql """
+ select count(*)
+ from ${tableName}
+ """
+ assertEquals(2, rowCount[0][0].toString().toInteger())
+ assertV23RowsNotNullAfterUpd(tableName)
+ }
+
+ formats.each { format ->
+ def rowLineageNullTable =
tableNameForFormat("v2v3_row_lineage_null_after_upgrade", format)
+ def sparkReferenceTable =
tableNameForFormat("v2v3_spark_ops_reference", format)
+ def dorisTargetTable = tableNameForFormat("v2v3_doris_ops_target",
format)
+ log.info("Run v2-to-v3 Doris/Spark compare test with format
${format}")
+
+ def scenario1Rows = sql """
+ select id, _row_id, _last_updated_sequence_number
+ from ${rowLineageNullTable}
+ order by id
+ """
+ assertEquals(3, scenario1Rows.size())
+ scenario1Rows.each { row ->
+ assertTrue(row[1] == null,
+ "_row_id should be null for rows written before v3
upgrade, row=${row}")
+ assertTrue(row[2] == null,
+ "_last_updated_sequence_number should be null for rows
written before v3 upgrade, row=${row}")
+ }
+
+ sql """
+ update ${dorisTargetTable}
+ set tag = 'post_v3_u', score = score + 20
+ where id = 2
+ """
+
+ sql """
+ insert into ${dorisTargetTable} values
+ (4, 'post_v3_i', 400, date '2024-02-04')
+ """
+
+ def dorisRewriteResult = sql("""
+ alter table ${catalogName}.${dbName}.${dorisTargetTable}
+ execute rewrite_data_files(
+ "target-file-size-bytes" = "10485760",
+ "min-input-files" = "1"
+ )
+ """)
+ assertTrue(dorisRewriteResult.size() > 0,
+ "Doris rewrite_data_files should return summary rows")
+
+ check_sqls_result_equal """
+ select *
+ from ${dorisTargetTable}
+ order by id
+ """, """
+ select *
+ from ${sparkReferenceTable}
+ order by id
+ """
+
+
upgradeV3DorisOperationInsert(tableNameForFormat("v2v3_doris_upd_case1",
format))
+
upgradeV3DorisOperationDelete(tableNameForFormat("v2v3_doris_upd_case2",
format))
+
upgradeV3DorisOperationUpdate(tableNameForFormat("v2v3_doris_upd_case3",
format))
+
upgradeV3DorisOperationRewrite(tableNameForFormat("v2v3_doris_upd_case4",
format))
+ }
+
+ } finally {
+ sql """drop catalog if exists ${catalogName}"""
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
new file mode 100644
index 00000000000..7276fadba76
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_v3_row_lineage_query_insert",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("Iceberg test is disabled")
+ return
+ }
+
+ String catalogName = "test_iceberg_v3_row_lineage_query_insert"
+ String dbName = "test_row_lineage_query_insert_db"
+ String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String endpoint = "http://${externalEnvIp}:${minioPort}"
+
+ def formats = ["parquet", "orc"]
+
+ def collectDescColumns = { rows ->
+ return rows.collect { row -> row[0].toString().toLowerCase() }
+ }
+
+ def schemaContainsField = { schemaRows, fieldName ->
+ String target = fieldName.toLowerCase()
+ return schemaRows.any { row ->
row.toString().toLowerCase().contains(target) }
+ }
+
+ def fileSchemaRows = { filePath, format ->
+ return sql("""
+ desc function s3(
+ "uri" = "${filePath}",
+ "format" = "${format}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "us-east-1"
+ )
+ """)
+ }
+
+ def assertCurrentFilesDoNotContainRowLineageColumns = { tableName, format,
messagePrefix ->
+ def files = sql("""select file_path, lower(file_format) from
${tableName}\$files order by file_path""")
+ log.info("${messagePrefix}: checking ${files.size()} current data
files for ${tableName}: ${files}")
+ assertTrue(files.size() > 0, "Current data files should exist for
${tableName}")
+ files.each { row ->
+ assertEquals(format, row[1].toString())
+ assertTrue(row[0].toString().endsWith(format == "parquet" ?
".parquet" : ".orc"),
+ "${messagePrefix} should write ${format} files for
${tableName}, file=${row[0]}")
+ def schemaRows = fileSchemaRows(row[0].toString(), format)
+ log.info("${messagePrefix}: ${format} schema for ${tableName},
file=${row[0]} -> ${schemaRows}")
+ assertTrue(!schemaContainsField(schemaRows, "_row_id"),
+ "${messagePrefix} should not physically write _row_id,
schema=${schemaRows}")
+ assertTrue(!schemaContainsField(schemaRows,
"_last_updated_sequence_number"),
+ "${messagePrefix} should not physically write
_last_updated_sequence_number, schema=${schemaRows}")
+ }
+ }
+
+ def assertRowLineageHiddenColumns = { tableName, visibleColumnCount ->
+ sql("""set show_hidden_columns = false""")
+ def descDefault = sql("""desc ${tableName}""")
+ def defaultColumns = collectDescColumns(descDefault)
+ log.info("Checking hidden-column default visibility for ${tableName}:
desc=${descDefault}")
+ assertTrue(!defaultColumns.contains("_row_id"),
+ "DESC default should hide _row_id for ${tableName}, got
${defaultColumns}")
+ assertTrue(!defaultColumns.contains("_last_updated_sequence_number"),
+ "DESC default should hide _last_updated_sequence_number for
${tableName}, got ${defaultColumns}")
+
+ def selectVisible = sql("""select * from ${tableName} order by id""")
+ log.info("Checking visible SELECT * layout for ${tableName}:
rowCount=${selectVisible.size()}, firstRow=${selectVisible ? selectVisible[0] :
'EMPTY'}")
+ assertTrue(selectVisible.size() > 0, "SELECT * should return rows for
${tableName}")
+ assertEquals(visibleColumnCount, selectVisible[0].size())
+
+ sql("""set show_hidden_columns = true""")
+ def descHidden = sql("""desc ${tableName}""")
+ def hiddenColumns = collectDescColumns(descHidden)
+ log.info("Checking hidden-column enabled visibility for ${tableName}:
desc=${descHidden}")
+ assertTrue(hiddenColumns.contains("_row_id"),
+ "DESC with show_hidden_columns=true should expose _row_id for
${tableName}, got ${hiddenColumns}")
+ assertTrue(hiddenColumns.contains("_last_updated_sequence_number"),
+ "DESC with show_hidden_columns=true should expose
_last_updated_sequence_number for ${tableName}, got ${hiddenColumns}")
+
+ def selectHidden = sql("""select * from ${tableName} order by id""")
+ log.info("Checking hidden SELECT * layout for ${tableName}:
rowCount=${selectHidden.size()}, firstRow=${selectHidden ? selectHidden[0] :
'EMPTY'}")
+ assertTrue(selectHidden.size() > 0, "SELECT * with hidden columns
should return rows for ${tableName}")
+ assertEquals(visibleColumnCount + 2 + 1, selectHidden[0].size()) //
_row_id + _last_updated_sequence_number + __DORIS_ICEBERG_ROWID_COL__
+
+ sql("""set show_hidden_columns = false""")
+ }
+
+ def assertExplicitRowLineageReadable = { tableName, expectedIds ->
+ def rowLineageRows = sql("""
+ select id, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """)
+ log.info("Checking explicit row lineage projection for ${tableName}:
rows=${rowLineageRows}")
+ assertEquals(expectedIds.size(), rowLineageRows.size())
+ for (int i = 0; i < expectedIds.size(); i++) {
+ assertEquals(expectedIds[i],
rowLineageRows[i][0].toString().toInteger())
+ assertTrue(rowLineageRows[i][1] != null,
+ "_row_id should be non-null for ${tableName},
row=${rowLineageRows[i]}")
+ assertTrue(rowLineageRows[i][2] != null,
+ "_last_updated_sequence_number should be non-null for
${tableName}, row=${rowLineageRows[i]}")
+ }
+
+ long firstRowId = rowLineageRows[0][1].toString().toLong()
+ long secondRowId = rowLineageRows[1][1].toString().toLong()
+ assertTrue(firstRowId < secondRowId,
+ "Row lineage ids should increase with row position for
${tableName}, rows=${rowLineageRows}")
+
+ def byRowId = sql("""select id from ${tableName} where _row_id =
${firstRowId} order by id""")
+ log.info("Checking single _row_id predicate for ${tableName}:
rowId=${firstRowId}, result=${byRowId}")
+ assertEquals(1, byRowId.size())
+ assertEquals(expectedIds[0], byRowId[0][0].toString().toInteger())
+
+ def combinedPredicate = sql("""
+ select id
+ from ${tableName}
+ where id >= ${expectedIds[1]} and _row_id in
(${rowLineageRows[1][1]}, ${rowLineageRows[2][1]})
+ order by id
+ """)
+ log.info("Checking combined business + _row_id predicate for
${tableName}: result=${combinedPredicate}")
+ assertEquals(2, combinedPredicate.size())
+ assertEquals(expectedIds[1],
combinedPredicate[0][0].toString().toInteger())
+ assertEquals(expectedIds[2],
combinedPredicate[1][0].toString().toInteger())
+ }
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """
+ create catalog if not exists ${catalogName} properties (
+ "type" = "iceberg",
+ "iceberg.catalog.type" = "rest",
+ "uri" = "http://${externalEnvIp}:${restPort}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "us-east-1"
+ )
+ """
+
+ sql """switch ${catalogName}"""
+ sql """create database if not exists ${dbName}"""
+ sql """use ${dbName}"""
+ sql """set enable_fallback_to_original_planner = false"""
+ sql """set show_hidden_columns = false"""
+
+ try {
+ formats.each { format ->
+ String unpartitionedTable =
"test_row_lineage_query_insert_unpartitioned_${format}"
+ String partitionedTable =
"test_row_lineage_query_insert_partitioned_${format}"
+ log.info("Run row lineage query/insert test with format ${format}")
+
+ try {
+ sql """drop table if exists ${unpartitionedTable}"""
+ sql """
+ create table ${unpartitionedTable} (
+ id int,
+ name string,
+ age int
+ ) engine=iceberg
+ properties (
+ "format-version" = "3",
+ "write.format.default" = "${format}"
+ )
+ """
+
+ sql """
+ insert into ${unpartitionedTable} values(1, 'Alice', 25);
+ """
+ sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30)
"""
+ sql """ insert into ${unpartitionedTable} values(3, 'Charlie',
35) """
+
+ log.info("Inserted initial rows into ${unpartitionedTable}")
+
+ // Assert baseline:
+ // 1. DESC and SELECT * hide row lineage columns by default.
+ // 2. show_hidden_columns=true exposes both hidden columns in
DESC and SELECT *.
+ // 3. Explicit SELECT on row lineage columns returns non-null
values.
+ assertRowLineageHiddenColumns(unpartitionedTable, 3)
+ assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3])
+
+ test {
+ sql """insert into ${unpartitionedTable}(_row_id, id,
name, age) values (1, 9, 'BadRow', 99)"""
+ exception "Cannot specify row lineage column '_row_id' in
INSERT statement"
+ }
+
+ test {
+ sql """
+ insert into
${unpartitionedTable}(_last_updated_sequence_number, id, name, age)
+ values (1, 10, 'BadSeq', 100)
+ """
+ exception "Cannot specify row lineage column
'_last_updated_sequence_number' in INSERT statement"
+ }
+
+ sql """insert into ${unpartitionedTable}(id, name, age) values
(4, 'Doris', 40)"""
+ def unpartitionedCount = sql """select count(*) from
${unpartitionedTable}"""
+ log.info("Checking row count after regular INSERT for
${unpartitionedTable}: result=${unpartitionedCount}")
+ assertEquals(4,
unpartitionedCount[0][0].toString().toInteger())
+
+ assertCurrentFilesDoNotContainRowLineageColumns(
+ unpartitionedTable,
+ format,
+ "Unpartitioned normal INSERT")
+
+ sql """drop table if exists ${partitionedTable}"""
+ sql """
+ create table ${partitionedTable} (
+ id int,
+ name string,
+ age int,
+ dt date
+ ) engine=iceberg
+ partition by list (day(dt)) ()
+ properties (
+ "format-version" = "3",
+ "write.format.default" = "${format}"
+ )
+ """
+
+ sql """ insert into ${partitionedTable} values(11, 'Penny',
21, '2024-01-01')"""
+ sql """ insert into ${partitionedTable} values(12, 'Quinn',
22, '2024-01-02')"""
+ sql """ insert into ${partitionedTable} values(13, 'Rita', 23,
'2024-01-03')"""
+
+ log.info("Inserted initial rows into ${partitionedTable}")
+
+ // Assert baseline:
+ // 1. Partitioned tables follow the same row lineage semantics
as unpartitioned tables.
+ // 2. Explicit SELECT on _row_id remains readable under
partition predicates.
+ // 3. Regular INSERT still rejects hidden columns and does not
write them physically.
+ assertRowLineageHiddenColumns(partitionedTable, 4)
+
+ def partitionLineageRows = sql """
+ select id, _row_id, _last_updated_sequence_number
+ from ${partitionedTable}
+ where dt >= '2024-01-01'
+ order by id
+ """
+ log.info("Checking partitioned row lineage projection for
${partitionedTable}: rows=${partitionLineageRows}")
+ assertEquals(3, partitionLineageRows.size())
+ partitionLineageRows.each { row ->
+ assertTrue(row[1] != null, "_row_id should be non-null for
partitioned table row=${row}")
+ assertTrue(row[2] != null, "_last_updated_sequence_number
should be non-null for partitioned table row=${row}")
+ }
+
+ def exactPartitionPredicate = sql """
+ select id
+ from ${partitionedTable}
+ where dt = '2024-01-02' and _row_id =
${partitionLineageRows[1][1]}
+ """
+ log.info("Checking exact partition + _row_id predicate for
${partitionedTable}: result=${exactPartitionPredicate}")
+ assertEquals(1, exactPartitionPredicate.size())
+ assertEquals(12,
exactPartitionPredicate[0][0].toString().toInteger())
+
+ test {
+ sql """
+ insert into ${partitionedTable}(_row_id, id, name,
age, dt)
+ values (1, 14, 'BadPartitionRow', 24, '2024-01-04')
+ """
+ exception "Cannot specify row lineage column '_row_id' in
INSERT statement"
+ }
+
+ test {
+ sql """
+ insert into
${partitionedTable}(_last_updated_sequence_number, id, name, age, dt)
+ values (1, 15, 'BadPartitionSeq', 25, '2024-01-05')
+ """
+ exception "Cannot specify row lineage column
'_last_updated_sequence_number' in INSERT statement"
+ }
+
+ sql """insert into ${partitionedTable}(id, name, age, dt)
values (14, 'Sara', 24, '2024-01-04')"""
+ def partitionedCount = sql """select count(*) from
${partitionedTable}"""
+ log.info("Checking row count after regular INSERT for
${partitionedTable}: result=${partitionedCount}")
+ assertEquals(4, partitionedCount[0][0].toString().toInteger())
+
+ assertCurrentFilesDoNotContainRowLineageColumns(
+ partitionedTable,
+ format,
+ "Partitioned normal INSERT")
+ } finally {
+ sql """drop table if exists ${partitionedTable}"""
+ sql """drop table if exists ${unpartitionedTable}"""
+ }
+ }
+ } finally {
+ sql """set show_hidden_columns = false"""
+ sql """drop database if exists ${dbName} force"""
+ sql """drop catalog if exists ${catalogName}"""
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
new file mode 100644
index 00000000000..4bce7387f86
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_update_delete_merge.groovy
@@ -0,0 +1,292 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Iceberg v3 row lineage regression: the assertions below verify that
+// UPDATE/DELETE/MERGE keep _row_id stable for surviving rows, advance
+// _last_updated_sequence_number for updated rows, and emit Puffin delete files.
+suite("test_iceberg_v3_row_lineage_update_delete_merge",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("Iceberg test is disabled")
+ return
+ }
+
+ // Connection details for the dockerized Iceberg REST catalog and MinIO.
+ String catalogName = "test_iceberg_v3_row_lineage_update_delete_merge"
+ String dbName = "test_row_lineage_update_delete_merge_db"
+ String restPort = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minioPort = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String endpoint = "http://${externalEnvIp}:${minioPort}"
+
+ // The whole scenario is executed once per data-file format.
+ def formats = ["parquet", "orc"]
+
+ // Case-insensitive check: true if any row of a `desc` result set
+ // mentions fieldName anywhere in its string form.
+ def schemaContainsField = { schemaRows, fieldName ->
+ String target = fieldName.toLowerCase()
+ return schemaRows.any { row ->
row.toString().toLowerCase().contains(target) }
+ }
+
+ // Read a data file's physical schema straight from object storage via the
+ // s3 table-valued function, bypassing the Iceberg metadata layer. Used to
+ // prove which columns are *physically* present in a written file.
+ def fileSchemaRows = { filePath, format ->
+ return sql("""
+ desc function s3(
+ "uri" = "${filePath}",
+ "format" = "${format}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "us-east-1"
+ )
+ """)
+ }
+
+ // Asserts the table has at least one delete file and that every delete
+ // file is a Puffin deletion vector (v3 behaviour), checked both by file
+ // extension and by the reported file_format.
+ def assertDeleteFilesArePuffin = { tableName ->
+ def deleteFiles = sql("""
+ select file_path, lower(file_format)
+ from ${tableName}\$delete_files
+ order by file_path
+ """)
+ log.info("Checking delete files for ${tableName}: ${deleteFiles}")
+ assertTrue(deleteFiles.size() > 0, "V3 table ${tableName} should
produce delete files")
+ deleteFiles.each { row ->
+ assertTrue(row[0].toString().endsWith(".puffin"),
+ "V3 delete file should be Puffin: ${row}")
+ assertEquals("puffin", row[1].toString())
+ }
+ }
+
+ // Every current data file must use the expected format; additionally at
+ // least one of them must physically contain both row lineage columns
+ // (_row_id and _last_updated_sequence_number) in its file schema.
+ def assertAtLeastOneCurrentDataFileHasRowLineageColumns = { tableName,
format ->
+ def currentFiles = sql("""select file_path, lower(file_format) from
${tableName}\$data_files order by file_path""")
+ log.info("Checking current data files for physical row lineage columns
in ${tableName}: ${currentFiles}")
+ assertTrue(currentFiles.size() > 0, "Current data files should exist
for ${tableName}")
+
+ boolean found = false
+ currentFiles.each { row ->
+ assertEquals(format, row[1].toString())
+ assertTrue(row[0].toString().endsWith(format == "parquet" ?
".parquet" : ".orc"),
+ "Current data file should match ${format} for
${tableName}, file=${row[0]}")
+ // Inspect the physical file schema, not the table schema.
+ def schemaRows = fileSchemaRows(row[0].toString(), format)
+ log.info("${format} schema for ${tableName}, file=${row[0]} ->
${schemaRows}")
+ if (schemaContainsField(schemaRows, "_row_id")
+ && schemaContainsField(schemaRows,
"_last_updated_sequence_number")) {
+ found = true
+ }
+ }
+ assertTrue(found, "At least one current data file should physically
contain row lineage columns for ${tableName}")
+ }
+
+ // Projects the lineage columns explicitly and asserts the expected row
+ // count and that both lineage values are non-null on every row.
+ def assertExplicitRowLineageNonNull = { tableName, expectedRowCount ->
+ def rows = sql("""
+ select id, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """)
+ log.info("Checking explicit row lineage projection for ${tableName}:
rows=${rows}")
+ assertEquals(expectedRowCount, rows.size())
+ rows.each { row ->
+ assertTrue(row[1] != null, "_row_id should be non-null for
${tableName}, row=${row}")
+ assertTrue(row[2] != null, "_last_updated_sequence_number should
be non-null for ${tableName}, row=${row}")
+ }
+ }
+
+ // Snapshot of id -> [_row_id, _last_updated_sequence_number] (as strings),
+ // captured before and after DML so stability/advance can be compared.
+ def lineageMap = { tableName ->
+ def rows = sql("""
+ select id, _row_id, _last_updated_sequence_number
+ from ${tableName}
+ order by id
+ """)
+ Map<Integer, List<String>> result = [:]
+ rows.each { row ->
+ result[row[0].toString().toInteger()] = [row[1].toString(),
row[2].toString()]
+ }
+ log.info("Built lineage map for ${tableName}: ${result}")
+ return result
+ }
+
+ // Recreate the REST catalog pointing at the dockerized Iceberg + MinIO
+ // services, then select the test database and session settings.
+ sql """drop catalog if exists ${catalogName}"""
+ sql """
+ create catalog if not exists ${catalogName} properties (
+ "type" = "iceberg",
+ "iceberg.catalog.type" = "rest",
+ "uri" = "http://${externalEnvIp}:${restPort}",
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "${endpoint}",
+ "s3.region" = "us-east-1"
+ )
+ """
+
+ sql """switch ${catalogName}"""
+ sql """create database if not exists ${dbName}"""
+ sql """use ${dbName}"""
+ sql """set enable_fallback_to_original_planner = false"""
+ // Hidden columns stay off; lineage columns are read via explicit projection.
+ sql """set show_hidden_columns = false"""
+
+ try {
+ // One full pass per file format (parquet, orc).
+ formats.each { format ->
+ String updateDeleteTable =
"test_row_lineage_v3_update_delete_${format}"
+ String mergeTable = "test_row_lineage_v3_merge_${format}"
+ log.info("Run row lineage update/delete/merge test with format
${format}")
+
+ try {
+ // Unpartitioned v3 table exercised with plain UPDATE/DELETE.
+ sql """drop table if exists ${updateDeleteTable}"""
+ sql """
+ create table ${updateDeleteTable} (
+ id int,
+ name string,
+ age int
+ ) engine=iceberg
+ properties (
+ "format-version" = "3",
+ "write.format.default" = "${format}"
+ )
+ """
+
+ // Separate inserts -> separate data files / snapshots.
+ sql """insert into ${updateDeleteTable} values (1, 'Alice',
25) """
+ sql """insert into ${updateDeleteTable} values (2, 'Bob', 30)
"""
+ sql """insert into ${updateDeleteTable} values (3, 'Charlie',
35)"""
+
+ // Capture lineage before DML so stability/advance can be
+ // asserted afterwards.
+ def updateDeleteLineageBefore = lineageMap(updateDeleteTable)
+ log.info("Lineage before UPDATE/DELETE on
${updateDeleteTable}: ${updateDeleteLineageBefore}")
+ sql """update ${updateDeleteTable} set name = 'Alice_u', age =
26 where id = 1"""
+ sql """delete from ${updateDeleteTable} where id = 2"""
+
+ // Assert baseline:
+ // 1. UPDATE keeps rows readable and applies the new values.
+ // 2. DELETE removes the target row.
+ // 3. V3 delete files use Puffin deletion vectors instead of
delete_pos parquet/orc files.
+ // 4. Explicit row lineage reads remain non-null after DML.
+ def updateDeleteRows = sql """select * from
${updateDeleteTable} order by id"""
+ log.info("Checking table rows after UPDATE/DELETE on
${updateDeleteTable}: ${updateDeleteRows}")
+ assertEquals(2, updateDeleteRows.size())
+ assertEquals(1, updateDeleteRows[0][0].toString().toInteger())
+ assertEquals("Alice_u", updateDeleteRows[0][1])
+ assertEquals(26, updateDeleteRows[0][2].toString().toInteger())
+ assertEquals(3, updateDeleteRows[1][0].toString().toInteger())
+ assertEquals("Charlie", updateDeleteRows[1][1])
+ assertEquals(35, updateDeleteRows[1][2].toString().toInteger())
+
+ assertExplicitRowLineageNonNull(updateDeleteTable, 2)
+ def updateDeleteLineageAfter = lineageMap(updateDeleteTable)
+ log.info("Lineage after UPDATE/DELETE on ${updateDeleteTable}:
${updateDeleteLineageAfter}")
+ // id=1 was updated: _row_id must stay stable while
+ // _last_updated_sequence_number must strictly advance.
+ assertEquals(updateDeleteLineageBefore[1][0],
updateDeleteLineageAfter[1][0])
+ assertTrue(updateDeleteLineageBefore[1][1] !=
updateDeleteLineageAfter[1][1],
+ "UPDATE should change _last_updated_sequence_number
for id=1")
+ assertTrue(updateDeleteLineageAfter[1][1].toLong() >
updateDeleteLineageBefore[1][1].toLong(),
+ "UPDATE should advance _last_updated_sequence_number
for id=1")
+ // id=3 was untouched: both lineage values must be unchanged.
+ assertEquals(updateDeleteLineageBefore[3][0],
updateDeleteLineageAfter[3][0])
+ assertEquals(updateDeleteLineageBefore[3][1],
updateDeleteLineageAfter[3][1])
+ assertTrue(!updateDeleteLineageAfter.containsKey(2), "Deleted
row id=2 should not remain after DELETE")
+ assertDeleteFilesArePuffin(updateDeleteTable)
+
 assertAtLeastOneCurrentDataFileHasRowLineageColumns(updateDeleteTable, format)
+
+ // An equality predicate on _row_id must hit exactly one row.
+ def minRowIdAfterUpdate = sql """
+ select min(_row_id)
+ from ${updateDeleteTable}
+ """
+ def rowIdFilterResult = sql """
+ select count(*)
+ from ${updateDeleteTable}
+ where _row_id = ${minRowIdAfterUpdate[0][0]}
+ """
+ log.info("Checking _row_id filter after UPDATE/DELETE on
${updateDeleteTable}: minRowId=${minRowIdAfterUpdate},
result=${rowIdFilterResult}")
+ assertEquals(1, rowIdFilterResult[0][0].toString().toInteger())
+
+ // Day-partitioned v3 table exercised with a single MERGE that
+ // mixes delete, update and insert actions.
+ sql """drop table if exists ${mergeTable}"""
+ sql """
+ create table ${mergeTable} (
+ id int,
+ name string,
+ age int,
+ dt date
+ ) engine=iceberg
+ partition by list (day(dt)) ()
+ properties (
+ "format-version" = "3",
+ "write.format.default" = "${format}"
+ )
+ """
+
+ sql """ insert into ${mergeTable} values (1, 'Penny', 21,
'2024-01-01') """
+ sql """ insert into ${mergeTable} values (2, 'Quinn', 22,
'2024-01-02') """
+ sql """ insert into ${mergeTable} values (3, 'Rita', 23,
'2024-01-03') """
+
+ def mergeLineageBefore = lineageMap(mergeTable)
+ log.info("Lineage before MERGE on ${mergeTable}:
${mergeLineageBefore}")
+ // Source rows: flag 'U' updates id=1, 'D' deletes id=2,
+ // 'I' inserts a brand new id=4.
+ sql """
+ merge into ${mergeTable} t
+ using (
+ select 1 as id, 'Penny_u' as name, 31 as age, date
'2024-01-01' as dt, 'U' as flag
+ union all
+ select 2, 'Quinn', 22, date '2024-01-02', 'D'
+ union all
+ select 4, 'Sara', 24, date '2024-01-04', 'I'
+ ) s
+ on t.id = s.id
+ when matched and s.flag = 'D' then delete
+ when matched then update set
+ name = s.name,
+ age = s.age
+ when not matched then insert (id, name, age, dt)
+ values (s.id, s.name, s.age, s.dt)
+ """
+
+ // Assert baseline:
+ // 1. MERGE applies DELETE, UPDATE, and INSERT actions in one
statement.
+ // 2. The partitioned MERGE still writes Puffin deletion
vectors.
+ // 3. At least one current data file written by MERGE contains
physical row lineage columns.
+ def mergeRows = sql """select * from ${mergeTable} order by
id"""
+ log.info("Checking table rows after MERGE on ${mergeTable}:
${mergeRows}")
+ assertEquals(3, mergeRows.size())
+ assertEquals(1, mergeRows[0][0].toString().toInteger())
+ assertEquals("Penny_u", mergeRows[0][1])
+ assertEquals(31, mergeRows[0][2].toString().toInteger())
+ assertEquals(3, mergeRows[1][0].toString().toInteger())
+ assertEquals("Rita", mergeRows[1][1])
+ assertEquals(23, mergeRows[1][2].toString().toInteger())
+ assertEquals(4, mergeRows[2][0].toString().toInteger())
+ assertEquals("Sara", mergeRows[2][1])
+ assertEquals(24, mergeRows[2][2].toString().toInteger())
+
+ assertExplicitRowLineageNonNull(mergeTable, 3)
+ def mergeLineageAfter = lineageMap(mergeTable)
+ log.info("Lineage after MERGE on ${mergeTable}:
${mergeLineageAfter}")
+ // id=1 (merge-update): _row_id stable, sequence number
+ // strictly advances.
+ assertEquals(mergeLineageBefore[1][0], mergeLineageAfter[1][0])
+ assertTrue(mergeLineageBefore[1][1] != mergeLineageAfter[1][1],
+ "MERGE UPDATE should change
_last_updated_sequence_number for id=1")
+ assertTrue(mergeLineageAfter[1][1].toLong() >
mergeLineageBefore[1][1].toLong(),
+ "MERGE UPDATE should advance
_last_updated_sequence_number for id=1")
+ // id=3 untouched by MERGE: lineage unchanged; id=2 deleted.
+ assertEquals(mergeLineageBefore[3][0], mergeLineageAfter[3][0])
+ assertEquals(mergeLineageBefore[3][1], mergeLineageAfter[3][1])
+ assertTrue(!mergeLineageAfter.containsKey(2), "MERGE DELETE
should remove id=2")
+ assertDeleteFilesArePuffin(mergeTable)
+
 assertAtLeastOneCurrentDataFileHasRowLineageColumns(mergeTable, format)
+
+ // Row inserted by MERGE must receive generated lineage values.
+ def insertedRowLineage = sql """
+ select _row_id, _last_updated_sequence_number
+ from ${mergeTable}
+ where id = 4
+ """
+ log.info("Checking inserted MERGE row lineage for
${mergeTable}: ${insertedRowLineage}")
+ assertEquals(1, insertedRowLineage.size())
+ assertTrue(insertedRowLineage[0][0] != null, "Inserted MERGE
row should get generated _row_id")
+ assertTrue(insertedRowLineage[0][1] != null, "Inserted MERGE
row should get generated _last_updated_sequence_number")
+ // Per-format cleanup: drop both tables even if assertions failed.
+ } finally {
+ sql """drop table if exists ${mergeTable}"""
+ sql """drop table if exists ${updateDeleteTable}"""
+ }
+ }
+ // Suite-level cleanup: drop the test database and catalog.
+ } finally {
+ sql """drop database if exists ${dbName} force"""
+ sql """drop catalog if exists ${catalogName}"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]