This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 74f36dc8617 [improvement](topn) check multiget result rows against 
request row id count and add be UT (#61758) (#61781)
74f36dc8617 is described below

commit 74f36dc86178df1ed888dee39f272e61a634df8c
Author: TengJianPing <[email protected]>
AuthorDate: Fri Mar 27 09:40:03 2026 +0800

    [improvement](topn) check multiget result rows against request row id count 
and add be UT (#61758) (#61781)
    
    Pick #61758
    
    Check multiget result rows matches request row id count when doing
    merge_multi_response:
    1. A BE may return an empty block even if
    request.request_block_descs(i).row_id_size() != 0: If the id_file_map
    was GC'd on the BE before it could process the request, refer 'if
    (!id_file_map)' in
    RowIdStorageReader::read_by_rowids.
    2. Report error in any case where the row count doesn't match, even if
    it's not empty,
    since that indicates a bug in BE's row fetching logic or serialization
    logic.
    
    Also add comments.
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    None
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason? -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/exec/rowid_fetcher.cpp                      |   1 +
 be/src/pipeline/exec/materialization_opertor.cpp   |  75 +++++--
 .../operator/materialization_shared_state_test.cpp | 219 ++++++++++++++++++++-
 3 files changed, 281 insertions(+), 14 deletions(-)

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 1c1eb0a4360..c1430c2ecfe 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -548,6 +548,7 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
         // if id_file_map is null, means the BE not have scan range, just 
return ok
         if (!id_file_map) {
             // padding empty block to response
+            LOG(INFO) << "id_file_map not found for query_id: " << 
print_id(request.query_id());
             for (int i = 0; i < request.request_block_descs_size(); ++i) {
                 response->add_blocks();
             }
diff --git a/be/src/pipeline/exec/materialization_opertor.cpp 
b/be/src/pipeline/exec/materialization_opertor.cpp
index e040e35d7f6..1e3c34aabe8 100644
--- a/be/src/pipeline/exec/materialization_opertor.cpp
+++ b/be/src/pipeline/exec/materialization_opertor.cpp
@@ -54,8 +54,25 @@ void 
MaterializationSharedState::get_block(vectorized::Block* block) {
     origin_block.clear();
 }
 
+// Merges RPC responses from multiple BEs into `response_blocks` in the 
original row order.
+//
+// After parallel multiget_data_v2 RPCs complete, each BE's response contains 
a partial block
+// with only the rows that BE owns (ordered by file_id/row_id). This function 
reassembles them
+// into the correct TopN output order using `block_order_results` as the 
ordering guide.
+//
+// Data flow:
+//   rpc_struct_map[backend_id].response  (per-BE partial blocks, unordered 
across BEs)
+//       + block_order_results[i][j]      (maps each output row → its source 
backend_id)
+//       → response_blocks[i]             (final merged result in original 
TopN row order)
 Status MaterializationSharedState::merge_multi_response() {
+    // Outer loop: iterate over each relation (i.e., each rowid column / 
table).
+    // A query with lazy materialization on 2 tables would have 
block_order_results.size() == 2,
+    // each with its own set of response_blocks and RPC request_block_descs.
     for (int i = 0; i < block_order_results.size(); ++i) {
+        // Maps backend_id → (deserialized block from that BE, row cursor into 
the block).
+        // The cursor tracks how many rows we've consumed from this BE's block 
so far,
+        // since the rows in the partial block are in the same order as the 
row_ids we sent.
+
         // block_maps must be rebuilt for each relation (each i), because a 
backend that
         // returned a non-empty block for relation i-1 may return an empty 
block for
         // relation i (e.g. it holds rows only from one of the two tables in a 
