This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 75d68f6ad64 [fix](core) Fix core caused by VDataStreamMgr::transmit_block (#50560) 75d68f6ad64 is described below commit 75d68f6ad64403b49af09ab32a336d7c4c97f74a Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Wed Apr 30 19:42:33 2025 +0800 [fix](core) Fix core caused by VDataStreamMgr::transmit_block (#50560) ### What problem does this PR solve? https://github.com/apache/doris/pull/50113 transmit_block should pass done to the receiver only when transmitting the last block of a request. Otherwise, an earlier block in the same request might reach the memory limit and set done to null, and the next block then fails the `*done != nullptr` check shown below. ``` F20250430 11:49:29.413803 2176884 vdata_stream_recvr.cpp:200] Check failed: *done != nullptr *** Check failure stack trace: *** @ 0x558196be1956 google::LogMessage::SendToLog() @ 0x558196bde3a0 google::LogMessage::Flush() @ 0x558196be2199 google::LogMessageFatal::~LogMessageFatal() @ 0x558193b1659c doris::vectorized::VDataStreamRecvr::SenderQueue::add_block() @ 0x558193b1f7b5 doris::vectorized::VDataStreamRecvr::add_block() @ 0x558193af6cf8 doris::vectorized::VDataStreamMgr::transmit_block() @ 0x558157f7a685 doris::pipeline::DataStreamRecvrTest_transmit_block_Test::TestBody() @ 0x5581973c8c0b testing::internal::HandleSehExceptionsInMethodIfSupported<>() @ 0x5581973c2a69 testing::internal::HandleExceptionsInMethodIfSupported<>() @ 0x55819739943a testing::Test::Run() @ 0x558197399e5e testing::TestInfo::Run() @ 0x55819739a71e testing::TestSuite::Run() @ 0x5581973a9dde testing::internal::UnitTestImpl::RunAllTests() @ 0x5581973c9a56 testing::internal::HandleSehExceptionsInMethodIfSupported<>() @ 0x5581973c3a61 testing::internal::HandleExceptionsInMethodIfSupported<>() @ 0x5581973a85d3 testing::UnitTest::Run() @ 0x5581593d8653 RUN_ALL_TESTS() ``` ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. 
--> - [ ] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/vec/runtime/vdata_stream_mgr.cpp | 16 ++++++- be/src/vec/runtime/vdata_stream_mgr.h | 8 ++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 4 +- be/src/vec/runtime/vdata_stream_recvr.h | 2 +- be/test/pipeline/exec/vdata_stream_recvr_test.cpp | 56 ++++++++++++++++++++++- 5 files changed, 78 insertions(+), 8 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 2a4f4e22861..c81fa21fa34 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -150,9 +150,23 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, for (int i = 0; i < request->blocks_size(); i++) { std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>(); pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i))); + auto pass_done = [&]() -> ::google::protobuf::Closure** { + // If it is eos, no callback is needed, done can be nullptr + if (eos) { + return nullptr; + } + // If it is the last block, a callback is needed, pass done + if (i == request->blocks_size() - 1) { + return done; + } else { + // If it is not the last block, the blocks in the request currently belong to the same queue, + // and 
the callback is handled by the done of the last block + return nullptr; + } + }; RETURN_IF_ERROR(recvr->add_block( std::move(pblock_ptr), request->sender_id(), request->be_number(), - request->packet_seq() - request->blocks_size() + i, eos ? nullptr : done, + request->packet_seq() - request->blocks_size() + i, pass_done(), wait_for_worker, cpu_time_stop_watch.elapsed_time())); } } diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index f9b5bbe5bcd..a9266d02d96 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -27,6 +27,7 @@ #include <unordered_map> #include <utility> +#include "common/be_mock_util.h" #include "common/global_types.h" #include "common/status.h" #include "util/runtime_profile.h" @@ -52,15 +53,16 @@ class VDataStreamRecvr; class VDataStreamMgr { public: VDataStreamMgr(); - ~VDataStreamMgr(); + MOCK_FUNCTION ~VDataStreamMgr(); std::shared_ptr<VDataStreamRecvr> create_recvr( RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity); - Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, - std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock = true); + MOCK_FUNCTION Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, + std::shared_ptr<VDataStreamRecvr>* res, + bool acquire_lock = true); Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 603270b7206..5bcd7dd1ef9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -489,7 +489,9 @@ void VDataStreamRecvr::close() { } // Remove this receiver from the DataStreamMgr that created it. 
// TODO: log error msg - static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id())); + if (_mgr) { + static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(), dest_node_id())); + } _mgr = nullptr; _merger.reset(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index c311668ae82..9325f4ada3f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -111,7 +111,7 @@ public: // Careful: stream sender will call this function for a local receiver, // accessing members of receiver that are allocated by Object pool // in this function is not safe. - bool exceeds_limit(size_t block_byte_size); + MOCK_FUNCTION bool exceeds_limit(size_t block_byte_size); bool queue_exceeds_limit(size_t byte_size) const; bool is_closed() const { return _is_closed; } diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp index 636772361c9..bed8be3e4f3 100644 --- a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp +++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp @@ -28,6 +28,7 @@ #include "testutil/column_helper.h" #include "testutil/mock/mock_runtime_state.h" #include "vec/data_types/data_type_number.h" +#include "vec/runtime/vdata_stream_mgr.h" namespace doris::pipeline { using namespace vectorized; @@ -37,6 +38,14 @@ struct MockVDataStreamRecvr : public VDataStreamRecvr { RuntimeProfile* profile, int num_senders, bool is_merging) : VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0, num_senders, is_merging, profile, 1) {}; + + bool exceeds_limit(size_t block_byte_size) override { + if (always_exceeds_limit) { + return true; + } + return VDataStreamRecvr::exceeds_limit(block_byte_size); + } + bool always_exceeds_limit = false; }; class DataStreamRecvrTest : public testing::Test { @@ -50,12 +59,12 @@ public: std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test"); _mock_state = 
std::make_unique<MockRuntimeState>(); _mock_profile = std::make_unique<RuntimeProfile>("test"); - recvr = std::make_unique<MockVDataStreamRecvr>(_mock_state.get(), _mock_counter.get(), + recvr = std::make_shared<MockVDataStreamRecvr>(_mock_state.get(), _mock_counter.get(), _mock_profile.get(), num_senders, is_merging); } - std::unique_ptr<MockVDataStreamRecvr> recvr; + std::shared_ptr<MockVDataStreamRecvr> recvr; std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter; @@ -564,6 +573,49 @@ TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) { input3.join(); output.join(); } + +struct MockVDataStreamMgr : public VDataStreamMgr { + ~MockVDataStreamMgr() override = default; + Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, + std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock = true) override { + *res = recvr; + return Status::OK(); + } + + std::shared_ptr<VDataStreamRecvr> recvr; +}; + +TEST_F(DataStreamRecvrTest, transmit_block) { + create_recvr(1, true); + recvr->always_exceeds_limit = true; + + MockVDataStreamMgr mgr; + mgr.recvr = recvr; + + MockClosure closure; + closure._cb = [&]() { std::cout << "cb" << std::endl; }; + google::protobuf::Closure* done = &closure; + + PTransmitDataParams request; + { + auto* pblock = request.add_blocks(); + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + to_pblock(block, pblock); + } + + { + auto* pblock = request.add_blocks(); + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + to_pblock(block, pblock); + } + + { + auto st = mgr.transmit_block(&request, &done, 1000); + EXPECT_TRUE(st) << st.msg(); + } + recvr->close(); +} + // ./run-be-ut.sh --run --filter=DataStreamRecvrTest.* } // namespace doris::pipeline --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org