This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 083af74c255 [test](ut) add cases about partitioned aggregation operators (#48925) 083af74c255 is described below commit 083af74c255b12e90749574e6b76b7958bc7dcdd Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Mon Mar 17 18:49:16 2025 +0800 [test](ut) add cases about partitioned aggregation operators (#48925) --- be/src/pipeline/exec/aggregation_source_operator.h | 3 +- .../exec/partitioned_aggregation_sink_operator.cpp | 7 +- .../exec/partitioned_aggregation_source_operator.h | 3 +- .../partitioned_aggregation_sink_operator_test.cpp | 467 ++++++++++++++++++++ ...artitioned_aggregation_source_operator_test.cpp | 470 +++++++++++++++++++++ .../partitioned_aggregation_test_helper.cpp | 238 +++++++++++ .../operator/partitioned_aggregation_test_helper.h | 155 +++++++ .../partitioned_hash_join_probe_operator_test.cpp | 32 +- .../partitioned_hash_join_sink_operator_test.cpp | 8 - .../operator/partitioned_hash_join_test_helper.cpp | 76 ++-- .../operator/partitioned_hash_join_test_helper.h | 83 +--- .../operator/spillable_operator_test_helper.cpp | 76 ++++ .../operator/spillable_operator_test_helper.h | 132 ++++++ be/test/testutil/creators.h | 44 +- 14 files changed, 1606 insertions(+), 188 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index f18e9345b44..4d29cfb603d 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -18,6 +18,7 @@ #include <stdint.h> +#include "common/be_mock_util.h" #include "common/status.h" #include "operator.h" @@ -28,7 +29,7 @@ namespace pipeline { #include "common/compile_check_begin.h" class AggSourceOperatorX; -class AggLocalState final : public PipelineXLocalState<AggSharedState> { +class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<AggSharedState> { public: using Base = PipelineXLocalState<AggSharedState>; ENABLE_FACTORY_CREATOR(AggLocalState); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index e003ea23240..521da54f21f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -130,8 +130,6 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* s RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode, state)); _name = "PARTITIONED_AGGREGATION_SINK_OPERATOR"; _spill_partition_count = state->spill_aggregation_partition_count(); - RETURN_IF_ERROR( - _agg_sink_operator->set_child(DataSinkOperatorX<PartitionedAggSinkLocalState>::_child)); return _agg_sink_operator->init(tnode, state); } @@ -255,10 +253,6 @@ Status PartitionedAggSinkLocalState::revoke_memory( update_profile<true>(sink_local_state->profile()); } - // TODO: spill thread may set_ready before the task::execute thread put the task to blocked state - if (!_eos) { - Base::_spill_dependency->Dependency::block(); - } auto& parent = Base::_parent->template cast<Parent>(); Status status; Defer defer {[&]() { @@ -331,6 +325,7 @@ Status PartitionedAggSinkLocalState::revoke_memory( return status; }); + Base::_spill_dependency->Dependency::block(); return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( std::move(spill_runnable)); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index 24e56df1be8..c7d4b21af56 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -30,7 +30,8 @@ namespace pipeline { class PartitionedAggSourceOperatorX; class PartitionedAggLocalState; -class PartitionedAggLocalState final : public PipelineXSpillLocalState<PartitionedAggSharedState> { +class PartitionedAggLocalState MOCK_REMOVE(final) + : public PipelineXSpillLocalState<PartitionedAggSharedState> { public: ENABLE_FACTORY_CREATOR(PartitionedAggLocalState); using Base = PipelineXSpillLocalState<PartitionedAggSharedState>; diff --git a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp new file mode 100644 index 00000000000..930eba76fd6 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp @@ -0,0 +1,467 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/partitioned_aggregation_sink_operator.h" + +#include <gtest/gtest.h> + +#include <memory> + +#include "common/config.h" +#include "partitioned_aggregation_test_helper.h" +#include "pipeline/exec/aggregation_sink_operator.h" +#include "pipeline/exec/partitioned_hash_join_probe_operator.h" +#include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime/fragment_mgr.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_number.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { +class PartitionedAggregationSinkOperatorTest : public testing::Test { +protected: + void SetUp() override { _helper.SetUp(); } + void TearDown() override { _helper.TearDown(); } + PartitionedAggregationTestHelper _helper; +}; + +TEST_F(PartitionedAggregationSinkOperatorTest, Init) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + std::shared_ptr<MockPartitionedAggSharedState> shared_state = + MockPartitionedAggSharedState::create_shared(); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = _helper.runtime_state->get_sink_local_state(); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = local_state->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSinkOperatorTest, Sink) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + auto* dep = shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), + "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = _helper.runtime_state->get_sink_local_state(); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), false), 0); + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), true), 0); + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithEmptyEOS) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + auto* dep = shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), + "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = _helper.runtime_state->get_sink_local_state(); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), false), 0); + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), true), 0); + block.clear_column_data(); + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpill) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + auto* dep = shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), + "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + local_state->_runtime_state->get_sink_local_state()); + ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); + + st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillAndEmptyEOS) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + auto* dep = shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), + "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + local_state->_runtime_state->get_sink_local_state()); + ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); + + st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); + + block.clear_column_data(); + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + ASSERT_TRUE(local_state->_spill_dependency->is_blocked_by(nullptr) == nullptr); + ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillLargeData) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + auto* dep = shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), + "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + local_state->_runtime_state->get_sink_local_state()); + ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); + + st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + auto* spill_write_rows_counter = local_state->profile()->get_counter("SpillWriteRows"); + ASSERT_TRUE(spill_write_rows_counter != nullptr); + ASSERT_EQ(spill_write_rows_counter->value(), 4); + + ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); + + const size_t count = 1048576; + std::vector<int32_t> data(count); + std::iota(data.begin(), data.end(), 0); + block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data); + + block.insert( + vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(data)); + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + block.clear_column_data(); + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + ASSERT_TRUE(local_state->_spill_dependency->is_blocked_by(nullptr) == nullptr); + ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_EQ(spill_write_rows_counter->value(), 1048576 + 4); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpilError) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + local_state->_runtime_state->get_sink_local_state()); + ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); + + SpillableDebugPointHelper dp_helper("fault_inject::spill_stream::spill_block"); + st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + std::cout << "profile: " << _helper.runtime_profile->pretty_print() << std::endl; + + ASSERT_FALSE(dp_helper.get_spill_status().ok()) << "spilll status should be failed"; +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp new file mode 100644 index 00000000000..a23c7237979 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp @@ -0,0 +1,470 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/partitioned_aggregation_source_operator.h" + +#include <gtest/gtest.h> + +#include <memory> + +#include "common/config.h" +#include "partitioned_aggregation_test_helper.h" +#include "pipeline/dependency.h" +#include "pipeline/exec/aggregation_sink_operator.h" +#include "pipeline/exec/operator.h" +#include "pipeline/exec/partitioned_aggregation_sink_operator.h" +#include "pipeline/exec/partitioned_hash_join_probe_operator.h" +#include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime/fragment_mgr.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_runtime_state.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::pipeline { +class PartitionedAggregationSourceOperatorTest : public testing::Test { +protected: + void SetUp() override { _helper.SetUp(); } + void TearDown() override { _helper.TearDown(); } + + PartitionedAggregationTestHelper _helper; +}; + +TEST_F(PartitionedAggregationSourceOperatorTest, Init) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + std::shared_ptr<MockPartitionedAggSharedState> shared_state = + MockPartitionedAggSharedState::create_shared(); + + shared_state->in_mem_shared_state_sptr = std::make_shared<AggSharedState>(); + shared_state->in_mem_shared_state = + reinterpret_cast<AggSharedState*>(shared_state->in_mem_shared_state_sptr.get()); + + LocalStateInfo info { + .parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = {}, + .task_idx = 0, + }; + st = source_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = _helper.runtime_state->get_local_state(source_operator->operator_id()); + ASSERT_TRUE(local_state != nullptr); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + st = source_operator->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo sink_info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = _helper.runtime_state->get_sink_local_state(); + ASSERT_TRUE(sink_local_state != nullptr); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + LocalStateInfo info { + .parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = {}, + .task_idx = 0, + }; + st = source_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggLocalState*>( + _helper.runtime_state->get_local_state(source_operator->operator_id())); + ASSERT_TRUE(local_state != nullptr); + + local_state->_copy_shared_spill_profile = false; + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + vectorized::Block block; + bool eos = false; + st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + + ASSERT_EQ(block.rows(), 0); + ASSERT_TRUE(eos); + + st = source_operator->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo sink_info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state != nullptr); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + sink_local_state->_runtime_state->get_sink_local_state()); + ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0); + + LocalStateInfo info { + .parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = {}, + .task_idx = 0, + }; + st = source_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggLocalState*>( + _helper.runtime_state->get_local_state(source_operator->operator_id())); + ASSERT_TRUE(local_state != nullptr); + + local_state->_copy_shared_spill_profile = false; + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + block.clear(); + bool eos = false; + st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + + ASSERT_TRUE(eos); + DCHECK_EQ(block.rows(), 4); + ASSERT_EQ(block.rows(), 4); + + st = source_operator->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>( + sink_operator->create_shared_state()); + shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo sink_info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state != nullptr); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(shared_state->is_spilled); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + sink_local_state->_runtime_state->get_sink_local_state()); + ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); + + LocalStateInfo info { + .parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = {}, + .task_idx = 0, + }; + st = source_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggLocalState*>( + _helper.runtime_state->get_local_state(source_operator->operator_id())); + ASSERT_TRUE(local_state != nullptr); + + local_state->_copy_shared_spill_profile = false; + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + block.clear(); + bool eos = false; + size_t rows = 0; + while (!eos) { + st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + rows += block.rows(); + block.clear_column_data(); + } + + ASSERT_TRUE(eos); + ASSERT_EQ(rows, 4); + + st = source_operator->close(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); +} + +TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) { + auto [source_operator, sink_operator] = _helper.create_operators(); + ASSERT_TRUE(source_operator != nullptr); + ASSERT_TRUE(sink_operator != nullptr); + + const auto tnode = _helper.create_test_plan_node(); + auto st = source_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = source_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>( + sink_operator->create_shared_state()); + shared_state->create_source_dependency(source_operator->operator_id(), + source_operator->node_id(), "PartitionedAggSinkTestDep"); + + LocalSinkStateInfo sink_info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .le_state_map = {}, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state != nullptr); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}); + + block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>( + {1, 2, 3, 4, 2, 3, 4, 3, 4, 4})); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string(); + + while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(shared_state->is_spilled); + + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + while (sink_local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>( + sink_local_state->_runtime_state->get_sink_local_state()); + ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0); + + LocalStateInfo info { + .parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = {}, + .task_idx = 0, + }; + st = source_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<PartitionedAggLocalState*>( + _helper.runtime_state->get_local_state(source_operator->operator_id())); + ASSERT_TRUE(local_state != nullptr); + + local_state->_copy_shared_spill_profile = false; + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + SpillableDebugPointHelper dp_helper("fault_inject::spill_stream::read_next_block"); + + block.clear(); + bool eos = false; + while (!eos && dp_helper.get_spill_status().ok()) { + st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + + while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + block.clear_column_data(); + } + + ASSERT_FALSE(dp_helper.get_spill_status().ok()); +} +} // namespace doris::pipeline diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp new file mode 100644 index 00000000000..0d2a653c6c3 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "partitioned_aggregation_test_helper.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <memory> +#include <vector> + +#include "runtime/define_primitive_type.h" +#include "testutil/creators.h" +#include "testutil/mock/mock_operators.h" + +namespace doris::pipeline { +TPlanNode PartitionedAggregationTestHelper::create_test_plan_node() { + TPlanNode tnode; + tnode.node_id = 0; + tnode.node_type = TPlanNodeType::AGGREGATION_NODE; + tnode.num_children = 1; + tnode.agg_node.use_streaming_preaggregation = false; + tnode.agg_node.need_finalize = false; + tnode.agg_node.intermediate_tuple_id = 1; + tnode.agg_node.output_tuple_id = 2; + tnode.limit = -1; + + auto& grouping_expr = tnode.agg_node.grouping_exprs.emplace_back(); + auto& expr_node = grouping_expr.nodes.emplace_back(); + expr_node.node_type = TExprNodeType::SLOT_REF; + + TTypeNode type_node; + type_node.type = TTypeNodeType::SCALAR; + type_node.scalar_type.type = TPrimitiveType::INT; + type_node.__isset.scalar_type = true; + + expr_node.type.types.emplace_back(type_node); + expr_node.__set_is_nullable(false); + expr_node.num_children = 0; + expr_node.slot_ref.slot_id = 0; + expr_node.slot_ref.tuple_id = 0; + + auto& agg_function = tnode.agg_node.aggregate_functions.emplace_back(); + auto& fn_node = agg_function.nodes.emplace_back(); + fn_node.node_type = TExprNodeType::FUNCTION_CALL; + fn_node.__set_is_nullable(false); + fn_node.num_children = 1; + + TFunctionName fn_name; + fn_name.function_name = "sum"; + + fn_node.fn.__set_name(fn_name); + + TTypeDesc ret_type; + auto& ret_type_node = ret_type.types.emplace_back(); + ret_type_node.scalar_type.type = TPrimitiveType::BIGINT; + ret_type_node.__isset.scalar_type = true; + ret_type_node.type = TTypeNodeType::SCALAR; + ret_type.__set_is_nullable(false); + + TTypeDesc arg_type; + auto& arg_type_node = arg_type.types.emplace_back(); + arg_type_node.scalar_type.type = TPrimitiveType::INT; + arg_type_node.__isset.scalar_type = true; + arg_type_node.type = TTypeNodeType::SCALAR; + + fn_node.fn.__set_ret_type(ret_type); + + fn_node.fn.__set_arg_types({arg_type}); + fn_node.agg_expr.__set_param_types({arg_type}); + + auto& fn_child_node = agg_function.nodes.emplace_back(); + fn_child_node.node_type = TExprNodeType::SLOT_REF; + fn_child_node.__set_is_nullable(false); + fn_child_node.num_children = 0; + fn_child_node.slot_ref.slot_id = 1; + fn_child_node.slot_ref.tuple_id = 0; + fn_child_node.type.types.emplace_back(type_node); + + tnode.row_tuples.push_back(0); + tnode.nullable_tuples.push_back(false); + + return tnode; +} + +TDescriptorTable PartitionedAggregationTestHelper::create_test_table_descriptor( + bool nullable = false) { + TTupleDescriptorBuilder tuple_builder; + tuple_builder + .add_slot(TSlotDescriptorBuilder() + .type(PrimitiveType::TYPE_INT) + .column_name("col1") + .column_pos(0) + .nullable(nullable) + .build()) + .add_slot(TSlotDescriptorBuilder() + .type(PrimitiveType::TYPE_INT) + .column_name("col2") + .column_pos(0) + .nullable(nullable) + .build()); + + TDescriptorTableBuilder builder; + + tuple_builder.build(&builder); + + TTupleDescriptorBuilder() + .add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("col3") + .column_pos(0) + .nullable(nullable) + .build()) + .add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("col4") + .column_pos(0) + .nullable(true) + .build()) + .build(&builder); + + TTupleDescriptorBuilder() + .add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("col5") + .column_pos(0) + .nullable(nullable) + .build()) + .add_slot(TSlotDescriptorBuilder() + .type(TYPE_BIGINT) + .column_name("col6") + .column_pos(0) + .nullable(true) + .build()) + .build(&builder); + + return builder.desc_tbl(); +} + +std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>, + std::shared_ptr<PartitionedAggSinkOperatorX>> +PartitionedAggregationTestHelper::create_operators() { + TPlanNode tnode = create_test_plan_node(); + auto desc_tbl = runtime_state->desc_tbl(); + + EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 3); + + auto source_operator = + std::make_shared<PartitionedAggSourceOperatorX>(obj_pool.get(), tnode, 0, desc_tbl); + auto sink_operator = std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode, + desc_tbl, false); + + auto child_operator = std::make_shared<MockChildOperator>(); + auto probe_side_source_operator = std::make_shared<MockChildOperator>(); + auto source_side_sink_operator = std::make_shared<MockSinkOperator>(); + auto [source_pipeline, _] = generate_agg_pipeline(source_operator, source_side_sink_operator, + sink_operator, child_operator); + + RowDescriptor row_desc(runtime_state->desc_tbl(), {0}, {false}); + child_operator->_row_descriptor = row_desc; + + EXPECT_TRUE(sink_operator->set_child(child_operator)); + + // Setup task and state + std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> + le_state_map; + pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr, + nullptr, le_state_map, 0); + runtime_state->set_task(pipeline_task.get()); + return {std::move(source_operator), std::move(sink_operator)}; +} + +PartitionedAggLocalState* PartitionedAggregationTestHelper::create_source_local_state( + RuntimeState* state, PartitionedAggSourceOperatorX* probe_operator, + std::shared_ptr<MockPartitionedAggSharedState>& shared_state) { + auto local_state_uptr = std::make_unique<MockPartitionedAggLocalState>(state, probe_operator); + auto* local_state = local_state_uptr.get(); + shared_state = std::make_shared<MockPartitionedAggSharedState>(); + local_state->_shared_state = shared_state.get(); + shared_state->is_spilled = true; + + ADD_TIMER(local_state->profile(), "ExecTime"); + local_state->profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0); + local_state->init_spill_read_counters(); + local_state->init_spill_write_counters(); + local_state->_copy_shared_spill_profile = false; + local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test"); + + local_state->_spill_dependency = + Dependency::create_shared(0, 0, "PartitionedHashJoinProbeOperatorTestSpillDep", true); + + state->emplace_local_state(probe_operator->operator_id(), std::move(local_state_uptr)); + return local_state; +} + +PartitionedAggSinkLocalState* PartitionedAggregationTestHelper::create_sink_local_state( + RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator, + std::shared_ptr<MockPartitionedAggSharedState>& shared_state) { + auto local_state_uptr = MockPartitionedAggSinkLocalState::create_unique(sink_operator, state); + auto* local_state = local_state_uptr.get(); + shared_state = std::make_shared<MockPartitionedAggSharedState>(); + local_state->init_spill_counters(); + + ADD_TIMER(local_state->profile(), "ExecTime"); + local_state->profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0); + local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test"); + + local_state->_dependency = shared_state->create_sink_dependency( + sink_operator->dests_id().front(), sink_operator->operator_id(), + "PartitionedHashJoinTestDep"); + + local_state->_spill_dependency = + Dependency::create_shared(0, 0, "PartitionedHashJoinSinkOperatorTestSpillDep", true); + shared_state->setup_shared_profile(local_state->profile()); + + state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr)); + return local_state; +} +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h new file mode 100644 index 00000000000..cc87236d6d4 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <memory> + +#include "common/config.h" +#include "common/factory_creator.h" +#include "common/object_pool.h" +#include "pipeline/exec/aggregation_sink_operator.h" +#include "pipeline/exec/aggregation_source_operator.h" +#include "pipeline/exec/partitioned_aggregation_sink_operator.h" +#include "pipeline/exec/partitioned_aggregation_source_operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime/fragment_mgr.h" +#include "spillable_operator_test_helper.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { +class MockAggSharedState : public AggSharedState { +public: +}; + +class MockPartitionedAggSharedState : public PartitionedAggSharedState { + ENABLE_FACTORY_CREATOR(MockPartitionedAggSharedState); + +public: + MockPartitionedAggSharedState() { is_spilled = false; } +}; + +class MockPartitionedAggSinkLocalState : public PartitionedAggSinkLocalState { + ENABLE_FACTORY_CREATOR(MockPartitionedAggSinkLocalState) +public: + MockPartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : PartitionedAggSinkLocalState(parent, state) { + _runtime_profile = std::make_unique<RuntimeProfile>("test"); + _profile = _runtime_profile.get(); + _memory_used_counter = + _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); + } + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override { return Status::OK(); } + Status open(RuntimeState* state) override { return Status::OK(); } + Status close(RuntimeState* state, Status status) override { return Status::OK(); } + +private: + std::unique_ptr<RuntimeProfile> _runtime_profile; +}; + +class MockPartitionedAggSinkOperatorX : public PartitionedAggSinkOperatorX { +public: + MockPartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, + const TPlanNode& tnode, const DescriptorTbl& descs) + : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode, descs, false) {} + ~MockPartitionedAggSinkOperatorX() override = default; + + Status prepare(RuntimeState* state) override { return Status::OK(); } + + Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override { + state->emplace_sink_local_state( + _operator_id, MockPartitionedAggSinkLocalState::create_unique(this, state)); + return Status::OK(); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { + return Status::OK(); + } +}; + +class MockPartitionedAggLocalState : public PartitionedAggLocalState { + ENABLE_FACTORY_CREATOR(MockPartitionedAggLocalState); + +public: + MockPartitionedAggLocalState(RuntimeState* state, OperatorXBase* parent) + : PartitionedAggLocalState(state, parent) { + _runtime_profile = std::make_unique<RuntimeProfile>("test"); + } + + Status open(RuntimeState* state) override { return Status::OK(); } +}; + +class MockAggLocalState : public AggLocalState { + ENABLE_FACTORY_CREATOR(MockAggLocalState); + +public: + MockAggLocalState(RuntimeState* state, OperatorXBase* parent) : AggLocalState(state, parent) {}; +}; + +class MockAggSourceOperatorX : public AggSourceOperatorX { +public: + MockAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : AggSourceOperatorX(pool, tnode, operator_id, descs) {} + ~MockAggSourceOperatorX() override = default; + + Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override { + state->emplace_local_state(_operator_id, MockAggLocalState::create_unique(state, this)); + return Status::OK(); + } + + bool need_more_input_data(RuntimeState* state) const override { return need_more_data; } + bool need_more_data = true; + + vectorized::Block block; + bool eos = false; +}; + +class MockAggSinkOperatorX : public AggSinkOperatorX { +public: + MockAggSinkOperatorX() = default; + ~MockAggSinkOperatorX() override = default; +}; + +class PartitionedAggregationTestHelper : public SpillableOperatorTestHelper { +public: + ~PartitionedAggregationTestHelper() override = default; + TPlanNode create_test_plan_node() override; + TDescriptorTable create_test_table_descriptor(bool nullable) override; + + PartitionedAggLocalState* create_source_local_state( + RuntimeState* state, PartitionedAggSourceOperatorX* source_operator, + std::shared_ptr<MockPartitionedAggSharedState>& shared_state); + + PartitionedAggSinkLocalState* create_sink_local_state( + RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator, + std::shared_ptr<MockPartitionedAggSharedState>& shared_state); + + std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>, + std::shared_ptr<PartitionedAggSinkOperatorX>> + create_operators(); +}; +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp index 67378fe66a8..9f2c3b48790 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp @@ -22,11 +22,9 @@ #include <memory> #include "common/config.h" -#include "olap/olap_define.h" #include "partitioned_hash_join_test_helper.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" -#include "pipeline/exec/spill_utils.h" #include "pipeline/pipeline_task.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -34,7 +32,6 @@ #include "testutil/creators.h" #include "testutil/mock/mock_operators.h" #include "testutil/mock/mock_runtime_state.h" -#include "util/debug_points.h" #include "util/runtime_profile.h" #include "vec/core/block.h" #include "vec/data_types/data_type_number.h" @@ -426,13 +423,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) { ASSERT_TRUE(spilling_stream->spill_eof().ok()); } - Status spill_status; - ExecEnv::GetInstance()->_fragment_mgr = - new MockFragmentManager(spill_status, ExecEnv::GetInstance()); - - const auto enable_debug_points = config::enable_debug_points; - config::enable_debug_points = true; - DebugPoints::instance()->add("fault_inject::spill_stream::read_next_block"); + SpillableDebugPointHelper dp_helper("fault_inject::spill_stream::read_next_block"); bool has_data = false; ASSERT_TRUE(local_state ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), @@ -446,12 +437,10 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) { ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream); spilling_stream.reset(); - config::enable_debug_points = enable_debug_points; - - ASSERT_FALSE(spill_status.ok()); - ASSERT_TRUE(spill_status.to_string().find("fault_inject spill_stream read_next_block") != - std::string::npos) - << "unexpected error: " << spill_status.to_string(); + ASSERT_FALSE(dp_helper.get_spill_status().ok()); + ASSERT_TRUE(dp_helper.get_spill_status().to_string().find( + "fault_inject spill_stream read_next_block") != std::string::npos) + << "unexpected error: " << dp_helper.get_spill_status().to_string(); } TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDisk) { @@ -784,14 +773,9 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskError) { ASSERT_TRUE(local_state->_recovered_build_block == nullptr); - Status spill_status; - ExecEnv::GetInstance()->_fragment_mgr = - new MockFragmentManager(spill_status, ExecEnv::GetInstance()); - - const auto enable_debug_points = config::enable_debug_points; - config::enable_debug_points = true; // Test error handling with fault injection - DebugPoints::instance()->add("fault_inject::partitioned_hash_join_probe::recover_build_blocks"); + SpillableDebugPointHelper dp_helper( + "fault_inject::partitioned_hash_join_probe::recover_build_blocks"); bool has_data = false; auto status = local_state->recover_build_blocks_from_disk(_helper.runtime_state.get(), test_partition, has_data); @@ -801,7 +785,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskError) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - config::enable_debug_points = enable_debug_points; + auto spill_status = dp_helper.get_spill_status(); ASSERT_FALSE(spill_status.ok()); ASSERT_TRUE(spill_status.to_string().find("fault_inject partitioned_hash_join_probe " "recover_build_blocks failed") != std::string::npos) diff --git a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp index 0514d481d3a..29cb355b0a0 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp @@ -26,33 +26,25 @@ #include <gtest/gtest.h> #include <memory> -#include <sstream> #include <vector> #include "common/config.h" -#include "common/exception.h" #include "partitioned_hash_join_test_helper.h" #include "pipeline/common/data_gen_functions/vnumbers_tvf.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/pipeline_task.h" -#include "runtime/define_primitive_type.h" -#include "runtime/descriptor_helper.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" -#include "testutil/mock/mock_data_stream_sender.h" -#include "testutil/mock/mock_descriptors.h" #include "testutil/mock/mock_operators.h" #include "testutil/mock/mock_runtime_state.h" #include "util/runtime_profile.h" #include "vec/core/block.h" -#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_number.h" #include "vec/exprs/vexpr_context.h" -#include "vec/exprs/vexpr_fwd.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp index 03f9b3f8ae0..001bcd8e224 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp @@ -26,57 +26,19 @@ #include <gtest/gtest.h> #include <memory> -#include <sstream> #include <vector> -namespace doris::pipeline { -void PartitionedHashJoinTestHelper::SetUp() { - runtime_state = std::make_unique<MockRuntimeState>(); - obj_pool = std::make_unique<ObjectPool>(); - - runtime_profile = std::make_shared<RuntimeProfile>("test"); - - query_ctx = generate_one_query(); - - runtime_state->_query_ctx = query_ctx.get(); - runtime_state->_query_id = query_ctx->query_id(); - runtime_state->resize_op_id_to_local_state(-100); - - ADD_TIMER(runtime_profile.get(), "ExecTime"); - runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0); - - auto desc_table = create_test_table_descriptor(); - auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl); - DCHECK(!desc_table.slotDescriptors.empty()); - EXPECT_TRUE(st.ok()) << "create descriptor table failed: " << st.to_string(); - runtime_state->set_desc_tbl(desc_tbl); - - auto spill_data_dir = std::make_unique<vectorized::SpillDataDir>("/tmp/partitioned_join_test", - 1024L * 1024 * 4); - st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false); - EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path() - << " failed: " << st.to_string(); - std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> data_map; - data_map.emplace("test", std::move(spill_data_dir)); - auto* spill_stream_manager = new vectorized::SpillStreamManager(std::move(data_map)); - ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager; - st = spill_stream_manager->init(); - EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " << st.to_string(); -} - -void PartitionedHashJoinTestHelper::TearDown() { - ExecEnv::GetInstance()->spill_stream_mgr()->async_cleanup_query(runtime_state->query_id()); - doris::ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->wait(); - doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop(); - SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr); -} +#include "testutil/creators.h" +#include "testutil/mock/mock_operators.h" +namespace doris::pipeline { TPlanNode PartitionedHashJoinTestHelper::create_test_plan_node() { TPlanNode tnode; tnode.node_id = 0; tnode.node_type = TPlanNodeType::HASH_JOIN_NODE; tnode.num_children = 2; tnode.hash_join_node.join_op = TJoinOp::INNER_JOIN; + tnode.limit = -1; TEqJoinCondition eq_cond; eq_cond.left = TExpr(); @@ -118,6 +80,32 @@ TPlanNode PartitionedHashJoinTestHelper::create_test_plan_node() { return tnode; } +TDescriptorTable PartitionedHashJoinTestHelper::create_test_table_descriptor( + bool nullable = false) { + TTupleDescriptorBuilder tuple_builder; + tuple_builder.add_slot(TSlotDescriptorBuilder() + .type(PrimitiveType::TYPE_INT) + .column_name("col1") + .column_pos(0) + .nullable(nullable) + .build()); + + TDescriptorTableBuilder builder; + + tuple_builder.build(&builder); + + TTupleDescriptorBuilder() + .add_slot(TSlotDescriptorBuilder() + .type(TYPE_INT) + .column_name("col2") + .column_pos(0) + .nullable(nullable) + .build()) + .build(&builder); + + return builder.desc_tbl(); +} + std::tuple<std::shared_ptr<PartitionedHashJoinProbeOperatorX>, std::shared_ptr<PartitionedHashJoinSinkOperatorX>> PartitionedHashJoinTestHelper::create_operators() { @@ -134,8 +122,8 @@ PartitionedHashJoinTestHelper::create_operators() { auto child_operator = std::make_shared<MockChildOperator>(); auto probe_side_source_operator = std::make_shared<MockChildOperator>(); auto probe_side_sink_operator = std::make_shared<MockSinkOperator>(); - auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator, child_operator, - probe_side_sink_operator, sink_operator); + auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator, probe_side_sink_operator, + sink_operator, child_operator); RowDescriptor row_desc(runtime_state->desc_tbl(), {1}, {false}); child_operator->_row_descriptor = row_desc; diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h index 95865aea21e..0628a81688d 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h @@ -24,75 +24,22 @@ #include <gtest/gtest.h> #include <memory> -#include <sstream> #include <vector> #include "common/config.h" #include "common/object_pool.h" -#include "olap/olap_define.h" +#include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" -#include "pipeline/exec/spill_utils.h" #include "pipeline/pipeline_task.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" -#include "testutil/column_helper.h" -#include "testutil/creators.h" -#include "testutil/mock/mock_operators.h" +#include "spillable_operator_test_helper.h" #include "testutil/mock/mock_runtime_state.h" -#include "util/debug_points.h" #include "util/runtime_profile.h" #include "vec/core/block.h" -#include "vec/data_types/data_type_number.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { - -class MockPartitioner : public vectorized::PartitionerBase { -public: - MockPartitioner(size_t partition_count) : PartitionerBase(partition_count) {} - Status init(const std::vector<TExpr>& texprs) override { return Status::OK(); } - - Status prepare(RuntimeState* state, const RowDescriptor& row_desc) override { - return Status::OK(); - } - - Status open(RuntimeState* state) override { return Status::OK(); } - - Status close(RuntimeState* state) override { return Status::OK(); } - - Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool eos, - bool* already_sent) const override { - if (already_sent) { - *already_sent = false; - } - return Status::OK(); - } - - Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override { - partitioner = std::make_unique<MockPartitioner>(_partition_count); - return Status::OK(); - } - - vectorized::ChannelField get_channel_ids() const override { return {}; } -}; - -class MockExpr : public vectorized::VExpr { -public: - Status prepare(RuntimeState* state, const RowDescriptor& row_desc, - vectorized::VExprContext* context) override { - return Status::OK(); - } - - Status open(RuntimeState* state, vectorized::VExprContext* context, - FunctionContext::FunctionStateScope scope) override { - return Status::OK(); - } -}; - -class MockHashJoinBuildSharedState : public HashJoinSharedState { -public: -}; - class MockPartitionedHashJoinSharedState : public PartitionedHashJoinSharedState { public: MockPartitionedHashJoinSharedState() { @@ -164,16 +111,6 @@ public: std::string get_memory_usage_debug_str(RuntimeState* state) const override { return "mock"; } }; -class MockFragmentManager : public FragmentMgr { -public: - MockFragmentManager(Status& status_, ExecEnv* exec_env) - : FragmentMgr(exec_env), status(status_) {} - void cancel_query(const TUniqueId query_id, const Status reason) override { status = reason; } - -private: - Status& status; -}; - class MockHashJoinProbeLocalState : public HashJoinProbeLocalState { ENABLE_FACTORY_CREATOR(MockHashJoinProbeLocalState); @@ -262,12 +199,12 @@ public: void update_profile_from_inner() override {} }; -class PartitionedHashJoinTestHelper { +class PartitionedHashJoinTestHelper : public SpillableOperatorTestHelper { public: - void SetUp(); - void TearDown(); + ~PartitionedHashJoinTestHelper() override = default; + TPlanNode create_test_plan_node() override; - TPlanNode create_test_plan_node(); + TDescriptorTable create_test_table_descriptor(bool nullable) override; PartitionedHashJoinProbeLocalState* create_probe_local_state( RuntimeState* state, PartitionedHashJoinProbeOperatorX* probe_operator, @@ -280,13 +217,5 @@ public: std::tuple<std::shared_ptr<PartitionedHashJoinProbeOperatorX>, std::shared_ptr<PartitionedHashJoinSinkOperatorX>> create_operators(); - - std::unique_ptr<MockRuntimeState> runtime_state; - std::unique_ptr<ObjectPool> obj_pool; - std::shared_ptr<QueryContext> query_ctx; - std::shared_ptr<RuntimeProfile> runtime_profile; - std::shared_ptr<PipelineTask> pipeline_task; - DescriptorTbl* desc_tbl; - static constexpr uint32_t TEST_PARTITION_COUNT = 8; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.cpp b/be/test/pipeline/operator/spillable_operator_test_helper.cpp new file mode 100644 index 00000000000..b3ebba37aa1 --- /dev/null +++ b/be/test/pipeline/operator/spillable_operator_test_helper.cpp @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "spillable_operator_test_helper.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <memory> +#include <vector> + +#include "testutil/creators.h" + +namespace doris::pipeline { +void SpillableOperatorTestHelper::SetUp() { + runtime_state = std::make_unique<MockRuntimeState>(); + obj_pool = std::make_unique<ObjectPool>(); + + runtime_profile = std::make_shared<RuntimeProfile>("test"); + + query_ctx = generate_one_query(); + + runtime_state->_query_ctx = query_ctx.get(); + runtime_state->_query_id = query_ctx->query_id(); + runtime_state->resize_op_id_to_local_state(-100); + runtime_state->set_max_operator_id(-100); + + ADD_TIMER(runtime_profile.get(), "ExecTime"); + runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0); + + auto desc_table = create_test_table_descriptor(false); + auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl); + DCHECK(!desc_table.slotDescriptors.empty()); + EXPECT_TRUE(st.ok()) << "create descriptor table failed: " << st.to_string(); + runtime_state->set_desc_tbl(desc_tbl); + + auto spill_data_dir = std::make_unique<vectorized::SpillDataDir>("/tmp/partitioned_join_test", + 1024L * 1024 * 4); + st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false); + EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path() + << " failed: " << st.to_string(); + std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> data_map; + data_map.emplace("test", std::move(spill_data_dir)); + auto* spill_stream_manager = new vectorized::SpillStreamManager(std::move(data_map)); + ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager; + st = spill_stream_manager->init(); + EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " << st.to_string(); +} + +void SpillableOperatorTestHelper::TearDown() { + ExecEnv::GetInstance()->spill_stream_mgr()->async_cleanup_query(runtime_state->query_id()); + doris::ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->wait(); + doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop(); + SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.h b/be/test/pipeline/operator/spillable_operator_test_helper.h new file mode 100644 index 00000000000..2067412ed3c --- /dev/null +++ b/be/test/pipeline/operator/spillable_operator_test_helper.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <memory> + +#include "common/object_pool.h" +#include "pipeline/pipeline_task.h" +#include "runtime/fragment_mgr.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { + +class MockPartitioner : public vectorized::PartitionerBase { +public: + MockPartitioner(size_t partition_count) : PartitionerBase(partition_count) {} + Status init(const std::vector<TExpr>& texprs) override { return Status::OK(); } + + Status prepare(RuntimeState* state, const RowDescriptor& row_desc) override { + return Status::OK(); + } + + Status open(RuntimeState* state) override { return Status::OK(); } + + Status close(RuntimeState* state) override { return Status::OK(); } + + Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool eos, + bool* already_sent) const override { + if (already_sent) { + *already_sent = false; + } + return Status::OK(); + } + + Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override { + partitioner = std::make_unique<MockPartitioner>(_partition_count); + return Status::OK(); + } + + vectorized::ChannelField get_channel_ids() const override { return {}; } +}; + +class MockExpr : public vectorized::VExpr { +public: + Status prepare(RuntimeState* state, const RowDescriptor& row_desc, + vectorized::VExprContext* context) override { + return Status::OK(); + } + + Status open(RuntimeState* state, vectorized::VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + return Status::OK(); + } +}; + +class MockFragmentManager : public FragmentMgr { +public: + MockFragmentManager(Status& status_, ExecEnv* exec_env) + : FragmentMgr(exec_env), status(status_) {} + void cancel_query(const TUniqueId query_id, const Status reason) override { status = reason; } + +private: + Status& status; +}; + +class SpillableDebugPointHelper { +public: + SpillableDebugPointHelper(const std::string name) + : _enable_debug_points(config::enable_debug_points), + _fragment_mgr(ExecEnv::GetInstance()->_fragment_mgr) { + config::enable_debug_points = true; + ExecEnv::GetInstance()->_fragment_mgr = + new MockFragmentManager(_spill_status, ExecEnv::GetInstance()); + DebugPoints::instance()->add(name); + } + + ~SpillableDebugPointHelper() { + config::enable_debug_points = _enable_debug_points; + ExecEnv::GetInstance()->_fragment_mgr->stop(); + SAFE_DELETE(ExecEnv::GetInstance()->_fragment_mgr); + ExecEnv::GetInstance()->_fragment_mgr = _fragment_mgr; + } + + const Status& get_spill_status() const { return _spill_status; } + +private: + Status _spill_status; + const bool _enable_debug_points; + FragmentMgr* const _fragment_mgr; +}; + +class SpillableOperatorTestHelper { +public: + virtual ~SpillableOperatorTestHelper() = default; + void SetUp(); + void TearDown(); + + virtual TPlanNode create_test_plan_node() = 0; + virtual TDescriptorTable create_test_table_descriptor(bool nullable) = 0; + + std::unique_ptr<MockRuntimeState> runtime_state; + std::unique_ptr<ObjectPool> obj_pool; + std::shared_ptr<QueryContext> query_ctx; + std::shared_ptr<RuntimeProfile> runtime_profile; + std::shared_ptr<PipelineTask> pipeline_task; + DescriptorTbl* desc_tbl; + static constexpr uint32_t TEST_PARTITION_COUNT = 8; +}; +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/testutil/creators.h b/be/test/testutil/creators.h index 91064ade29e..db0ee465b60 100644 --- a/be/test/testutil/creators.h +++ b/be/test/testutil/creators.h @@ -55,35 +55,10 @@ inline std::shared_ptr<QueryContext> generate_one_query() { return generate_one_query(query_options); } -inline TDescriptorTable create_test_table_descriptor(bool nullable = false) { - TTupleDescriptorBuilder tuple_builder; - tuple_builder.add_slot(TSlotDescriptorBuilder() - .type(PrimitiveType::TYPE_INT) - .column_name("col1") - .column_pos(0) - .nullable(nullable) - .build()); - - TDescriptorTableBuilder builder; - - tuple_builder.build(&builder); - - TTupleDescriptorBuilder() - .add_slot(TSlotDescriptorBuilder() - .type(TYPE_INT) - .column_name("col2") - .column_pos(0) - .nullable(nullable) - .build()) - .build(&builder); - - return builder.desc_tbl(); -} - inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr> generate_hash_join_pipeline( std::shared_ptr<OperatorXBase> probe_operator, - std::shared_ptr<OperatorXBase> build_side_source, - pipeline::DataSinkOperatorPtr probe_side_sink_operator, DataSinkOperatorPtr sink_operator) { + pipeline::DataSinkOperatorPtr probe_side_sink_operator, DataSinkOperatorPtr sink_operator, + std::shared_ptr<OperatorXBase> build_side_source) { auto probe_pipeline = std::make_shared<pipeline::Pipeline>(0, 1, 1); auto build_pipeline = std::make_shared<pipeline::Pipeline>(1, 1, 1); @@ -95,6 +70,21 @@ inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr> generate_hash_joi return {probe_pipeline, build_pipeline}; } +inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr> generate_agg_pipeline( + std::shared_ptr<OperatorXBase> source_operator, + pipeline::DataSinkOperatorPtr source_side_sink_operator, DataSinkOperatorPtr sink_operator, + std::shared_ptr<OperatorXBase> sink_side_source) { + auto source_pipeline = std::make_shared<pipeline::Pipeline>(0, 1, 1); + auto sink_pipeline = std::make_shared<pipeline::Pipeline>(1, 1, 1); + + static_cast<void>(source_pipeline->add_operator(source_operator, 1)); + static_cast<void>(source_pipeline->set_sink(source_side_sink_operator)); + static_cast<void>(sink_pipeline->add_operator(sink_side_source, 1)); + static_cast<void>(sink_pipeline->set_sink(sink_operator)); + + return {source_pipeline, sink_pipeline}; +} + inline std::unique_ptr<SpillPartitionerType> create_spill_partitioner( RuntimeState* state, const int32_t partition_count, const std::vector<TExpr>& exprs, const RowDescriptor& row_desc) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org