This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 21ef82206a4 [test](beut) add VDataStreamRecvr beut (#48188) 21ef82206a4 is described below commit 21ef82206a4c44685e7d360fe1bafce0fb3fb07b Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Tue Feb 25 12:51:41 2025 +0800 [test](beut) add VDataStreamRecvr beut (#48188) ### What problem does this PR solve? add VDataStreamRecvr beut ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [x] Confirm the release note - [x] Confirm test cases - [x] Confirm document - [x] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/pipeline/exec/exchange_source_operator.cpp | 4 +- be/src/vec/runtime/vdata_stream_mgr.cpp | 9 +- be/src/vec/runtime/vdata_stream_mgr.h | 5 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 80 ++- be/src/vec/runtime/vdata_stream_recvr.h | 20 +- be/test/pipeline/exec/vdata_stream_recvr_test.cpp | 569 +++++++++++++++++++++ be/test/pipeline/pipeline_test.cpp | 5 +- .../{mock_runtime_state.h => mock_query_context.h} | 31 +- be/test/testutil/mock/mock_runtime_state.h | 7 +- 9 files changed, 643 insertions(+), 87 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index bbec3d92703..2aefdf8cd3d 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -65,8 +65,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(_init_timer); auto& p = _parent->cast<ExchangeSourceOperatorX>(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( - state, _memory_used_counter, p.input_row_desc(), state->fragment_instance_id(), - p.node_id(), p.num_senders(), profile(), p.is_merging(), + state, _memory_used_counter, state->fragment_instance_id(), p.node_id(), + p.num_senders(), profile(), p.is_merging(), std::max(20480, config::exchg_node_buffer_size_bytes / (p.is_merging() ? p.num_senders() : 1))); const auto& queues = stream_recvr->sender_queues(); diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 16d38ce2ea0..e31a2fbfea5 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -66,15 +66,14 @@ inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instanc std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter, - const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging, - size_t data_queue_capacity) { + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) { DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr( - this, memory_used_counter, state, row_desc, fragment_instance_id, dest_node_id, - num_senders, is_merging, profile, data_queue_capacity)); + this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders, + is_merging, profile, data_queue_capacity)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); std::unique_lock l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index 9bf54e94cb8..f9b5bbe5bcd 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -56,9 +56,8 @@ public: std::shared_ptr<VDataStreamRecvr> create_recvr( RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter, - const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging, - size_t data_queue_capacity); + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity); Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock = true); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 252104031e0..603270b7206 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -44,7 +44,7 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" VDataStreamRecvr::SenderQueue::SenderQueue( - VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile, + VDataStreamRecvr* parent_recvr, int num_senders, std::shared_ptr<pipeline::Dependency> local_channel_dependency) : _recvr(parent_recvr), _is_cancelled(false), @@ -80,7 +80,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { #endif BlockItem block_item; { - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); //check and get block_item from data_queue if (_is_cancelled) { RETURN_IF_ERROR(_cancel_status); @@ -104,7 +104,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); _recvr->_memory_used_counter->update(-(int64_t)block_byte_size); - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); sub_blocks_memory_usage(block_byte_size); _record_debug_info(); if (_block_queue.empty() && _source_dependency) { @@ -131,7 +131,9 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) { return Status::OK(); } -void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() { +void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) { + // Here, it is necessary to check if _source_dependency is not nullptr. + // This is because the queue might be closed before setting the source dependency. if (!_source_dependency) { return; } @@ -147,7 +149,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, const int64_t wait_for_worker, const uint64_t time_to_find_recvr) { { - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); if (_is_cancelled) { return Status::OK(); } @@ -171,7 +173,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, } } - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); if (_is_cancelled) { return Status::OK(); } @@ -189,7 +191,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, _block_queue.emplace_back(std::move(pblock), block_byte_size); COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size); _record_debug_info(); - try_set_dep_ready_without_lock(); + set_source_ready(l); // if done is nullptr, this function can't delay this response if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { @@ -209,10 +211,14 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { return; } { - std::unique_lock<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock)); if (_is_cancelled) { return; } + DCHECK(_num_remaining_senders >= 0); + if (_num_remaining_senders == 0) { + return; + } } BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name()); @@ -230,13 +236,13 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { auto block_mem_size = nblock->allocated_bytes(); { - std::unique_lock<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); if (_is_cancelled) { return; } _block_queue.emplace_back(std::move(nblock), block_mem_size); _record_debug_info(); - try_set_dep_ready_without_lock(); + set_source_ready(l); COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); _recvr->_memory_used_counter->update(block_mem_size); add_blocks_memory_usage(block_mem_size); @@ -244,7 +250,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { } void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) { return; } @@ -256,25 +262,25 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id() << " #senders=" << _num_remaining_senders; if (_num_remaining_senders == 0) { - try_set_dep_ready_without_lock(); + set_source_ready(l); } } void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) { { - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); if (_is_cancelled) { return; } _is_cancelled = true; _cancel_status = cancel_status; - try_set_dep_ready_without_lock(); + set_source_ready(l); VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id(); } { - std::lock_guard<std::mutex> l(_lock); + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); for (auto closure_pair : _pending_closures) { closure_pair.first->Run(); int64_t elapse_time = closure_pair.second.elapsed_time(); @@ -287,34 +293,30 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) { } void VDataStreamRecvr::SenderQueue::close() { - { - // If _is_cancelled is not set to true, there may be concurrent send - // which add batch to _block_queue. The batch added after _block_queue - // is clear will be memory leak - std::lock_guard<std::mutex> l(_lock); - _is_cancelled = true; - try_set_dep_ready_without_lock(); + // If _is_cancelled is not set to true, there may be concurrent send + // which add batch to _block_queue. The batch added after _block_queue + // is clear will be memory leak + INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock)); + _is_cancelled = true; + set_source_ready(l); - for (auto closure_pair : _pending_closures) { - closure_pair.first->Run(); - int64_t elapse_time = closure_pair.second.elapsed_time(); - if (_recvr->_max_wait_to_process_time->value() < elapse_time) { - _recvr->_max_wait_to_process_time->set(elapse_time); - } + for (auto closure_pair : _pending_closures) { + closure_pair.first->Run(); + int64_t elapse_time = closure_pair.second.elapsed_time(); + if (_recvr->_max_wait_to_process_time->value() < elapse_time) { + _recvr->_max_wait_to_process_time->set(elapse_time); } - _pending_closures.clear(); } - + _pending_closures.clear(); // Delete any batches queued in _block_queue _block_queue.clear(); } VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeProfile::HighWaterMarkCounter* memory_used_counter, - RuntimeState* state, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int num_senders, bool is_merging, RuntimeProfile* profile, - size_t data_queue_capacity) + RuntimeState* state, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, + RuntimeProfile* profile, size_t data_queue_capacity) : HasTaskExecutionCtx(state), _mgr(stream_mgr), _memory_used_counter(memory_used_counter), @@ -322,7 +324,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, _query_context(state->get_query_ctx()->shared_from_this()), _fragment_instance_id(fragment_instance_id), _dest_node_id(dest_node_id), - _row_desc(row_desc), _is_merging(is_merging), _is_closed(false), _sender_queue_mem_limit(data_queue_capacity), @@ -344,7 +345,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, int num_sender_per_queue = is_merging ? 1 : num_senders; for (int i = 0; i < num_queues; ++i) { SenderQueue* queue = nullptr; - queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile, + queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, _sender_to_local_channel_dependency[i])); _sender_queues.push_back(queue); } @@ -495,11 +496,8 @@ void VDataStreamRecvr::close() { } void VDataStreamRecvr::set_sink_dep_always_ready() const { - for (auto* sender_queues : sender_queues()) { - auto dep = sender_queues->local_channel_dependency(); - if (dep) { - dep->set_always_ready(); - } + for (auto dep : _sender_to_local_channel_dependency) { + dep->set_always_ready(); } } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 0d444dbb397..85cfe08005a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -73,10 +73,9 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { public: class SenderQueue; VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeProfile::HighWaterMarkCounter* counter, - RuntimeState* state, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int num_senders, bool is_merging, RuntimeProfile* profile, - size_t data_queue_capacity); + RuntimeState* state, const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, bool is_merging, + RuntimeProfile* profile, size_t data_queue_capacity); ~VDataStreamRecvr() override; @@ -97,7 +96,6 @@ public: const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } PlanNodeId dest_node_id() const { return _dest_node_id; } - const RowDescriptor& row_desc() const { return _row_desc; } // Indicate that a particular sender is done. Delegated to the appropriate // sender queue. Called from DataStreamMgr. @@ -176,15 +174,11 @@ private: class VDataStreamRecvr::SenderQueue { public: - SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile, + SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders, std::shared_ptr<pipeline::Dependency> local_channel_dependency); ~SenderQueue(); - std::shared_ptr<pipeline::Dependency> local_channel_dependency() { - return _local_channel_dependency; - } - Status get_batch(Block* next_block, bool* eos); Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq, @@ -203,15 +197,15 @@ public: _source_dependency = dependency; } +protected: void add_blocks_memory_usage(int64_t size); void sub_blocks_memory_usage(int64_t size); bool exceeds_limit(); - -protected: friend class pipeline::ExchangeLocalState; - void try_set_dep_ready_without_lock(); + + void set_source_ready(std::lock_guard<std::mutex>&); // To record information about several variables in the event of a DCHECK failure. // DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0) diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp new file mode 100644 index 00000000000..636772361c9 --- /dev/null +++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/runtime/vdata_stream_recvr.h" + +#include <gtest/gtest.h> + +#include <memory> +#include <thread> +#include <vector> + +#include "pipeline/dependency.h" +#include "pipeline/exec/multi_cast_data_streamer.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_runtime_state.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::pipeline { +using namespace vectorized; + +struct MockVDataStreamRecvr : public VDataStreamRecvr { + MockVDataStreamRecvr(RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* counter, + RuntimeProfile* profile, int num_senders, bool is_merging) + : VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0, num_senders, is_merging, + profile, 1) {}; +}; + +class DataStreamRecvrTest : public testing::Test { +public: + DataStreamRecvrTest() = default; + ~DataStreamRecvrTest() override = default; + void SetUp() override {} + + void create_recvr(int num_senders, bool is_merging) { + _mock_counter = + std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test"); + _mock_state = std::make_unique<MockRuntimeState>(); + _mock_profile = std::make_unique<RuntimeProfile>("test"); + recvr = std::make_unique<MockVDataStreamRecvr>(_mock_state.get(), _mock_counter.get(), + _mock_profile.get(), num_senders, + is_merging); + } + + std::unique_ptr<MockVDataStreamRecvr> recvr; + + std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter; + + std::unique_ptr<MockRuntimeState> _mock_state; + + std::unique_ptr<RuntimeProfile> _mock_profile; +}; + +TEST_F(DataStreamRecvrTest, TestCreateSenderQueue) { + { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + } + + { + create_recvr(3, true); + EXPECT_EQ(recvr->sender_queues().size(), 3); + for (auto& queue : recvr->sender_queues()) { + EXPECT_EQ(queue->_num_remaining_senders, 1); + } + } +} + +TEST_F(DataStreamRecvrTest, TestSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + EXPECT_EQ(sender->_num_remaining_senders, 3); + EXPECT_EQ(sender->_block_queue.size(), 0); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sender->add_block(&block, false); + } + + EXPECT_EQ(sink_dep->ready(), false); + + { + EXPECT_EQ(sender->_block_queue.size(), 1); + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}))); + EXPECT_FALSE(eos); + } + + { + sender->decrement_senders(1); + EXPECT_EQ(sender->_num_remaining_senders, 2); + sender->decrement_senders(2); + EXPECT_EQ(sender->_num_remaining_senders, 1); + sender->decrement_senders(3); + EXPECT_EQ(sender->_num_remaining_senders, 0); + + EXPECT_EQ(sender->_block_queue.size(), 0); + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sender->add_block(&block, false); + EXPECT_EQ(sender->_block_queue.size(), 0); + } + + { + EXPECT_EQ(sender->_block_queue.size(), 0); + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + EXPECT_TRUE(st) << st.msg(); + EXPECT_TRUE(eos); + } + + sender->close(); +} + +TEST_F(DataStreamRecvrTest, TestSenderClose) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + sender->close(); +} + +TEST_F(DataStreamRecvrTest, TestRandomSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + auto input_func = [&](int id) { + mock_random_sleep(); + int input_block = 0; + while (true) { + if (sink_dep->ready()) { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sender->add_block(&block, false); + input_block++; + if (input_block == 100) { + sender->decrement_senders(id); + break; + } + } + } + }; + + auto outout_func = [&]() { + mock_random_sleep(); + int output_block = 0; + while (true) { + if (source_dep->ready()) { + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + EXPECT_TRUE(st) << st.msg(); + if (!block.empty()) { + output_block++; + } + if (eos) { + EXPECT_EQ(output_block, 3 * 100); + break; + } + } + } + }; + + std::thread input1(input_func, 1); + std::thread input2(input_func, 2); + std::thread input3(input_func, 3); + std::thread output(outout_func); + + input1.join(); + input2.join(); + input3.join(); + output.join(); +} + +TEST_F(DataStreamRecvrTest, TestRandomCloseSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + auto input_func = [&](int id) { + mock_random_sleep(); + int input_block = 0; + while (true) { + if (sink_dep->ready()) { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sender->add_block(&block, false); + input_block++; + if (input_block == 100) { + sender->decrement_senders(id); + break; + } + } + } + std::cout << "input func : " << id << " end " << std::endl; + }; + + auto outout_func = [&]() { + mock_random_sleep(); + try { + while (true) { + if (source_dep->ready()) { + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + + if (!st.ok()) { + std::cout << "get_batch error: " << st.msg() << std::endl; + break; + } + if (eos) { + break; + } + } + } + } catch (std::exception& e) { + std::cout << "exception: " << e.what() << std::endl; + } + std::cout << "output func end" << std::endl; + }; + + auto close_func = [&]() { + try { + mock_random_sleep(); + std::cout << "close_func start" << std::endl; + recvr->close(); + std::cout << "close_func end" << std::endl; + } catch (const std::exception& e) { + std::cout << "close exception: " << e.what() << std::endl; + } + }; + + std::vector<std::thread> threads; + threads.emplace_back(input_func, 1); + threads.emplace_back(input_func, 2); + threads.emplace_back(input_func, 3); + threads.emplace_back(outout_func); + threads.emplace_back(close_func); + + for (auto& t : threads) { + if (t.joinable()) { + try { + t.join(); + } catch (const std::system_error& e) { + std::cout << "Thread join error: " << e.what() << std::endl; + } + } + } +} + +class MockClosure : public google::protobuf::Closure { + MockClosure() = default; + + ~MockClosure() override = default; + + void Run() override { _cb(); } + std::function<void()> _cb; +}; + +void to_pblock(Block& block, PBlock* pblock) { + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; + EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(), pblock, + &uncompressed_bytes, &compressed_bytes, + segment_v2::CompressionTypePB::NO_COMPRESSION)); +} + +TEST_F(DataStreamRecvrTest, TestRemoteSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + EXPECT_EQ(sender->_num_remaining_senders, 3); + EXPECT_EQ(sender->_block_queue.size(), 0); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + auto pblock = std::make_unique<PBlock>(); + to_pblock(block, pblock.get()); + MockClosure closure; + closure._cb = [&]() { std::cout << "cb" << std::endl; }; + google::protobuf::Closure* done = &closure; + auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0); + if (done != nullptr) { + done->Run(); + } + } + sender->close(); +} + +TEST_F(DataStreamRecvrTest, TestRemoteMemLimitSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + EXPECT_EQ(sender->_num_remaining_senders, 3); + EXPECT_EQ(sender->_block_queue.size(), 0); + + config::exchg_node_buffer_size_bytes = 1; + + Defer set_([&]() { config::exchg_node_buffer_size_bytes = 20485760; }); + + { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + auto pblock = std::make_unique<PBlock>(); + to_pblock(block, pblock.get()); + MockClosure closure; + bool flag = false; + closure._cb = [&]() { + std::cout << "cb" << std::endl; + EXPECT_TRUE(flag); + }; + google::protobuf::Closure* done = &closure; + auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0); + EXPECT_EQ(done, nullptr); + flag = true; + + { + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + } + } + sender->close(); +} + +TEST_F(DataStreamRecvrTest, TestRemoteMultiSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + std::vector<std::shared_ptr<MockClosure>> closures {10}; + + for (auto i = 0; i < 10; i++) { + closures[i] = std::make_shared<MockClosure>(); + } + + auto input_func = [&](int id) { + mock_random_sleep(); + int input_block = 0; + auto closure = closures[id]; + std::atomic_bool cb_flag = true; + closure->_cb = [&]() { cb_flag = true; }; + while (true) { + if (sink_dep->ready() && cb_flag) { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + auto pblock = std::make_unique<PBlock>(); + to_pblock(block, pblock.get()); + google::protobuf::Closure* done = closure.get(); + cb_flag = false; + auto st = sender->add_block(std::move(pblock), id, input_block, &done, 0, 0); + EXPECT_TRUE(st) << st.msg(); + input_block++; + if (done != nullptr) { + done->Run(); + } + if (input_block == 100) { + sender->decrement_senders(id); + break; + } + } + } + std::cout << "input func : " << id << " end " + << "input_block : " << input_block << std::endl; + }; + + auto outout_func = [&]() { + mock_random_sleep(); + int output_block = 0; + while (true) { + if (source_dep->ready()) { + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + EXPECT_TRUE(st) << st.msg(); + if (!block.empty()) { + output_block++; + } + if (eos) { + EXPECT_EQ(output_block, 3 * 100); + break; + } + } + } + }; + + std::thread input1(input_func, 1); + std::thread input2(input_func, 2); + std::thread input3(input_func, 3); + std::thread output(outout_func); + + input1.join(); + input2.join(); + input3.join(); + output.join(); +} + +TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) { + create_recvr(3, false); + EXPECT_EQ(recvr->sender_queues().size(), 1); + EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3); + + auto* sender = recvr->sender_queues().back(); + + auto sink_dep = sender->_local_channel_dependency; + auto source_dep = std::make_shared<Dependency>(0, 0, "test", false); + sender->set_dependency(source_dep); + + EXPECT_EQ(sink_dep->ready(), true); + EXPECT_EQ(source_dep->ready(), false); + + std::vector<std::shared_ptr<MockClosure>> closures {10}; + + for (auto i = 0; i < 10; i++) { + closures[i] = std::make_shared<MockClosure>(); + } + + auto input_remote_func = [&](int id) { + mock_random_sleep(); + int input_block = 0; + auto closure = closures[id]; + std::atomic_bool cb_flag = true; + closure->_cb = [&]() { cb_flag = true; }; + while (true) { + if (sink_dep->ready() && cb_flag) { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + auto pblock = std::make_unique<PBlock>(); + to_pblock(block, pblock.get()); + google::protobuf::Closure* done = closure.get(); + cb_flag = false; + auto st = sender->add_block(std::move(pblock), id, input_block, &done, 0, 0); + EXPECT_TRUE(st) << st.msg(); + input_block++; + if (done != nullptr) { + done->Run(); + } + if (input_block == 100) { + sender->decrement_senders(id); + break; + } + } + } + }; + + auto input_local_func = [&](int id) { + mock_random_sleep(); + int input_block = 0; + while (true) { + if (sink_dep->ready()) { + auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sender->add_block(&block, false); + input_block++; + if (input_block == 100) { + sender->decrement_senders(id); + break; + } + } + } + }; + + auto outout_func = [&]() { + mock_random_sleep(); + int output_block = 0; + while (true) { + if (source_dep->ready()) { + Block block; + bool eos = false; + auto st = sender->get_batch(&block, &eos); + EXPECT_TRUE(st) << st.msg(); + if (!block.empty()) { + output_block++; + } + if (eos) { + EXPECT_EQ(output_block, 3 * 100); + break; + } + } + } + }; + + std::thread input1(input_remote_func, 1); + std::thread input2(input_local_func, 2); + std::thread input3(input_remote_func, 3); + std::thread output(outout_func); + + input1.join(); + input2.join(); + input3.join(); + output.join(); +} +// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.* + +} // namespace doris::pipeline diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp index 1c76f2d3c9c..b915dba0fa9 100644 --- a/be/test/pipeline/pipeline_test.cpp +++ b/be/test/pipeline/pipeline_test.cpp @@ -309,7 +309,7 @@ TEST_F(PipelineTest, HAPPY_PATH) { EXPECT_GT(block_mem_usage - 1, 0); auto downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr( - downstream_runtime_state.get(), memory_used_counter, op->row_desc(), dest0, 1, 1, + downstream_runtime_state.get(), memory_used_counter, dest0, 1, 1, downstream_pipeline_profile.get(), false, block_mem_usage - 1); std::vector<TScanRangeParams> scan_ranges; EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->prepare(scan_ranges, 0, tsink, @@ -991,8 +991,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { auto* memory_used_counter = downstream_pipeline_profile->AddHighWaterMarkCounter( "MemoryUsage", TUnit::BYTES, "", 1); downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr( - downstream_runtime_state.get(), memory_used_counter, - _pipelines.front()->operators().back()->row_desc(), dest_ins_id, dest_node_id, + downstream_runtime_state.get(), memory_used_counter, dest_ins_id, dest_node_id, parallelism, downstream_pipeline_profile.get(), false, 2048000); } for (size_t i = 0; i < _pipelines.size(); i++) { diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_query_context.h similarity index 56% copy from be/test/testutil/mock/mock_runtime_state.h copy to be/test/testutil/mock/mock_query_context.h index 506fafc2105..bd6b47483e4 100644 --- a/be/test/testutil/mock/mock_runtime_state.h +++ b/be/test/testutil/mock/mock_query_context.h @@ -16,28 +16,21 @@ // under the License. #pragma once -#include "runtime/runtime_state.h" +#include "runtime/query_context.h" namespace doris { -class MockContext : public TaskExecutionContext {}; - -class MockRuntimeState : public RuntimeState { -public: - MockRuntimeState() { set_task_execution_context(_mock_context); }; - - int batch_size() const override { return batsh_size; } - - bool enable_shared_exchange_sink_buffer() const override { - return _enable_shared_exchange_sink_buffer; - } - - bool enable_local_exchange() const override { return true; } - - // default batch size - int batsh_size = 4096; - bool _enable_shared_exchange_sink_buffer = true; - std::shared_ptr<MockContext> _mock_context = std::make_shared<MockContext>(); +inline TQueryOptions create_fake_query_options() { + TQueryOptions query_options; + query_options.query_type = TQueryType::EXTERNAL; + return query_options; +} + +struct MockQueryContext : public QueryContext { + MockQueryContext() + : QueryContext(TUniqueId {}, ExecEnv::GetInstance(), create_fake_query_options(), + TNetworkAddress {}, true, TNetworkAddress {}, + QuerySource::GROUP_COMMIT_LOAD) {} }; } // namespace doris diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_runtime_state.h index 506fafc2105..b62a29c45eb 100644 --- a/be/test/testutil/mock/mock_runtime_state.h +++ b/be/test/testutil/mock/mock_runtime_state.h @@ -16,6 +16,7 @@ // under the License. #pragma once +#include "mock_query_context.h" #include "runtime/runtime_state.h" namespace doris { @@ -24,7 +25,10 @@ class MockContext : public TaskExecutionContext {}; class MockRuntimeState : public RuntimeState { public: - MockRuntimeState() { set_task_execution_context(_mock_context); }; + MockRuntimeState() { + set_task_execution_context(_mock_context); + _query_ctx = _query_ctx_uptr.get(); + } int batch_size() const override { return batsh_size; } @@ -38,6 +42,7 @@ public: int batsh_size = 4096; bool _enable_shared_exchange_sink_buffer = true; std::shared_ptr<MockContext> _mock_context = std::make_shared<MockContext>(); + std::unique_ptr<MockQueryContext> _query_ctx_uptr = std::make_unique<MockQueryContext>(); }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org