UNION ALL).
@@ -63,6 +80,9 @@ Status MaterializationSharedState::merge_multi_response() {
         // relation and miss entries for the current one, causing the
         // "backend_id not found in block_maps" error.
         std::unordered_map<int64_t, std::pair<vectorized::Block, int>> 
block_maps;
+
+        // Phase 1: Deserialize the i-th response block from every BE into 
block_maps.
+        // Each BE's response.blocks(i) corresponds to the i-th relation's 
fetched columns.
         for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
             vectorized::Block partial_block;
             size_t uncompressed_size = 0;
@@ -70,13 +90,31 @@ Status MaterializationSharedState::merge_multi_response() {
             DCHECK(rpc_struct.response.blocks_size() > i);
             
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
                                                       &uncompressed_size, 
&uncompressed_time));
+            // Check multiget result rows matches request row id count.
+            // 1. A BE may return an empty block event if
+            // request.request_block_descs(i).row_id_size() != 0:
+            // If the id_file_map was GC'd on the BE before it could process 
the request,
+            // refer 'if (!id_file_map)' in RowIdStorageReader::read_by_rowids.
+            // 2. Report error in any case where the row count doesn't match, 
even if it's not empty,
+            //    since that indicates a bug in BE's row fetching logic or 
serialization logic.
+            if (rpc_struct.request.request_block_descs(i).row_id_size() != 
partial_block.rows()) {
+                return Status::InternalError(
+                        fmt::format("merge_multi_response, "
+                                    "backend_id {} returned block with row 
count {} not match "
+                                    "request row id count {}",
+                                    backend_id, partial_block.rows(),
+                                    
rpc_struct.request.request_block_descs(i).row_id_size()));
+            }
             if (rpc_struct.response.blocks(i).has_profile()) {
                 auto response_profile =
                         
RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
                 _update_profile_info(backend_id, response_profile.get());
             }
 
+            // Only insert non-empty blocks. A BE may return an empty block if
+            // request.request_block_descs(i).row_id_size() is 0
             if (!partial_block.is_empty_column()) {
+                // Reset row cursor to 0 — we'll consume rows from this block 
sequentially.
                 block_maps[backend_id] = 
std::make_pair(std::move(partial_block), 0);
             }
         }
@@ -89,8 +127,12 @@ Status MaterializationSharedState::merge_multi_response() {
                         *source_block_rows.first.get_by_position(k).column));
             }
         }
