This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d68f6ad64 [fix](core) Fix core caused by 
VDataStreamMgr::transmit_block (#50560)
75d68f6ad64 is described below

commit 75d68f6ad64403b49af09ab32a336d7c4c97f74a
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Wed Apr 30 19:42:33 2025 +0800

    [fix](core) Fix core caused by VDataStreamMgr::transmit_block (#50560)
    
    ### What problem does this PR solve?
    
    https://github.com/apache/doris/pull/50113
    
    transmit_block should pass `done` to the receiver only when adding the
    last block of a request.
    Otherwise, an earlier block might hit the memory limit and set `done` to
    null, and a later block then trips the `*done != nullptr` check shown below.
    
    ```
    F20250430 11:49:29.413803 2176884 vdata_stream_recvr.cpp:200] Check failed: 
*done != nullptr
    *** Check failure stack trace: ***
        @     0x558196be1956  google::LogMessage::SendToLog()
        @     0x558196bde3a0  google::LogMessage::Flush()
        @     0x558196be2199  google::LogMessageFatal::~LogMessageFatal()
        @     0x558193b1659c  
doris::vectorized::VDataStreamRecvr::SenderQueue::add_block()
        @     0x558193b1f7b5  doris::vectorized::VDataStreamRecvr::add_block()
        @     0x558193af6cf8  
doris::vectorized::VDataStreamMgr::transmit_block()
        @     0x558157f7a685  
doris::pipeline::DataStreamRecvrTest_transmit_block_Test::TestBody()
        @     0x5581973c8c0b  
testing::internal::HandleSehExceptionsInMethodIfSupported<>()
        @     0x5581973c2a69  
testing::internal::HandleExceptionsInMethodIfSupported<>()
        @     0x55819739943a  testing::Test::Run()
        @     0x558197399e5e  testing::TestInfo::Run()
        @     0x55819739a71e  testing::TestSuite::Run()
        @     0x5581973a9dde  testing::internal::UnitTestImpl::RunAllTests()
        @     0x5581973c9a56  
testing::internal::HandleSehExceptionsInMethodIfSupported<>()
        @     0x5581973c3a61  
testing::internal::HandleExceptionsInMethodIfSupported<>()
        @     0x5581973a85d3  testing::UnitTest::Run()
        @     0x5581593d8653  RUN_ALL_TESTS()
    ```
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/vec/runtime/vdata_stream_mgr.cpp           | 16 ++++++-
 be/src/vec/runtime/vdata_stream_mgr.h             |  8 ++--
 be/src/vec/runtime/vdata_stream_recvr.cpp         |  4 +-
 be/src/vec/runtime/vdata_stream_recvr.h           |  2 +-
 be/test/pipeline/exec/vdata_stream_recvr_test.cpp | 56 ++++++++++++++++++++++-
 5 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 2a4f4e22861..c81fa21fa34 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -150,9 +150,23 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
         for (int i = 0; i < request->blocks_size(); i++) {
             std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
             pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i)));
+            auto pass_done = [&]() -> ::google::protobuf::Closure** {
+                // If it is eos, no callback is needed, done can be nullptr
+                if (eos) {
+                    return nullptr;
+                }
+                // If it is the last block, a callback is needed, pass done
+                if (i == request->blocks_size() - 1) {
+                    return done;
+                } else {
+                    // If it is not the last block, the blocks in the request 
currently belong to the same queue,
+                    // and the callback is handled by the done of the last 
block
+                    return nullptr;
+                }
+            };
             RETURN_IF_ERROR(recvr->add_block(
                     std::move(pblock_ptr), request->sender_id(), 
request->be_number(),
-                    request->packet_seq() - request->blocks_size() + i, eos ? 
nullptr : done,
+                    request->packet_seq() - request->blocks_size() + i, 
pass_done(),
                     wait_for_worker, cpu_time_stop_watch.elapsed_time()));
         }
     }
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h 
b/be/src/vec/runtime/vdata_stream_mgr.h
index f9b5bbe5bcd..a9266d02d96 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -27,6 +27,7 @@
 #include <unordered_map>
 #include <utility>
 
+#include "common/be_mock_util.h"
 #include "common/global_types.h"
 #include "common/status.h"
 #include "util/runtime_profile.h"
