This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ef82206a4 [test](beut) add VDataStreamRecvr beut (#48188)
21ef82206a4 is described below

commit 21ef82206a4c44685e7d360fe1bafce0fb3fb07b
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Tue Feb 25 12:51:41 2025 +0800

    [test](beut) add VDataStreamRecvr beut (#48188)
    
    ### What problem does this PR solve?
    Add a backend unit test (BE UT) for VDataStreamRecvr.
    
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason? -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [x] Confirm the release note
    - [x] Confirm test cases
    - [x] Confirm document
    - [x] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
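
    The central change under test is a lock-witness rename inside
    vdata_stream_recvr: try_set_dep_ready_without_lock() becomes
    set_source_ready(std::lock_guard<std::mutex>&), so a caller has to hand in
    the guard it already holds on _lock. A minimal sketch of that idiom, with
    simplified names rather than the exact Doris signatures:

        #include <deque>
        #include <mutex>

        class SenderQueueSketch {
        public:
            void add_block(int block) {
                std::lock_guard<std::mutex> l(_lock); // take the queue lock once
                _queue.push_back(block);
                set_source_ready(l);                  // the guard proves _lock is held
            }

        private:
            // Requiring a lock_guard reference makes it explicit at every call site
            // that _lock must already be held when the source side is marked ready.
            void set_source_ready(std::lock_guard<std::mutex>&) { _source_ready = true; }

            std::mutex _lock;
            std::deque<int> _queue;
            bool _source_ready = false;
        };
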
---
 be/src/pipeline/exec/exchange_source_operator.cpp  |   4 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp            |   9 +-
 be/src/vec/runtime/vdata_stream_mgr.h              |   5 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  80 ++-
 be/src/vec/runtime/vdata_stream_recvr.h            |  20 +-
 be/test/pipeline/exec/vdata_stream_recvr_test.cpp  | 569 +++++++++++++++++++++
 be/test/pipeline/pipeline_test.cpp                 |   5 +-
 .../{mock_runtime_state.h => mock_query_context.h} |  31 +-
 be/test/testutil/mock/mock_runtime_state.h         |   7 +-
 9 files changed, 643 insertions(+), 87 deletions(-)
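
One piece of scaffolding worth calling out is the new MockQueryContext: the
receiver's constructor captures state->get_query_ctx()->shared_from_this(), so a
bare MockRuntimeState with no query context would no longer be usable in the BE
UT. A hypothetical, self-contained illustration of that dependency (sketch types
only, not the Doris classes):

    #include <memory>

    struct QueryCtxSketch : std::enable_shared_from_this<QueryCtxSketch> {};

    struct RuntimeStateSketch {
        QueryCtxSketch* get_query_ctx() const { return _ctx.get(); }
        std::shared_ptr<QueryCtxSketch> _ctx = std::make_shared<QueryCtxSketch>();
    };

    struct RecvrSketch {
        explicit RecvrSketch(RuntimeStateSketch* state)
                : _query_ctx(state->get_query_ctx()->shared_from_this()) {}

        // Holding a shared_ptr keeps the query context alive for as long as the
        // receiver exists, which is why the mock state must expose a real one.
        std::shared_ptr<QueryCtxSketch> _query_ctx;
    };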

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index bbec3d92703..2aefdf8cd3d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -65,8 +65,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
-            state, _memory_used_counter, p.input_row_desc(), 
state->fragment_instance_id(),
-            p.node_id(), p.num_senders(), profile(), p.is_merging(),
+            state, _memory_used_counter, state->fragment_instance_id(), 
p.node_id(),
+            p.num_senders(), profile(), p.is_merging(),
             std::max(20480, config::exchg_node_buffer_size_bytes /
                                     (p.is_merging() ? p.num_senders() : 1)));
     const auto& queues = stream_recvr->sender_queues();
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 16d38ce2ea0..e31a2fbfea5 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -66,15 +66,14 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc
 
 std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
         RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* 
memory_used_counter,
-        const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
-        PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging,
-        size_t data_queue_capacity) {
+        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
     DCHECK(profile != nullptr);
     VLOG_FILE << "creating receiver for fragment=" << 
print_id(fragment_instance_id)
               << ", node=" << dest_node_id;
     std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
-            this, memory_used_counter, state, row_desc, fragment_instance_id, 
dest_node_id,
-            num_senders, is_merging, profile, data_queue_capacity));
+            this, memory_used_counter, state, fragment_instance_id, 
dest_node_id, num_senders,
+            is_merging, profile, data_queue_capacity));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     std::unique_lock l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h
index 9bf54e94cb8..f9b5bbe5bcd 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -56,9 +56,8 @@ public:
 
     std::shared_ptr<VDataStreamRecvr> create_recvr(
             RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* 
memory_used_counter,
-            const RowDescriptor& row_desc, const TUniqueId& 
fragment_instance_id,
-            PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, 
bool is_merging,
-            size_t data_queue_capacity);
+            const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, 
int num_senders,
+            RuntimeProfile* profile, bool is_merging, size_t 
data_queue_capacity);
 
     Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId 
node_id,
                       std::shared_ptr<VDataStreamRecvr>* res, bool 
acquire_lock = true);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 252104031e0..603270b7206 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -44,7 +44,7 @@ namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
 VDataStreamRecvr::SenderQueue::SenderQueue(
-        VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* 
profile,
+        VDataStreamRecvr* parent_recvr, int num_senders,
         std::shared_ptr<pipeline::Dependency> local_channel_dependency)
         : _recvr(parent_recvr),
           _is_cancelled(false),
@@ -80,7 +80,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
 #endif
     BlockItem block_item;
     {
-        std::lock_guard<std::mutex> l(_lock);
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
         //check and get block_item from data_queue
         if (_is_cancelled) {
             RETURN_IF_ERROR(_cancel_status);
@@ -104,7 +104,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
     COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
     _recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
-    std::lock_guard<std::mutex> l(_lock);
+    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
     sub_blocks_memory_usage(block_byte_size);
     _record_debug_info();
     if (_block_queue.empty() && _source_dependency) {
@@ -131,7 +131,9 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     return Status::OK();
 }
 
-void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
+void 
VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
+    // Here, it is necessary to check if _source_dependency is not nullptr.
+    // This is because the queue might be closed before setting the source 
dependency.
     if (!_source_dependency) {
         return;
     }
@@ -147,7 +149,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
                                                 const int64_t wait_for_worker,
                                                 const uint64_t 
time_to_find_recvr) {
     {
-        std::lock_guard<std::mutex> l(_lock);
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
         if (_is_cancelled) {
             return Status::OK();
         }
@@ -171,7 +173,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
         }
     }
 
-    std::lock_guard<std::mutex> l(_lock);
+    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
     if (_is_cancelled) {
         return Status::OK();
     }
@@ -189,7 +191,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
     _block_queue.emplace_back(std::move(pblock), block_byte_size);
     COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
     _record_debug_info();
-    try_set_dep_ready_without_lock();
+    set_source_ready(l);
 
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
@@ -209,10 +211,14 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
         return;
     }
     {
-        std::unique_lock<std::mutex> l(_lock);
+        INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock));
         if (_is_cancelled) {
             return;
         }
+        DCHECK(_num_remaining_senders >= 0);
+        if (_num_remaining_senders == 0) {
+            return;
+        }
     }
     BlockUPtr nblock = 
Block::create_unique(block->get_columns_with_type_and_name());
 
@@ -230,13 +236,13 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
 
     auto block_mem_size = nblock->allocated_bytes();
     {
-        std::unique_lock<std::mutex> l(_lock);
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
         if (_is_cancelled) {
             return;
         }
         _block_queue.emplace_back(std::move(nblock), block_mem_size);
         _record_debug_info();
-        try_set_dep_ready_without_lock();
+        set_source_ready(l);
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
         _recvr->_memory_used_counter->update(block_mem_size);
         add_blocks_memory_usage(block_mem_size);
@@ -244,7 +250,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
 }
 
 void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
-    std::lock_guard<std::mutex> l(_lock);
+    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
     if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
         return;
     }
@@ -256,25 +262,25 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
               << print_id(_recvr->fragment_instance_id()) << " node_id=" << 
_recvr->dest_node_id()
               << " #senders=" << _num_remaining_senders;
     if (_num_remaining_senders == 0) {
-        try_set_dep_ready_without_lock();
+        set_source_ready(l);
     }
 }
 
 void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
     {
-        std::lock_guard<std::mutex> l(_lock);
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
         if (_is_cancelled) {
             return;
         }
         _is_cancelled = true;
         _cancel_status = cancel_status;
-        try_set_dep_ready_without_lock();
+        set_source_ready(l);
         VLOG_QUERY << "cancelled stream: _fragment_instance_id="
                    << print_id(_recvr->fragment_instance_id())
                    << " node_id=" << _recvr->dest_node_id();
     }
     {
-        std::lock_guard<std::mutex> l(_lock);
+        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
         for (auto closure_pair : _pending_closures) {
             closure_pair.first->Run();
             int64_t elapse_time = closure_pair.second.elapsed_time();
@@ -287,34 +293,30 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
 }
 
 void VDataStreamRecvr::SenderQueue::close() {
-    {
-        // If _is_cancelled is not set to true, there may be concurrent send
-        // which add batch to _block_queue. The batch added after _block_queue
-        // is clear will be memory leak
-        std::lock_guard<std::mutex> l(_lock);
-        _is_cancelled = true;
-        try_set_dep_ready_without_lock();
+    // If _is_cancelled is not set to true, there may be concurrent send
+    // which add batch to _block_queue. The batch added after _block_queue
+    // is clear will be memory leak
+    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+    _is_cancelled = true;
+    set_source_ready(l);
 
-        for (auto closure_pair : _pending_closures) {
-            closure_pair.first->Run();
-            int64_t elapse_time = closure_pair.second.elapsed_time();
-            if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
-                _recvr->_max_wait_to_process_time->set(elapse_time);
-            }
+    for (auto closure_pair : _pending_closures) {
+        closure_pair.first->Run();
+        int64_t elapse_time = closure_pair.second.elapsed_time();
+        if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
+            _recvr->_max_wait_to_process_time->set(elapse_time);
         }
-        _pending_closures.clear();
     }
-
+    _pending_closures.clear();
     // Delete any batches queued in _block_queue
     _block_queue.clear();
 }
 
 VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
                                    RuntimeProfile::HighWaterMarkCounter* 
memory_used_counter,
-                                   RuntimeState* state, const RowDescriptor& 
row_desc,
-                                   const TUniqueId& fragment_instance_id, 
PlanNodeId dest_node_id,
-                                   int num_senders, bool is_merging, 
RuntimeProfile* profile,
-                                   size_t data_queue_capacity)
+                                   RuntimeState* state, const TUniqueId& 
fragment_instance_id,
+                                   PlanNodeId dest_node_id, int num_senders, 
bool is_merging,
+                                   RuntimeProfile* profile, size_t 
data_queue_capacity)
         : HasTaskExecutionCtx(state),
           _mgr(stream_mgr),
           _memory_used_counter(memory_used_counter),
@@ -322,7 +324,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
           _query_context(state->get_query_ctx()->shared_from_this()),
           _fragment_instance_id(fragment_instance_id),
           _dest_node_id(dest_node_id),
-          _row_desc(row_desc),
           _is_merging(is_merging),
           _is_closed(false),
           _sender_queue_mem_limit(data_queue_capacity),
@@ -344,7 +345,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
     int num_sender_per_queue = is_merging ? 1 : num_senders;
     for (int i = 0; i < num_queues; ++i) {
         SenderQueue* queue = nullptr;
-        queue = _sender_queue_pool.add(new SenderQueue(this, 
num_sender_per_queue, profile,
+        queue = _sender_queue_pool.add(new SenderQueue(this, 
num_sender_per_queue,
                                                        
_sender_to_local_channel_dependency[i]));
         _sender_queues.push_back(queue);
     }
@@ -495,11 +496,8 @@ void VDataStreamRecvr::close() {
 }
 
 void VDataStreamRecvr::set_sink_dep_always_ready() const {
-    for (auto* sender_queues : sender_queues()) {
-        auto dep = sender_queues->local_channel_dependency();
-        if (dep) {
-            dep->set_always_ready();
-        }
+    for (auto dep : _sender_to_local_channel_dependency) {
+        dep->set_always_ready();
     }
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 0d444dbb397..85cfe08005a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -73,10 +73,9 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
 public:
     class SenderQueue;
     VDataStreamRecvr(VDataStreamMgr* stream_mgr, 
RuntimeProfile::HighWaterMarkCounter* counter,
-                     RuntimeState* state, const RowDescriptor& row_desc,
-                     const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
-                     int num_senders, bool is_merging, RuntimeProfile* profile,
-                     size_t data_queue_capacity);
+                     RuntimeState* state, const TUniqueId& 
fragment_instance_id,
+                     PlanNodeId dest_node_id, int num_senders, bool is_merging,
+                     RuntimeProfile* profile, size_t data_queue_capacity);
 
     ~VDataStreamRecvr() override;
 
@@ -97,7 +96,6 @@ public:
 
     const TUniqueId& fragment_instance_id() const { return 
_fragment_instance_id; }
     PlanNodeId dest_node_id() const { return _dest_node_id; }
-    const RowDescriptor& row_desc() const { return _row_desc; }
 
     // Indicate that a particular sender is done. Delegated to the appropriate
     // sender queue. Called from DataStreamMgr.
@@ -176,15 +174,11 @@ private:
 
 class VDataStreamRecvr::SenderQueue {
 public:
-    SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, 
RuntimeProfile* profile,
+    SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
                 std::shared_ptr<pipeline::Dependency> 
local_channel_dependency);
 
     ~SenderQueue();
 
-    std::shared_ptr<pipeline::Dependency> local_channel_dependency() {
-        return _local_channel_dependency;
-    }
-
     Status get_batch(Block* next_block, bool* eos);
 
     Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t 
packet_seq,
@@ -203,15 +197,15 @@ public:
         _source_dependency = dependency;
     }
 
+protected:
     void add_blocks_memory_usage(int64_t size);
 
     void sub_blocks_memory_usage(int64_t size);
 
     bool exceeds_limit();
-
-protected:
     friend class pipeline::ExchangeLocalState;
-    void try_set_dep_ready_without_lock();
+
+    void set_source_ready(std::lock_guard<std::mutex>&);
 
     // To record information about several variables in the event of a DCHECK 
failure.
     //  DCHECK(_is_cancelled || !_block_queue.empty() || 
_num_remaining_senders == 0)
diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
new file mode 100644
index 00000000000..636772361c9
--- /dev/null
+++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
@@ -0,0 +1,569 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vdata_stream_recvr.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "pipeline/dependency.h"
+#include "pipeline/exec/multi_cast_data_streamer.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+using namespace vectorized;
+
+struct MockVDataStreamRecvr : public VDataStreamRecvr {
+    MockVDataStreamRecvr(RuntimeState* state, 
RuntimeProfile::HighWaterMarkCounter* counter,
+                         RuntimeProfile* profile, int num_senders, bool 
is_merging)
+            : VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0, 
num_senders, is_merging,
+                               profile, 1) {};
+};
+
+class DataStreamRecvrTest : public testing::Test {
+public:
+    DataStreamRecvrTest() = default;
+    ~DataStreamRecvrTest() override = default;
+    void SetUp() override {}
+
+    void create_recvr(int num_senders, bool is_merging) {
+        _mock_counter =
+                
std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test");
+        _mock_state = std::make_unique<MockRuntimeState>();
+        _mock_profile = std::make_unique<RuntimeProfile>("test");
+        recvr = std::make_unique<MockVDataStreamRecvr>(_mock_state.get(), 
_mock_counter.get(),
+                                                       _mock_profile.get(), 
num_senders,
+                                                       is_merging);
+    }
+
+    std::unique_ptr<MockVDataStreamRecvr> recvr;
+
+    std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter;
+
+    std::unique_ptr<MockRuntimeState> _mock_state;
+
+    std::unique_ptr<RuntimeProfile> _mock_profile;
+};
+
+TEST_F(DataStreamRecvrTest, TestCreateSenderQueue) {
+    {
+        create_recvr(3, false);
+        EXPECT_EQ(recvr->sender_queues().size(), 1);
+        EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+    }
+
+    {
+        create_recvr(3, true);
+        EXPECT_EQ(recvr->sender_queues().size(), 3);
+        for (auto& queue : recvr->sender_queues()) {
+            EXPECT_EQ(queue->_num_remaining_senders, 1);
+        }
+    }
+}
+
+TEST_F(DataStreamRecvrTest, TestSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    EXPECT_EQ(sender->_num_remaining_senders, 3);
+    EXPECT_EQ(sender->_block_queue.size(), 0);
+
+    {
+        auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+        sender->add_block(&block, false);
+    }
+
+    EXPECT_EQ(sink_dep->ready(), false);
+
+    {
+        EXPECT_EQ(sender->_block_queue.size(), 1);
+        Block block;
+        bool eos = false;
+        auto st = sender->get_batch(&block, &eos);
+        EXPECT_TRUE(st) << st.msg();
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5})));
+        EXPECT_FALSE(eos);
+    }
+
+    {
+        sender->decrement_senders(1);
+        EXPECT_EQ(sender->_num_remaining_senders, 2);
+        sender->decrement_senders(2);
+        EXPECT_EQ(sender->_num_remaining_senders, 1);
+        sender->decrement_senders(3);
+        EXPECT_EQ(sender->_num_remaining_senders, 0);
+
+        EXPECT_EQ(sender->_block_queue.size(), 0);
+        auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+        sender->add_block(&block, false);
+        EXPECT_EQ(sender->_block_queue.size(), 0);
+    }
+
+    {
+        EXPECT_EQ(sender->_block_queue.size(), 0);
+        Block block;
+        bool eos = false;
+        auto st = sender->get_batch(&block, &eos);
+        EXPECT_TRUE(st) << st.msg();
+        EXPECT_TRUE(eos);
+    }
+
+    sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestSenderClose) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestRandomSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    auto input_func = [&](int id) {
+        mock_random_sleep();
+        int input_block = 0;
+        while (true) {
+            if (sink_dep->ready()) {
+                auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 
3, 4, 5});
+                sender->add_block(&block, false);
+                input_block++;
+                if (input_block == 100) {
+                    sender->decrement_senders(id);
+                    break;
+                }
+            }
+        }
+    };
+
+    auto outout_func = [&]() {
+        mock_random_sleep();
+        int output_block = 0;
+        while (true) {
+            if (source_dep->ready()) {
+                Block block;
+                bool eos = false;
+                auto st = sender->get_batch(&block, &eos);
+                EXPECT_TRUE(st) << st.msg();
+                if (!block.empty()) {
+                    output_block++;
+                }
+                if (eos) {
+                    EXPECT_EQ(output_block, 3 * 100);
+                    break;
+                }
+            }
+        }
+    };
+
+    std::thread input1(input_func, 1);
+    std::thread input2(input_func, 2);
+    std::thread input3(input_func, 3);
+    std::thread output(outout_func);
+
+    input1.join();
+    input2.join();
+    input3.join();
+    output.join();
+}
+
+TEST_F(DataStreamRecvrTest, TestRandomCloseSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    auto input_func = [&](int id) {
+        mock_random_sleep();
+        int input_block = 0;
+        while (true) {
+            if (sink_dep->ready()) {
+                auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 
3, 4, 5});
+                sender->add_block(&block, false);
+                input_block++;
+                if (input_block == 100) {
+                    sender->decrement_senders(id);
+                    break;
+                }
+            }
+        }
+        std::cout << "input func : " << id << " end " << std::endl;
+    };
+
+    auto outout_func = [&]() {
+        mock_random_sleep();
+        try {
+            while (true) {
+                if (source_dep->ready()) {
+                    Block block;
+                    bool eos = false;
+                    auto st = sender->get_batch(&block, &eos);
+
+                    if (!st.ok()) {
+                        std::cout << "get_batch error: " << st.msg() << 
std::endl;
+                        break;
+                    }
+                    if (eos) {
+                        break;
+                    }
+                }
+            }
+        } catch (std::exception& e) {
+            std::cout << "exception: " << e.what() << std::endl;
+        }
+        std::cout << "output func end" << std::endl;
+    };
+
+    auto close_func = [&]() {
+        try {
+            mock_random_sleep();
+            std::cout << "close_func start" << std::endl;
+            recvr->close();
+            std::cout << "close_func end" << std::endl;
+        } catch (const std::exception& e) {
+            std::cout << "close exception: " << e.what() << std::endl;
+        }
+    };
+
+    std::vector<std::thread> threads;
+    threads.emplace_back(input_func, 1);
+    threads.emplace_back(input_func, 2);
+    threads.emplace_back(input_func, 3);
+    threads.emplace_back(outout_func);
+    threads.emplace_back(close_func);
+
+    for (auto& t : threads) {
+        if (t.joinable()) {
+            try {
+                t.join();
+            } catch (const std::system_error& e) {
+                std::cout << "Thread join error: " << e.what() << std::endl;
+            }
+        }
+    }
+}
+
+class MockClosure : public google::protobuf::Closure {
+    MockClosure() = default;
+
+    ~MockClosure() override = default;
+
+    void Run() override { _cb(); }
+    std::function<void()> _cb;
+};
+
+void to_pblock(Block& block, PBlock* pblock) {
+    size_t uncompressed_bytes = 0;
+    size_t compressed_bytes = 0;
+    EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(), 
pblock,
+                                &uncompressed_bytes, &compressed_bytes,
+                                
segment_v2::CompressionTypePB::NO_COMPRESSION));
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    EXPECT_EQ(sender->_num_remaining_senders, 3);
+    EXPECT_EQ(sender->_block_queue.size(), 0);
+
+    {
+        auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+        auto pblock = std::make_unique<PBlock>();
+        to_pblock(block, pblock.get());
+        MockClosure closure;
+        closure._cb = [&]() { std::cout << "cb" << std::endl; };
+        google::protobuf::Closure* done = &closure;
+        auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0);
+        if (done != nullptr) {
+            done->Run();
+        }
+    }
+    sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteMemLimitSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    EXPECT_EQ(sender->_num_remaining_senders, 3);
+    EXPECT_EQ(sender->_block_queue.size(), 0);
+
+    config::exchg_node_buffer_size_bytes = 1;
+
+    Defer set_([&]() { config::exchg_node_buffer_size_bytes = 20485760; });
+
+    {
+        auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+        auto pblock = std::make_unique<PBlock>();
+        to_pblock(block, pblock.get());
+        MockClosure closure;
+        bool flag = false;
+        closure._cb = [&]() {
+            std::cout << "cb" << std::endl;
+            EXPECT_TRUE(flag);
+        };
+        google::protobuf::Closure* done = &closure;
+        auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0);
+        EXPECT_EQ(done, nullptr);
+        flag = true;
+
+        {
+            Block block;
+            bool eos = false;
+            auto st = sender->get_batch(&block, &eos);
+        }
+    }
+    sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteMultiSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    std::vector<std::shared_ptr<MockClosure>> closures {10};
+
+    for (auto i = 0; i < 10; i++) {
+        closures[i] = std::make_shared<MockClosure>();
+    }
+
+    auto input_func = [&](int id) {
+        mock_random_sleep();
+        int input_block = 0;
+        auto closure = closures[id];
+        std::atomic_bool cb_flag = true;
+        closure->_cb = [&]() { cb_flag = true; };
+        while (true) {
+            if (sink_dep->ready() && cb_flag) {
+                auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 
3, 4, 5});
+                auto pblock = std::make_unique<PBlock>();
+                to_pblock(block, pblock.get());
+                google::protobuf::Closure* done = closure.get();
+                cb_flag = false;
+                auto st = sender->add_block(std::move(pblock), id, 
input_block, &done, 0, 0);
+                EXPECT_TRUE(st) << st.msg();
+                input_block++;
+                if (done != nullptr) {
+                    done->Run();
+                }
+                if (input_block == 100) {
+                    sender->decrement_senders(id);
+                    break;
+                }
+            }
+        }
+        std::cout << "input func : " << id << " end "
+                  << "input_block  : " << input_block << std::endl;
+    };
+
+    auto outout_func = [&]() {
+        mock_random_sleep();
+        int output_block = 0;
+        while (true) {
+            if (source_dep->ready()) {
+                Block block;
+                bool eos = false;
+                auto st = sender->get_batch(&block, &eos);
+                EXPECT_TRUE(st) << st.msg();
+                if (!block.empty()) {
+                    output_block++;
+                }
+                if (eos) {
+                    EXPECT_EQ(output_block, 3 * 100);
+                    break;
+                }
+            }
+        }
+    };
+
+    std::thread input1(input_func, 1);
+    std::thread input2(input_func, 2);
+    std::thread input3(input_func, 3);
+    std::thread output(outout_func);
+
+    input1.join();
+    input2.join();
+    input3.join();
+    output.join();
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
+    create_recvr(3, false);
+    EXPECT_EQ(recvr->sender_queues().size(), 1);
+    EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+    auto* sender = recvr->sender_queues().back();
+
+    auto sink_dep = sender->_local_channel_dependency;
+    auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+    sender->set_dependency(source_dep);
+
+    EXPECT_EQ(sink_dep->ready(), true);
+    EXPECT_EQ(source_dep->ready(), false);
+
+    std::vector<std::shared_ptr<MockClosure>> closures {10};
+
+    for (auto i = 0; i < 10; i++) {
+        closures[i] = std::make_shared<MockClosure>();
+    }
+
+    auto input_remote_func = [&](int id) {
+        mock_random_sleep();
+        int input_block = 0;
+        auto closure = closures[id];
+        std::atomic_bool cb_flag = true;
+        closure->_cb = [&]() { cb_flag = true; };
+        while (true) {
+            if (sink_dep->ready() && cb_flag) {
+                auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 
3, 4, 5});
+                auto pblock = std::make_unique<PBlock>();
+                to_pblock(block, pblock.get());
+                google::protobuf::Closure* done = closure.get();
+                cb_flag = false;
+                auto st = sender->add_block(std::move(pblock), id, 
input_block, &done, 0, 0);
+                EXPECT_TRUE(st) << st.msg();
+                input_block++;
+                if (done != nullptr) {
+                    done->Run();
+                }
+                if (input_block == 100) {
+                    sender->decrement_senders(id);
+                    break;
+                }
+            }
+        }
+    };
+
+    auto input_local_func = [&](int id) {
+        mock_random_sleep();
+        int input_block = 0;
+        while (true) {
+            if (sink_dep->ready()) {
+                auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 
3, 4, 5});
+                sender->add_block(&block, false);
+                input_block++;
+                if (input_block == 100) {
+                    sender->decrement_senders(id);
+                    break;
+                }
+            }
+        }
+    };
+
+    auto outout_func = [&]() {
+        mock_random_sleep();
+        int output_block = 0;
+        while (true) {
+            if (source_dep->ready()) {
+                Block block;
+                bool eos = false;
+                auto st = sender->get_batch(&block, &eos);
+                EXPECT_TRUE(st) << st.msg();
+                if (!block.empty()) {
+                    output_block++;
+                }
+                if (eos) {
+                    EXPECT_EQ(output_block, 3 * 100);
+                    break;
+                }
+            }
+        }
+    };
+
+    std::thread input1(input_remote_func, 1);
+    std::thread input2(input_local_func, 2);
+    std::thread input3(input_remote_func, 3);
+    std::thread output(outout_func);
+
+    input1.join();
+    input2.join();
+    input3.join();
+    output.join();
+}
+// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.*
+
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp
index 1c76f2d3c9c..b915dba0fa9 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -309,7 +309,7 @@ TEST_F(PipelineTest, HAPPY_PATH) {
     EXPECT_GT(block_mem_usage - 1, 0);
 
     auto downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
-            downstream_runtime_state.get(), memory_used_counter, 
op->row_desc(), dest0, 1, 1,
+            downstream_runtime_state.get(), memory_used_counter, dest0, 1, 1,
             downstream_pipeline_profile.get(), false, block_mem_usage - 1);
     std::vector<TScanRangeParams> scan_ranges;
     EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->prepare(scan_ranges, 0, 
tsink,
@@ -991,8 +991,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
         auto* memory_used_counter = 
downstream_pipeline_profile->AddHighWaterMarkCounter(
                 "MemoryUsage", TUnit::BYTES, "", 1);
         downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
-                downstream_runtime_state.get(), memory_used_counter,
-                _pipelines.front()->operators().back()->row_desc(), 
dest_ins_id, dest_node_id,
+                downstream_runtime_state.get(), memory_used_counter, 
dest_ins_id, dest_node_id,
                 parallelism, downstream_pipeline_profile.get(), false, 
2048000);
     }
     for (size_t i = 0; i < _pipelines.size(); i++) {
diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_query_context.h
similarity index 56%
copy from be/test/testutil/mock/mock_runtime_state.h
copy to be/test/testutil/mock/mock_query_context.h
index 506fafc2105..bd6b47483e4 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_query_context.h
@@ -16,28 +16,21 @@
 // under the License.
 
 #pragma once
-#include "runtime/runtime_state.h"
+#include "runtime/query_context.h"
 
 namespace doris {
 
-class MockContext : public TaskExecutionContext {};
-
-class MockRuntimeState : public RuntimeState {
-public:
-    MockRuntimeState() { set_task_execution_context(_mock_context); };
-
-    int batch_size() const override { return batsh_size; }
-
-    bool enable_shared_exchange_sink_buffer() const override {
-        return _enable_shared_exchange_sink_buffer;
-    }
-
-    bool enable_local_exchange() const override { return true; }
-
-    // default batch size
-    int batsh_size = 4096;
-    bool _enable_shared_exchange_sink_buffer = true;
-    std::shared_ptr<MockContext> _mock_context = 
std::make_shared<MockContext>();
+inline TQueryOptions create_fake_query_options() {
+    TQueryOptions query_options;
+    query_options.query_type = TQueryType::EXTERNAL;
+    return query_options;
+}
+
+struct MockQueryContext : public QueryContext {
+    MockQueryContext()
+            : QueryContext(TUniqueId {}, ExecEnv::GetInstance(), 
create_fake_query_options(),
+                           TNetworkAddress {}, true, TNetworkAddress {},
+                           QuerySource::GROUP_COMMIT_LOAD) {}
 };
 
 } // namespace doris
diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_runtime_state.h
index 506fafc2105..b62a29c45eb 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_runtime_state.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #pragma once
+#include "mock_query_context.h"
 #include "runtime/runtime_state.h"
 
 namespace doris {
@@ -24,7 +25,10 @@ class MockContext : public TaskExecutionContext {};
 
 class MockRuntimeState : public RuntimeState {
 public:
-    MockRuntimeState() { set_task_execution_context(_mock_context); };
+    MockRuntimeState() {
+        set_task_execution_context(_mock_context);
+        _query_ctx = _query_ctx_uptr.get();
+    }
 
     int batch_size() const override { return batsh_size; }
 
@@ -38,6 +42,7 @@ public:
     int batsh_size = 4096;
     bool _enable_shared_exchange_sink_buffer = true;
     std::shared_ptr<MockContext> _mock_context = 
std::make_shared<MockContext>();
+    std::unique_ptr<MockQueryContext> _query_ctx_uptr = 
std::make_unique<MockQueryContext>();
 };
 
 } // namespace doris
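
The lock acquisitions touched by this commit are wrapped in INJECT_MOCK_SLEEP so
that the new BE UT can widen race windows between sender and receiver threads.
The real macro lives in the Doris test utilities; the sketch below only
illustrates the idea, and the BE_TEST guard and helper name are assumptions
rather than the actual definition:

    #include <chrono>
    #include <random>
    #include <thread>

    // Sleep for a short random interval before running the wrapped statement so
    // that rare thread interleavings show up reliably in unit tests; outside of
    // test builds the statement runs unchanged.
    inline void maybe_mock_sleep() {
    #ifdef BE_TEST // assumption: gated by the unit-test build flag
        static thread_local std::mt19937 rng {std::random_device {}()};
        std::uniform_int_distribution<int> dist(0, 5);
        std::this_thread::sleep_for(std::chrono::milliseconds(dist(rng)));
    #endif
    }

    // The wrapped statement stays in the caller's scope, so a use like
    // INJECT_MOCK_SLEEP_SKETCH(std::lock_guard<std::mutex> l(_lock)); leaves the
    // guard alive for the rest of the enclosing block.
    #define INJECT_MOCK_SLEEP_SKETCH(stmt) \
        maybe_mock_sleep();                \
        stmt

As noted at the bottom of the new test file, the suite can be run on its own with
./run-be-ut.sh --run --filter=DataStreamRecvrTest.*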