+        // Phase 2: Walk the original row order and copy each row from the 
correct BE's block
+        // into response_blocks[i]. block_order_results[i][j] tells us which 
backend_id owns
+        // row j. A value of 0 means the rowid was NULL (e.g., from an outer 
join).
         for (int j = 0; j < block_order_results[i].size(); ++j) {
             auto backend_id = block_order_results[i][j];
+            // Non-null rowid: copy the next row from this BE's partial block.
             if (backend_id) {
                 if (UNLIKELY(block_maps.find(backend_id) == block_maps.end())) 
{
                     return Status::InternalError(
@@ -98,13 +140,17 @@ Status MaterializationSharedState::merge_multi_response() {
                                         "backend_id {} not found in 
block_maps",
                                         backend_id));
                 }
+                // source_block_rows.first  = the deserialized Block from this 
BE
+                // source_block_rows.second = current row cursor (how many 
rows consumed so far)
                 auto& source_block_rows = block_maps[backend_id];
                 DCHECK(source_block_rows.second < 
source_block_rows.first.rows());
+                // Copy column-by-column from the source block's current row 
into response_blocks.
                 for (int k = 0; k < response_blocks[i].columns(); ++k) {
                     response_blocks[i].get_column_by_position(k)->insert_from(
                             *source_block_rows.first.get_by_position(k).column,
                             source_block_rows.second);
                 }
+                // Advance the cursor — next time we see this backend_id, we 
take the next row.
                 source_block_rows.second++;
             } else {
                 for (int k = 0; k < response_blocks[i].columns(); ++k) {
@@ -115,6 +161,9 @@ Status MaterializationSharedState::merge_multi_response() {
     }
 
     // clear request/response
+    // Phase 3: Clear the row_id and file_id arrays in each RPC request to 
prepare for the
+    // next batch. The request template (column_descs, slots, etc.) is reused 
across batches;
+    // only the per-row data (file_id, row_id) needs to be cleared.
     for (auto& [_, rpc_struct] : rpc_struct_map) {
         for (int i = 0; i < rpc_struct.request.request_block_descs_size(); 
++i) {
             rpc_struct.request.mutable_request_block_descs(i)->clear_row_id();
@@ -160,9 +209,9 @@ Status 
MaterializationSharedState::create_muiltget_result(const vectorized::Colu
     for (int i = 0; i < columns.size(); ++i) {
         const uint8_t* null_map = nullptr;
         const vectorized::ColumnString* column_rowid = nullptr;
-        auto& column = columns[i];
+        const auto& column = columns[i];
 
-        if (auto column_ptr = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+        if (const auto* column_ptr = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
             null_map = column_ptr->get_null_map_data().data();
             column_rowid = assert_cast<const vectorized::ColumnString*>(
                     column_ptr->get_nested_column_ptr().get());
@@ -223,7 +272,7 @@ Status MaterializationSharedState::init_multi_requests(
     // Initialize the base struct of PMultiGetRequestV2
     multi_get_request.set_be_exec_version(state->be_exec_version());
     
multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
-    auto query_id = multi_get_request.mutable_query_id();
+    auto* query_id = multi_get_request.mutable_query_id();
     query_id->set_hi(state->query_id().hi);
     query_id->set_lo(state->query_id().lo);
     DCHECK_EQ(materialization_node.column_descs_lists.size(),
@@ -236,24 +285,24 @@ Status MaterializationSharedState::init_multi_requests(
             
std::vector<vectorized::MutableBlock>(materialization_node.column_descs_lists.size());
 
     for (int i = 0; i < materialization_node.column_descs_lists.size(); ++i) {
-        auto request_block_desc = multi_get_request.add_request_block_descs();
+        auto* request_block_desc = multi_get_request.add_request_block_descs();
         
request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]);
         // Initialize the column_descs and slot_locs
-        auto& column_descs = materialization_node.column_descs_lists[i];
-        for (auto& column_desc_item : column_descs) {
+        const auto& column_descs = materialization_node.column_descs_lists[i];
+        for (const auto& column_desc_item : column_descs) {
             
TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs());
         }
 
-        auto& slot_locs = materialization_node.slot_locs_lists[i];
+        const auto& slot_locs = materialization_node.slot_locs_lists[i];
         tuple_desc->to_protobuf(request_block_desc->mutable_desc());
 
-        auto& column_idxs = materialization_node.column_idxs_lists[i];
+        const auto& column_idxs = materialization_node.column_idxs_lists[i];
         for (auto idx : column_idxs) {
             request_block_desc->add_column_idxs(idx);
         }
 
         std::vector<SlotDescriptor*> slots_res;
-        for (auto& slot_loc_item : slot_locs) {
+        for (const auto& slot_loc_item : slot_locs) {
             slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots());
             slots_res.emplace_back(slots[slot_loc_item]);
         }
@@ -286,7 +335,7 @@ Status MaterializationOperator::init(const 
doris::TPlanNode& tnode, doris::Runti
     _materialization_node = tnode.materialization_node;
     _gc_id_map = tnode.materialization_node.gc_id_map;
     // Create result_expr_ctx_lists_ from thrift exprs.
-    auto& fetch_expr_lists = tnode.materialization_node.fetch_expr_lists;
+    const auto& fetch_expr_lists = tnode.materialization_node.fetch_expr_lists;
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(fetch_expr_lists, 
_rowid_exprs));
     return Status::OK();
 }
@@ -316,7 +365,7 @@ Status MaterializationOperator::pull(RuntimeState* state, 
vectorized::Block* out
     if (*eos) {
         for (const auto& [backend_id, child_info] :
              local_state._materialization_state.backend_profile_info_string) {
-            auto child_profile = local_state.operator_profile()->create_child(
+            auto* child_profile = local_state.operator_profile()->create_child(
                     "RowIDFetcher: BackendId:" + std::to_string(backend_id));
             for (const auto& [info_key, info_value] :
                  
local_state._materialization_state.backend_profile_info_string[backend_id]) {
@@ -344,7 +393,7 @@ Status MaterializationOperator::push(RuntimeState* state, 
vectorized::Block* in_
         if (in_block->rows() != 0) {
             
local_state._materialization_state.rowid_locs.resize(_rowid_exprs.size());
             for (int i = 0; i < _rowid_exprs.size(); ++i) {
-                auto& rowid_expr = _rowid_exprs[i];
+                const auto& rowid_expr = _rowid_exprs[i];
                 RETURN_IF_ERROR(rowid_expr->execute(
                         in_block, 
&local_state._materialization_state.rowid_locs[i]));
                 columns.emplace_back(
@@ -360,7 +409,7 @@ Status MaterializationOperator::push(RuntimeState* state, 
vectorized::Block* in_
         bthread::CountdownEvent counter(static_cast<int>(size));
         MonotonicStopWatch rpc_timer(true);
         for (auto& [backend_id, rpc_struct] : 
local_state._materialization_state.rpc_struct_map) {
-            auto callback = brpc::NewCallback(fetch_callback, &counter);
+            auto* callback = brpc::NewCallback(fetch_callback, &counter);
             rpc_struct.cntl->set_timeout_ms(state->execution_timeout() * 1000);
             // send brpc request
             rpc_struct.stub->multiget_data_v2(rpc_struct.cntl.get(), 
&rpc_struct.request,
diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp 
b/be/test/pipeline/operator/materialization_shared_state_test.cpp
index d96653b2dfe..769fc98e786 100644
--- a/be/test/pipeline/operator/materialization_shared_state_test.cpp
+++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp
@@ -108,6 +108,12 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponse) {
     // Backend 1's response
     {
         vectorized::Block resp_block1;
+        _shared_state->rpc_struct_map[_backend_id1]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(0);
+        _shared_state->rpc_struct_map[_backend_id1]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(1);
         auto resp_value_col1 = _int_type->create_column();
         auto* value_col_data1 = 
reinterpret_cast<vectorized::ColumnInt32*>(resp_value_col1.get());
         
value_col_data1->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(100));
@@ -132,6 +138,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponse) {
     // Backend 2's response
     {
         vectorized::Block resp_block2;
+        _shared_state->rpc_struct_map[_backend_id2]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(2);
         auto resp_value_col2 = _int_type->create_column();
         auto* value_col_data2 = 
reinterpret_cast<vectorized::ColumnInt32*>(resp_value_col2.get());
         
value_col_data2->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(200));
@@ -211,6 +220,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
     // 2. Setup response blocks from multiple backends for first rowid
     {
         vectorized::Block resp_block1;
+        _shared_state->rpc_struct_map[_backend_id1]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(0);
         auto resp_value_col1 = _int_type->create_column();
         auto* value_col_data1 = 
reinterpret_cast<vectorized::ColumnInt32*>(resp_value_col1.get());
         
value_col_data1->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(100));
@@ -233,6 +245,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
     // Backend 2's response for first rowid
     {
         vectorized::Block resp_block2;
+        _shared_state->rpc_struct_map[_backend_id2]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(0);
         auto resp_value_col2 = _int_type->create_column();
         auto* value_col_data2 = 
reinterpret_cast<vectorized::ColumnInt32*>(resp_value_col2.get());
         
value_col_data2->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(102));
@@ -252,8 +267,13 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
     }
 
     // Add second block responses for second rowid
+    
_shared_state->rpc_struct_map[_backend_id1].request.add_request_block_descs();
+    
_shared_state->rpc_struct_map[_backend_id2].request.add_request_block_descs();
     {
         vectorized::Block resp_block1;
+        _shared_state->rpc_struct_map[_backend_id1]
+                .request.mutable_request_block_descs(1)
+                ->add_row_id(0);
         auto resp_value_col1 = _int_type->create_column();
         auto* value_col_data1 = 
reinterpret_cast<vectorized::ColumnInt32*>(resp_value_col1.get());
         
value_col_data1->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(200));
@@ -273,13 +293,16 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
 
     {
         vectorized::Block resp_block2;
+        _shared_state->rpc_struct_map[_backend_id2]
+                .request.mutable_request_block_descs(1)
+                ->add_row_id(0);
         auto resp_value_col2 = _int_type->create_column();
         auto* value_col_data2 = 
reinterpret_cast<vectorized::ColumnInt32*>(resp_value_col2.get());
         
value_col_data2->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(201));
         resp_block2.insert(
                 {make_nullable(std::move(resp_value_col2)), 
make_nullable(_int_type), "value2"});
 
-        auto serialized_block =
+        auto* serialized_block =
                 
_shared_state->rpc_struct_map[_backend_id2].response.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
@@ -318,4 +341,198 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
     EXPECT_EQ(merged_value_col2->get_data_at(2).data, nullptr);
 }
 
+// Test: when a remote BE returns an empty response block for a relation
+// (e.g., id_file_map was GC'd), merge_multi_response() should return a clear
+// InternalError("... not match request row id count...") rather than crashing.
+//
+// This simulates the scenario in RowIdStorageReader::read_by_rowids() where:
+//   auto id_file_map = get_id_manager()->get_id_file_map(request.query_id());
+//   if (!id_file_map) {
+//       for (int i = 0; i < request.request_block_descs_size(); ++i)
+//           response->add_blocks();  // <-- empty block, no column data
+//       return Status::OK();
+//   }
+TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseBackendNotFound) {
+    // Setup: 1 relation, 2 backends
+    // BE_1 returns a valid 1-row block
+    // BE_2 returns an empty block (simulating id_file_map missing)
+    // block_order_results references both BE_1 and BE_2
+    _shared_state->response_blocks = std::vector<vectorized::MutableBlock>(1);
+
+    // --- BE_1: valid response with 1 row ---
+    {
+        _shared_state->rpc_struct_map[_backend_id1]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(0);
+        vectorized::Block resp_block;
+        auto col = _int_type->create_column();
+        reinterpret_cast<vectorized::ColumnInt32*>(col.get())->insert(
+                vectorized::Field::create_field<PrimitiveType::TYPE_INT>(42));
+        resp_block.insert({make_nullable(std::move(col)), 
make_nullable(_int_type), "value"});
+
+        PMultiGetResponseV2 response;
+        auto* serialized_block = response.add_blocks()->mutable_block();
+        size_t uncompressed_size = 0, compressed_size = 0;
+        int64_t compress_time = 0;
+        ASSERT_TRUE(resp_block
+                            .serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
+                                       &compress_time, CompressionTypePB::LZ4)
+                            .ok());
+
+        _shared_state->rpc_struct_map[_backend_id1].response = 
std::move(response);
+        _shared_state->response_blocks[0] = resp_block.clone_empty();
+    }
+
+    // --- BE_2: empty response (simulating id_file_map = null on remote BE) 
---
+    // The remote BE adds an empty PMultiGetBlockV2 with no PBlock data.
+    // After deserialization this produces a Block with 0 columns,
+    // so is_empty_column() == true and it won't be inserted into block_maps.
+    {
+        _shared_state->rpc_struct_map[_backend_id2]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(0);
+        PMultiGetResponseV2 response;
+        response.add_blocks(); // empty PMultiGetBlockV2, no mutable_block() 
data
+        _shared_state->rpc_struct_map[_backend_id2].response = 
std::move(response);
+    }
+
+    // block_order_results references both BEs:
+    // row 0 → BE_1 (will succeed), row 1 → BE_2 (will fail: not in block_maps)
+    _shared_state->block_order_results = {{_backend_id1, _backend_id2}};
+
+    // Setup origin block so get_block() can work if merge somehow passes
+    auto rowid_col = _string_type->create_column();
+    rowid_col->insert_many_defaults(2);
+    auto value_col = _int_type->create_column();
+    value_col->insert_many_defaults(2);
+    _shared_state->origin_block = vectorized::Block({{std::move(rowid_col), 
_string_type, "rowid"},
+                                                     {std::move(value_col), 
_int_type, "value"}});
+    _shared_state->rowid_locs = {0};
+
+    // merge_multi_response() should return InternalError
+    Status st = _shared_state->merge_multi_response();
+    ASSERT_FALSE(st.ok());
+    ASSERT_TRUE(st.is<ErrorCode::INTERNAL_ERROR>());
+    ASSERT_TRUE(st.to_string().find("not match request row id count") != 
std::string::npos)
+            << "Actual error: " << st.to_string();
+}
+
+// test for the stale block_maps bug fixed by commit c655b1a.
+// With 2 relations, if block_maps is NOT rebuilt per relation, a stale entry
+// from relation 0 (with different schema) could be accessed during relation 1,
+// causing wrong data or type mismatch crashes.
+TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseStaleBlockMaps) {
+    // Setup: 2 relations, 2 backends
+    // Relation 0 (table A): BE_1 has 1 row, BE_2 has 0 rows (empty response)
+    // Relation 1 (table B): BE_1 has 0 rows (empty response), BE_2 has 1 row
+    // block_order_results[0] = [BE_1]
+    // block_order_results[1] = [BE_2]
+    //
+    // Before c655b1a fix: block_maps persists across relations.
+    //   i=0: block_maps = {BE_1: table_A_block}. Merge OK.
+    //   i=1: BE_1 empty (stale entry stays), BE_2 has data.
+    //         block_maps = {BE_1: stale!, BE_2: table_B_block}.
+    //         block_order_results[1] = [BE_2] → accesses BE_2 → OK.
+    //         But if block_order_results[1] also had BE_1 → stale schema → 
crash!
+    //
+    // After fix: block_maps is fresh per relation. This test verifies the
+    // correct behavior for cross-relation data distribution.
+
+    _shared_state->response_blocks = std::vector<vectorized::MutableBlock>(2);
+    
_shared_state->rpc_struct_map[_backend_id1].request.add_request_block_descs();
+    
_shared_state->rpc_struct_map[_backend_id2].request.add_request_block_descs();
+
+    // --- Build BE_1's response: blocks[0]=1 row (INT), blocks[1]=empty ---
+    {
+        _shared_state->rpc_struct_map[_backend_id1]
+                .request.mutable_request_block_descs(0)
+                ->add_row_id(0);
+        PMultiGetResponseV2 response;
+
+        // blocks[0]: 1 row of INT for relation 0
+        vectorized::Block rel0_block;
+        auto col = _int_type->create_column();
+        reinterpret_cast<vectorized::ColumnInt32*>(col.get())->insert(
+                vectorized::Field::create_field<PrimitiveType::TYPE_INT>(100));
+        rel0_block.insert({make_nullable(std::move(col)), 
make_nullable(_int_type), "price"});
+
+        auto* pb0 = response.add_blocks()->mutable_block();
+        size_t us = 0, cs = 0;
+        int64_t ct = 0;
+        ASSERT_TRUE(rel0_block.serialize(0, pb0, &us, &cs, &ct, 
CompressionTypePB::LZ4).ok());
+        _shared_state->response_blocks[0] = rel0_block.clone_empty();
+
+        // blocks[1]: empty (BE_1 has no data for relation 1)
+        response.add_blocks();
+
+        _shared_state->rpc_struct_map[_backend_id1].response = 
std::move(response);
+    }
+
+    // --- Build BE_2's response: blocks[0]=empty, blocks[1]=1 row (STRING) ---
+    {
+        PMultiGetResponseV2 response;
+
+        // blocks[0]: empty (BE_2 has no data for relation 0)
+        response.add_blocks();
+
+        // blocks[1]: 1 row of STRING for relation 1
+        vectorized::Block rel1_block;
+        auto col = _string_type->create_column();
+        
reinterpret_cast<vectorized::ColumnString*>(col.get())->insert_data("Alice", 5);
+        rel1_block.insert({make_nullable(std::move(col)), 
make_nullable(_string_type), "name"});
+
+        _shared_state->rpc_struct_map[_backend_id2]
+                .request.mutable_request_block_descs(1)
+                ->add_row_id(0);
+        auto* pb1 = response.add_blocks()->mutable_block();
+        size_t us = 0, cs = 0;
+        int64_t ct = 0;
+        ASSERT_TRUE(rel1_block.serialize(0, pb1, &us, &cs, &ct, 
CompressionTypePB::LZ4).ok());
+        _shared_state->response_blocks[1] = rel1_block.clone_empty();
+
+        _shared_state->rpc_struct_map[_backend_id2].response = 
std::move(response);
+    }
+
+    // block_order_results: relation 0 → only BE_1, relation 1 → only BE_2
+    _shared_state->block_order_results = {{_backend_id1}, {_backend_id2}};
+
+    // Setup origin block: [rowid_rel0, rowid_rel1, sort_col]
+    auto rowid_col0 = _string_type->create_column();
+    rowid_col0->insert_many_defaults(1);
+    auto rowid_col1 = _string_type->create_column();
+    rowid_col1->insert_many_defaults(1);
+    auto sort_col = _int_type->create_column();
+    
sort_col->insert(vectorized::Field::create_field<PrimitiveType::TYPE_INT>(999));
+    _shared_state->origin_block =
+            vectorized::Block({{std::move(rowid_col0), _string_type, "rowid0"},
+                               {std::move(rowid_col1), _string_type, "rowid1"},
+                               {std::move(sort_col), _int_type, "sort_key"}});
+    _shared_state->rowid_locs = {0, 1};
+
+    // merge should succeed — each relation only references the BE that has 
data
+    Status st = _shared_state->merge_multi_response();
+    ASSERT_TRUE(st.ok()) << "merge_multi_response failed: " << st.to_string();
+
+    // Verify results
+    vectorized::Block result_block;
+    _shared_state->get_block(&result_block);
+    EXPECT_EQ(result_block.rows(), 1);
+    // Column order: response_blocks[0] cols, response_blocks[1] cols, sort_key
+    // = [price(nullable INT), name(nullable STRING), sort_key(INT)]
+    EXPECT_EQ(result_block.columns(), 3);
+
+    // Verify relation 0 data (price = 100)
+    auto* price_col = result_block.get_by_position(0).column.get();
+    auto* nullable_price = assert_cast<const 
vectorized::ColumnNullable*>(price_col);
+    EXPECT_FALSE(nullable_price->is_null_at(0));
+    EXPECT_EQ(
+            *reinterpret_cast<const 
int*>(nullable_price->get_nested_column().get_data_at(0).data),
+            100);
+
+    // Verify relation 1 data (name = "Alice")
+    auto* name_col = result_block.get_by_position(1).column.get();
+    auto* nullable_name = assert_cast<const 
vectorized::ColumnNullable*>(name_col);
+    EXPECT_FALSE(nullable_name->is_null_at(0));
+    EXPECT_EQ(nullable_name->get_nested_column().get_data_at(0).to_string(), 
"Alice");
+}
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to