@@ -52,15 +53,16 @@ class VDataStreamRecvr;
 class VDataStreamMgr {
 public:
     VDataStreamMgr();
-    ~VDataStreamMgr();
+    MOCK_FUNCTION ~VDataStreamMgr();
 
     std::shared_ptr<VDataStreamRecvr> create_recvr(
             RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* 
memory_used_counter,
             const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, 
int num_senders,
             RuntimeProfile* profile, bool is_merging, size_t 
data_queue_capacity);
 
-    Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id,
-                      std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock = true);
+    MOCK_FUNCTION Status find_recvr(const TUniqueId& fragment_instance_id, 
PlanNodeId node_id,
+                                    std::shared_ptr<VDataStreamRecvr>* res,
+                                    bool acquire_lock = true);
 
     Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id);
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 603270b7206..5bcd7dd1ef9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -489,7 +489,9 @@ void VDataStreamRecvr::close() {
     }
     // Remove this receiver from the DataStreamMgr that created it.
     // TODO: log error msg
-    static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), 
dest_node_id()));
+    if (_mgr) {
+        static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), 
dest_node_id()));
+    }
     _mgr = nullptr;
 
     _merger.reset();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index c311668ae82..9325f4ada3f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -111,7 +111,7 @@ public:
     // Careful: stream sender will call this function for a local receiver,
     // accessing members of receiver that are allocated by Object pool
     // in this function is not safe.
-    bool exceeds_limit(size_t block_byte_size);
+    MOCK_FUNCTION bool exceeds_limit(size_t block_byte_size);
     bool queue_exceeds_limit(size_t byte_size) const;
     bool is_closed() const { return _is_closed; }
 
diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp 
b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
index 636772361c9..bed8be3e4f3 100644
--- a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
+++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
@@ -28,6 +28,7 @@
 #include "testutil/column_helper.h"
 #include "testutil/mock/mock_runtime_state.h"
 #include "vec/data_types/data_type_number.h"
+#include "vec/runtime/vdata_stream_mgr.h"
 
 namespace doris::pipeline {
 using namespace vectorized;
@@ -37,6 +38,14 @@ struct MockVDataStreamRecvr : public VDataStreamRecvr {
                          RuntimeProfile* profile, int num_senders, bool 
is_merging)
             : VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0, 
num_senders, is_merging,
                                profile, 1) {};
+
+    bool exceeds_limit(size_t block_byte_size) override {
+        if (always_exceeds_limit) {
+            return true;
+        }
+        return VDataStreamRecvr::exceeds_limit(block_byte_size);
+    }
+    bool always_exceeds_limit = false;
 };
 
 class DataStreamRecvrTest : public testing::Test {
@@ -50,12 +59,12 @@ public:
                 
std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test");
         _mock_state = std::make_unique<MockRuntimeState>();
         _mock_profile = std::make_unique<RuntimeProfile>("test");
-        recvr = std::make_unique<MockVDataStreamRecvr>(_mock_state.get(), 
_mock_counter.get(),
+        recvr = std::make_shared<MockVDataStreamRecvr>(_mock_state.get(), 
_mock_counter.get(),
                                                        _mock_profile.get(), 
num_senders,
                                                        is_merging);
     }
 
-    std::unique_ptr<MockVDataStreamRecvr> recvr;
+    std::shared_ptr<MockVDataStreamRecvr> recvr;
 
     std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter;
 
@@ -564,6 +573,49 @@ TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
     input3.join();
     output.join();
 }
+
+struct MockVDataStreamMgr : public VDataStreamMgr {
+    ~MockVDataStreamMgr() override = default;
+    Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id,
+                      std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock = true) override {
+        *res = recvr;
+        return Status::OK();
+    }
+
+    std::shared_ptr<VDataStreamRecvr> recvr;
+};
+
+TEST_F(DataStreamRecvrTest, transmit_block) {
+    create_recvr(1, true);
+    recvr->always_exceeds_limit = true;
+
+    MockVDataStreamMgr mgr;
+    mgr.recvr = recvr;
+
+    MockClosure closure;
+    closure._cb = [&]() { std::cout << "cb" << std::endl; };
+    google::protobuf::Closure* done = &closure;
+
+    PTransmitDataParams request;
+    {
+        auto* pblock = request.add_blocks();
+        auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+        to_pblock(block, pblock);
+    }
+
+    {
+        auto* pblock = request.add_blocks();
+        auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+        to_pblock(block, pblock);
+    }
+
+    {
+        auto st = mgr.transmit_block(&request, &done, 1000);
+        EXPECT_TRUE(st) << st.msg();
+    }
+    recvr->close();
+}
+
 // ./run-be-ut.sh --run --filter=DataStreamRecvrTest.*
 
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to