This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a79dd88a5f [fix](iceberg)fix iceberg v3 row lineage count distinct
error result (#63826)
7a79dd88a5f is described below
commit 7a79dd88a5f0b5c6e058f9c2f31d430174b83d07
Author: daidai <[email protected]>
AuthorDate: Mon Jun 1 11:12:53 2026 +0800
[fix](iceberg)fix iceberg v3 row lineage count distinct error result
(#63826)
Critical checkpoint conclusions:
Goal and proof: The PR fixes Parquet Iceberg v3 row-lineage-only queries, in
which _row_id is synthesized without reading any physical table columns and
distinct/aggregate results over _row_id were incorrect. The code now keeps the
row positions generated inside _read_empty_batch instead of recomputing them
after _total_read_rows has advanced, and the regression test adds distinct,
group by, count(distinct), and ndv coverage for _row_id-only reads.
Scope: The code change is small and focused on Parquet row-position
preparation. The test addition is directly tied to the bug.
Concurrency and lifecycle: No new shared state, threads, locks, static
initialization, or lifecycle ownership changes were introduced.
Configuration and compatibility: No new config, persisted format, RPC, or
FE/BE protocol compatibility changes.
Parallel code paths: The fix applies only to the Parquet path, where the bug
came from recomputing row positions after _read_empty_batch returned, by which
point _total_read_rows had already advanced; the regression test intentionally
gates the aggregate-only check to Parquet.
Conditional checks: The new _need_current_batch_row_positions() helper
preserves the existing synthesized/generated handler condition and removes
duplicated condition logic.
Test coverage: Added external Iceberg regression coverage for
row-lineage-only aggregate/distinct reads before and after an insert. I did not
run the regression locally in this GitHub Actions review session.
Observability, transactions, persistence, data writes, metrics, and memory
tracking: Not applicable to this change.
User focus: No additional user-provided focus points were supplied.
---
be/src/format/parquet/vparquet_group_reader.cpp | 30 +++++------
be/src/format/parquet/vparquet_group_reader.h | 4 +-
...test_iceberg_v3_row_lineage_query_insert.groovy | 63 ++++++++++++++++++++--
3 files changed, 74 insertions(+), 23 deletions(-)
diff --git a/be/src/format/parquet/vparquet_group_reader.cpp
b/be/src/format/parquet/vparquet_group_reader.cpp
index 016fc4eb9d7..030793e2ca0 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -326,17 +326,13 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
// Process external table query task that select columns are all from path.
if (_read_table_columns.empty()) {
- bool modify_row_ids = false;
- RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof,
&modify_row_ids));
+ RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
DCHECK(_table_format_reader);
RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
block, *read_rows, _lazy_read_ctx.partition_col_names));
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, *read_rows, _lazy_read_ctx.missing_col_names));
- if (_table_format_reader->has_synthesized_column_handlers()) {
- RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
- }
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block,
*read_rows));
RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block,
*read_rows));
Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts,
block, block->columns());
@@ -357,8 +353,7 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, *read_rows, _lazy_read_ctx.missing_col_names));
- if (_table_format_reader->has_synthesized_column_handlers() ||
- _table_format_reader->has_generated_column_handlers()) {
+ if (_need_current_batch_row_positions()) {
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block,
*read_rows));
@@ -639,8 +634,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
block, pre_read_rows,
_lazy_read_ctx.predicate_partition_col_names));
RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
block, pre_read_rows,
_lazy_read_ctx.predicate_missing_col_names));
- if (_table_format_reader->has_synthesized_column_handlers() ||
- _table_format_reader->has_generated_column_handlers()) {
+ if (_need_current_batch_row_positions()) {
RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows));
}
RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block,
pre_read_rows));
@@ -949,9 +943,13 @@ Status RowGroupReader::_get_block_column_pos(const Block&
block, const std::stri
return Status::OK();
}
-Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows,
bool* batch_eof,
- bool* modify_row_ids) {
- *modify_row_ids = false;
+bool RowGroupReader::_need_current_batch_row_positions() const {
+ DCHECK(_table_format_reader);
+ return _table_format_reader->has_synthesized_column_handlers() ||
+ _table_format_reader->has_generated_column_handlers();
+}
+
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows,
bool* batch_eof) {
if (_position_delete_ctx.has_filter) {
int64_t start_row_id = _position_delete_ctx.current_row_id;
int64_t end_row_id = std::min(_position_delete_ctx.current_row_id +
(int64_t)batch_size,
@@ -975,9 +973,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size,
size_t* read_rows, b
_position_delete_ctx.current_row_id = end_row_id;
*batch_eof = _position_delete_ctx.current_row_id ==
_position_delete_ctx.last_row_id;
- if (_table_format_reader->has_synthesized_column_handlers() ||
- _table_format_reader->has_generated_column_handlers()) {
- *modify_row_ids = true;
+ if (_need_current_batch_row_positions()) {
_current_batch_row_ids.clear();
_current_batch_row_ids.resize(*read_rows);
size_t idx = 0;
@@ -1000,9 +996,7 @@ Status RowGroupReader::_read_empty_batch(size_t
batch_size, size_t* read_rows, b
_remaining_rows = 0;
*batch_eof = true;
}
- if (_table_format_reader->has_synthesized_column_handlers() ||
- _table_format_reader->has_generated_column_handlers()) {
- *modify_row_ids = true;
+ if (_need_current_batch_row_positions()) {
RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
}
}
diff --git a/be/src/format/parquet/vparquet_group_reader.h
b/be/src/format/parquet/vparquet_group_reader.h
index e9eb5370e02..79b171ca064 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -227,8 +227,7 @@ protected:
}
private:
- Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool*
batch_eof,
- bool* modify_row_ids);
+ Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool*
batch_eof);
Status _read_column_data(Block* block, const std::vector<std::string>&
columns,
size_t batch_size, size_t* read_rows, bool*
batch_eof,
@@ -255,6 +254,7 @@ private:
const IColumn::Filter& filter);
bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData&
column_metadata);
+ bool _need_current_batch_row_positions() const;
bool is_dictionary_encoded(const tparquet::ColumnMetaData&
column_metadata);
Status _rewrite_dict_predicates();
Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int
slot_id, bool is_nullable);
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
index 7276fadba76..28e28bdd887 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
@@ -140,6 +140,56 @@ suite("test_iceberg_v3_row_lineage_query_insert",
"p0,external,iceberg,external_
assertEquals(expectedIds[2],
combinedPredicate[1][0].toString().toInteger())
}
+ def assertRowLineageOnlyAggregatesReadable = { tableName, expectedRowCount
->
+ def rowIdsWithPhysicalColumn = sql("""
+ select _row_id, id
+ from ${tableName}
+ order by _row_id
+ """).collect { row -> row[0].toString().toLong() }
+ log.info("Checking row lineage only baseline for ${tableName}:
rowIds=${rowIdsWithPhysicalColumn}")
+ assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.size())
+ assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.toSet().size())
+
+ def distinctRowIdsWithPhysicalColumn = sql("""
+ select distinct _row_id, id
+ from ${tableName}
+ order by _row_id
+ """).collect { row -> row[0].toString().toLong() }
+ log.info("""Checking distinct _row_id with physical column for
${tableName}: distinctRowIds=${distinctRowIdsWithPhysicalColumn}""")
+ assertEquals(rowIdsWithPhysicalColumn,
distinctRowIdsWithPhysicalColumn)
+
+ def distinctRowIds = sql("""
+ select distinct _row_id
+ from ${tableName}
+ order by _row_id
+ """).collect { row -> row[0].toString().toLong() }
+ log.info("Checking distinct _row_id only for ${tableName}:
distinctRowIds=${distinctRowIds}")
+ assertEquals(rowIdsWithPhysicalColumn, distinctRowIds)
+
+ def groupRows = sql("""
+ select _row_id, count(*)
+ from ${tableName}
+ group by _row_id
+ order by _row_id
+ """)
+ log.info("Checking group by _row_id only for ${tableName}:
groupRows=${groupRows}")
+ assertEquals(expectedRowCount, groupRows.size())
+ assertEquals(rowIdsWithPhysicalColumn, groupRows.collect { row ->
row[0].toString().toLong() })
+ groupRows.each { row ->
+ assertEquals(1, row[1].toString().toInteger())
+ }
+
+ def distinctAggRows = sql("""
+ select count(*), count(distinct _row_id), ndv(_row_id)
+ from ${tableName}
+ """)
+ log.info("Checking distinct aggregate on _row_id only for
${tableName}: result=${distinctAggRows}")
+ assertEquals(1, distinctAggRows.size())
+ assertEquals(expectedRowCount,
distinctAggRows[0][0].toString().toInteger())
+ assertEquals(expectedRowCount,
distinctAggRows[0][1].toString().toInteger())
+ assertEquals(expectedRowCount,
distinctAggRows[0][2].toString().toInteger())
+ }
+
sql """drop catalog if exists ${catalogName}"""
sql """
create catalog if not exists ${catalogName} properties (
@@ -180,10 +230,11 @@ suite("test_iceberg_v3_row_lineage_query_insert",
"p0,external,iceberg,external_
"""
sql """
- insert into ${unpartitionedTable} values(1, 'Alice', 25);
+ insert into ${unpartitionedTable} values
+ (1, 'Alice', 25),
+ (2, 'Bob', 30),
+ (3, 'Charlie', 35)
"""
- sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30)
"""
- sql """ insert into ${unpartitionedTable} values(3, 'Charlie',
35) """
log.info("Inserted initial rows into ${unpartitionedTable}")
@@ -193,6 +244,9 @@ suite("test_iceberg_v3_row_lineage_query_insert",
"p0,external,iceberg,external_
// 3. Explicit SELECT on row lineage columns returns non-null
values.
assertRowLineageHiddenColumns(unpartitionedTable, 3)
assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3])
+ if (format == "parquet") {
+ assertRowLineageOnlyAggregatesReadable(unpartitionedTable,
3)
+ }
test {
sql """insert into ${unpartitionedTable}(_row_id, id,
name, age) values (1, 9, 'BadRow', 99)"""
@@ -216,6 +270,9 @@ suite("test_iceberg_v3_row_lineage_query_insert",
"p0,external,iceberg,external_
unpartitionedTable,
format,
"Unpartitioned normal INSERT")
+ if (format == "parquet") {
+ assertRowLineageOnlyAggregatesReadable(unpartitionedTable,
4)
+ }
sql """drop table if exists ${partitionedTable}"""
sql """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]