This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 083af74c255 [test](ut) add cases about partitioned aggregation 
operators (#48925)
083af74c255 is described below

commit 083af74c255b12e90749574e6b76b7958bc7dcdd
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Mon Mar 17 18:49:16 2025 +0800

    [test](ut) add cases about partitioned aggregation operators (#48925)
---
 be/src/pipeline/exec/aggregation_source_operator.h |   3 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |   7 +-
 .../exec/partitioned_aggregation_source_operator.h |   3 +-
 .../partitioned_aggregation_sink_operator_test.cpp | 467 ++++++++++++++++++++
 ...artitioned_aggregation_source_operator_test.cpp | 470 +++++++++++++++++++++
 .../partitioned_aggregation_test_helper.cpp        | 238 +++++++++++
 .../operator/partitioned_aggregation_test_helper.h | 155 +++++++
 .../partitioned_hash_join_probe_operator_test.cpp  |  32 +-
 .../partitioned_hash_join_sink_operator_test.cpp   |   8 -
 .../operator/partitioned_hash_join_test_helper.cpp |  76 ++--
 .../operator/partitioned_hash_join_test_helper.h   |  83 +---
 .../operator/spillable_operator_test_helper.cpp    |  76 ++++
 .../operator/spillable_operator_test_helper.h      | 132 ++++++
 be/test/testutil/creators.h                        |  44 +-
 14 files changed, 1606 insertions(+), 188 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index f18e9345b44..4d29cfb603d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -18,6 +18,7 @@
 
 #include <stdint.h>
 
+#include "common/be_mock_util.h"
 #include "common/status.h"
 #include "operator.h"
 
@@ -28,7 +29,7 @@ namespace pipeline {
 #include "common/compile_check_begin.h"
 class AggSourceOperatorX;
 
-class AggLocalState final : public PipelineXLocalState<AggSharedState> {
+class AggLocalState MOCK_REMOVE(final) : public 
PipelineXLocalState<AggSharedState> {
 public:
     using Base = PipelineXLocalState<AggSharedState>;
     ENABLE_FACTORY_CREATOR(AggLocalState);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index e003ea23240..521da54f21f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -130,8 +130,6 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* s
     
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode, 
state));
     _name = "PARTITIONED_AGGREGATION_SINK_OPERATOR";
     _spill_partition_count = state->spill_aggregation_partition_count();
-    RETURN_IF_ERROR(
-            
_agg_sink_operator->set_child(DataSinkOperatorX<PartitionedAggSinkLocalState>::_child));
     return _agg_sink_operator->init(tnode, state);
 }
 
@@ -255,10 +253,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
         update_profile<true>(sink_local_state->profile());
     }
 
-    // TODO: spill thread may set_ready before the task::execute thread put 
the task to blocked state
-    if (!_eos) {
-        Base::_spill_dependency->Dependency::block();
-    }
     auto& parent = Base::_parent->template cast<Parent>();
     Status status;
     Defer defer {[&]() {
@@ -331,6 +325,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                 return status;
             });
 
+    Base::_spill_dependency->Dependency::block();
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
             std::move(spill_runnable));
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 24e56df1be8..c7d4b21af56 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -30,7 +30,8 @@ namespace pipeline {
 class PartitionedAggSourceOperatorX;
 class PartitionedAggLocalState;
 
-class PartitionedAggLocalState final : public 
PipelineXSpillLocalState<PartitionedAggSharedState> {
+class PartitionedAggLocalState MOCK_REMOVE(final)
+        : public PipelineXSpillLocalState<PartitionedAggSharedState> {
 public:
     ENABLE_FACTORY_CREATOR(PartitionedAggLocalState);
     using Base = PipelineXSpillLocalState<PartitionedAggSharedState>;
diff --git 
a/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
new file mode 100644
index 00000000000..930eba76fd6
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_sink_operator_test.cpp
@@ -0,0 +1,467 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/config.h"
+#include "partitioned_aggregation_test_helper.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
+#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+class PartitionedAggregationSinkOperatorTest : public testing::Test {
+protected:
+    void SetUp() override { _helper.SetUp(); }
+    void TearDown() override { _helper.TearDown(); }
+    PartitionedAggregationTestHelper _helper;
+};
+
+TEST_F(PartitionedAggregationSinkOperatorTest, Init) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    std::shared_ptr<MockPartitionedAggSharedState> shared_state =
+            MockPartitionedAggSharedState::create_shared();
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = _helper.runtime_state->get_sink_local_state();
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    st = sink_operator->close(_helper.runtime_state.get(), st);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+    st = local_state->close(_helper.runtime_state.get(), st);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, Sink) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    auto* dep = 
shared_state->create_source_dependency(source_operator->operator_id(),
+                                                       
source_operator->node_id(),
+                                                       
"PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = _helper.runtime_state->get_sink_local_state();
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), 
false), 0);
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), 
true), 0);
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+    ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+    st = sink_operator->close(_helper.runtime_state.get(), st);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithEmptyEOS) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    auto* dep = 
shared_state->create_source_dependency(source_operator->operator_id(),
+                                                       
source_operator->node_id(),
+                                                       
"PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = _helper.runtime_state->get_sink_local_state();
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), 
false), 0);
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->get_reserve_mem_size(_helper.runtime_state.get(), 
true), 0);
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+    ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+    st = sink_operator->close(_helper.runtime_state.get(), st);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpill) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    auto* dep = 
shared_state->create_source_dependency(source_operator->operator_id(),
+                                                       
source_operator->node_id(),
+                                                       
"PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            local_state->_runtime_state->get_sink_local_state());
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+    while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+    st = sink_operator->close(_helper.runtime_state.get(), st);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillAndEmptyEOS) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    auto* dep = 
shared_state->create_source_dependency(source_operator->operator_id(),
+                                                       
source_operator->node_id(),
+                                                       
"PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            local_state->_runtime_state->get_sink_local_state());
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+    while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+    ASSERT_TRUE(local_state->_spill_dependency->is_blocked_by(nullptr) == 
nullptr);
+    ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+    st = sink_operator->close(_helper.runtime_state.get(), st);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpillLargeData) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+    auto* dep = 
shared_state->create_source_dependency(source_operator->operator_id(),
+                                                       
source_operator->node_id(),
+                                                       
"PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            local_state->_runtime_state->get_sink_local_state());
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+    while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    auto* spill_write_rows_counter = 
local_state->profile()->get_counter("SpillWriteRows");
+    ASSERT_TRUE(spill_write_rows_counter != nullptr);
+    ASSERT_EQ(spill_write_rows_counter->value(), 4);
+
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    const size_t count = 1048576;
+    std::vector<int32_t> data(count);
+    std::iota(data.begin(), data.end(), 0);
+    block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(data);
+
+    block.insert(
+            
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(data));
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    block.clear_column_data();
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+    ASSERT_TRUE(local_state->_spill_dependency->is_blocked_by(nullptr) == 
nullptr);
+    ASSERT_TRUE(dep->is_blocked_by(nullptr) == nullptr);
+
+    st = sink_operator->close(_helper.runtime_state.get(), st);
+    ASSERT_EQ(spill_write_rows_counter->value(), 1048576 + 4);
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+}
+
+TEST_F(PartitionedAggregationSinkOperatorTest, SinkWithSpilError) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    auto shared_state = sink_operator->create_shared_state();
+
+    LocalSinkStateInfo info {.task_idx = 0,
+                             .parent_profile = _helper.runtime_profile.get(),
+                             .sender_id = 0,
+                             .shared_state = shared_state.get(),
+                             .le_state_map = {},
+                             .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(local_state != nullptr);
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    auto block = 
vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    
block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 
0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            local_state->_runtime_state->get_sink_local_state());
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    SpillableDebugPointHelper 
dp_helper("fault_inject::spill_stream::spill_block");
+    st = sink_operator->revoke_memory(_helper.runtime_state.get(), nullptr);
+    ASSERT_TRUE(st.ok()) << "revoke_memory failed: " << st.to_string();
+
+    while (local_state->_spill_dependency->is_blocked_by(nullptr) != nullptr) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    std::cout << "profile: " << _helper.runtime_profile->pretty_print() << 
std::endl;
+
+    ASSERT_FALSE(dp_helper.get_spill_status().ok()) << "spilll status should 
be failed";
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git 
a/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
new file mode 100644
index 00000000000..a23c7237979
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_source_operator_test.cpp
@@ -0,0 +1,470 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/partitioned_aggregation_source_operator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/config.h"
+#include "partitioned_aggregation_test_helper.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
+#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+// gtest fixture for the partitioned aggregation source operator cases in this file.
+// Each test owns a PartitionedAggregationTestHelper; the gtest SetUp()/TearDown()
+// hooks simply forward to the helper's methods of the same name.
+class PartitionedAggregationSourceOperatorTest : public testing::Test {
+protected:
+    // Forwards per-test initialization to the helper.
+    void SetUp() override {
+        _helper.SetUp();
+    }
+
+    // Forwards per-test cleanup to the helper.
+    void TearDown() override {
+        _helper.TearDown();
+    }
+
+    PartitionedAggregationTestHelper _helper;
+};
+
+// Drives the source operator through init -> prepare -> setup_local_state -> open -> close
+// against a mock shared state that holds a fresh in-memory AggSharedState, and finally
+// checks that the local state can still be opened after the operator has been closed.
+TEST_F(PartitionedAggregationSourceOperatorTest, Init) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    auto* state = _helper.runtime_state.get();
+
+    const auto plan_node = _helper.create_test_plan_node();
+    auto status = source_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = source_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    // The mock shared state keeps the in-memory aggregation state alive through the
+    // shared pointer and exposes the same object through the raw pointer member.
+    auto shared_state = MockPartitionedAggSharedState::create_shared();
+    shared_state->in_mem_shared_state_sptr = std::make_shared<AggSharedState>();
+    shared_state->in_mem_shared_state =
+            reinterpret_cast<AggSharedState*>(shared_state->in_mem_shared_state_sptr.get());
+
+    LocalStateInfo source_info {
+            .parent_profile = _helper.runtime_profile.get(),
+            .scan_ranges = {},
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .task_idx = 0,
+    };
+    status = source_operator->setup_local_state(state, source_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* source_local_state = state->get_local_state(source_operator->operator_id());
+    ASSERT_TRUE(source_local_state != nullptr);
+
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    status = source_operator->close(state);
+    ASSERT_TRUE(status.ok()) << "close failed: " << status.to_string();
+
+    // Opening the local state again after close is expected to succeed.
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+}
+
+// Sets up both the sink and the source on a shared state created by the sink operator,
+// sinks nothing, and expects the first get_block() call to return an empty block with
+// eos set. The local state is opened once more after the operator is closed.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockEmpty) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    auto* state = _helper.runtime_state.get();
+
+    const auto plan_node = _helper.create_test_plan_node();
+    auto status = source_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = source_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    status = sink_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = sink_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    // Sink and source share one state object; the source dependency is registered on it
+    // before either local state is created.
+    auto shared_state = sink_operator->create_shared_state();
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(),
+                                           "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {
+            .task_idx = 0,
+            .parent_profile = _helper.runtime_profile.get(),
+            .sender_id = 0,
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .tsink = TDataSink(),
+    };
+    status = sink_operator->setup_local_state(state, sink_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* sink_local_state = state->get_sink_local_state();
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    status = sink_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    LocalStateInfo source_info {
+            .parent_profile = _helper.runtime_profile.get(),
+            .scan_ranges = {},
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .task_idx = 0,
+    };
+    status = source_operator->setup_local_state(state, source_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* source_local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(source_local_state != nullptr);
+
+    source_local_state->_copy_shared_spill_profile = false;
+
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    // Nothing was sunk, so the source must report end-of-stream with no rows.
+    vectorized::Block output;
+    bool eos = false;
+    status = source_operator->get_block(state, &output, &eos);
+    ASSERT_TRUE(status.ok()) << "get_block failed: " << status.to_string();
+
+    ASSERT_EQ(output.rows(), 0);
+    ASSERT_TRUE(eos);
+
+    status = source_operator->close(state);
+    ASSERT_TRUE(status.ok()) << "close failed: " << status.to_string();
+
+    // Opening the local state again after close is expected to succeed.
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+}
+
+// In-memory (no spill) round trip: sinks the same two-column INT block twice (the second
+// time with eos), checks that the sink reports revocable memory and a non-empty inner hash
+// table, then reads the aggregated result back from the source in a single get_block()
+// call. The key column holds the values 1..4, so exactly 4 rows are expected.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlock) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    const auto tnode = _helper.create_test_plan_node();
+    auto st = source_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = source_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    st = sink_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = sink_operator->prepare(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+    // Sink and source share one state object created by the sink operator.
+    auto shared_state = sink_operator->create_shared_state();
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(), "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {.task_idx = 0,
+                                  .parent_profile = _helper.runtime_profile.get(),
+                                  .sender_id = 0,
+                                  .shared_state = shared_state.get(),
+                                  .le_state_map = {},
+                                  .tsink = TDataSink()};
+    st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* sink_local_state = reinterpret_cast<PartitionedAggSinkLocalState*>(
+            _helper.runtime_state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    st = sink_local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    // Two identical INT columns: the first is the grouping key, the second the sum() input.
+    auto block = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+
+    block.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, false);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    st = sink_operator->sink(_helper.runtime_state.get(), &block, true);
+    ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+    ASSERT_GT(sink_operator->revocable_mem_size(_helper.runtime_state.get()), 0);
+
+    // The partitioned sink keeps its own runtime state whose sink local state is the
+    // plain aggregation sink; nothing was revoked, so its hash table must hold data.
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            sink_local_state->_runtime_state->get_sink_local_state());
+    ASSERT_GT(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    LocalStateInfo info {
+            .parent_profile = _helper.runtime_profile.get(),
+            .scan_ranges = {},
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .task_idx = 0,
+    };
+    st = source_operator->setup_local_state(_helper.runtime_state.get(), info);
+    ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+    auto* local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            _helper.runtime_state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(local_state != nullptr);
+
+    local_state->_copy_shared_spill_profile = false;
+
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+    // Reuse the input block object as the output buffer.
+    block.clear();
+    bool eos = false;
+    st = source_operator->get_block(_helper.runtime_state.get(), &block, &eos);
+    ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+    // Only gtest assertions are used here so that a mismatch is reported as a regular
+    // test failure instead of aborting the whole test binary.
+    ASSERT_TRUE(eos);
+    ASSERT_EQ(block.rows(), 4);
+
+    st = source_operator->close(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+    // Opening the local state again after close is expected to succeed.
+    st = local_state->open(_helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+}
+
+// Spill round trip: sinks one block, revokes memory and waits for the sink's spill
+// dependency to unblock, sinks the block again with eos, and then drains the source
+// operator until eos. The key column holds the values 1..4, so 4 rows are expected in total.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpill) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    auto* state = _helper.runtime_state.get();
+
+    // Polls every 10 ms until the given dependency no longer reports a blocker.
+    // NOTE(review): there is no timeout, so a dependency that never unblocks hangs the test.
+    const auto wait_until_unblocked = [](const auto& dependency) {
+        while (dependency->is_blocked_by(nullptr) != nullptr) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+    };
+
+    const auto plan_node = _helper.create_test_plan_node();
+    auto status = source_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = source_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    status = sink_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = sink_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(),
+                                           "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {
+            .task_idx = 0,
+            .parent_profile = _helper.runtime_profile.get(),
+            .sender_id = 0,
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .tsink = TDataSink(),
+    };
+    status = sink_operator->setup_local_state(state, sink_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* sink_local_state =
+            reinterpret_cast<PartitionedAggSinkLocalState*>(state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    status = sink_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    // Two identical INT columns: the first is the grouping key, the second the sum() input.
+    auto chunk = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+    chunk.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    status = sink_operator->sink(state, &chunk, false);
+    ASSERT_TRUE(status.ok()) << "sink failed: " << status.to_string();
+
+    // Force a spill and wait for it to finish before checking the spilled flag.
+    status = sink_operator->revoke_memory(state, nullptr);
+    ASSERT_TRUE(status.ok()) << "revoke_memory failed: " << status.to_string();
+    wait_until_unblocked(sink_local_state->_spill_dependency);
+
+    ASSERT_TRUE(shared_state->is_spilled);
+
+    // Final sink call with eos; wait again for the spill dependency afterwards.
+    status = sink_operator->sink(state, &chunk, true);
+    ASSERT_TRUE(status.ok()) << "sink failed: " << status.to_string();
+    wait_until_unblocked(sink_local_state->_spill_dependency);
+
+    // After the spilled sink has finished, nothing revocable is left and the inner
+    // aggregation hash table is empty.
+    ASSERT_EQ(sink_operator->revocable_mem_size(state), 0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            sink_local_state->_runtime_state->get_sink_local_state());
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    LocalStateInfo source_info {
+            .parent_profile = _helper.runtime_profile.get(),
+            .scan_ranges = {},
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .task_idx = 0,
+    };
+    status = source_operator->setup_local_state(state, source_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* source_local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(source_local_state != nullptr);
+
+    source_local_state->_copy_shared_spill_profile = false;
+
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    // Drain the source, reusing the input block object as the output buffer. After each
+    // call, wait for the source's spill dependency before counting the returned rows.
+    chunk.clear();
+    bool eos = false;
+    size_t total_rows = 0;
+    do {
+        status = source_operator->get_block(state, &chunk, &eos);
+        ASSERT_TRUE(status.ok()) << "get_block failed: " << status.to_string();
+
+        wait_until_unblocked(source_local_state->_spill_dependency);
+        total_rows += chunk.rows();
+        chunk.clear_column_data();
+    } while (!eos);
+
+    ASSERT_TRUE(eos);
+    ASSERT_EQ(total_rows, 4);
+
+    status = source_operator->close(state);
+    ASSERT_TRUE(status.ok()) << "close failed: " << status.to_string();
+
+    // Opening the local state again after close is expected to succeed.
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+}
+
+// Same spill setup as GetBlockWithSpill, but a SpillableDebugPointHelper for
+// "fault_inject::spill_stream::read_next_block" is created right before the source is
+// drained. The test passes when the helper ends up reporting a non-OK spill status.
+TEST_F(PartitionedAggregationSourceOperatorTest, GetBlockWithSpillError) {
+    auto [source_operator, sink_operator] = _helper.create_operators();
+    ASSERT_TRUE(source_operator != nullptr);
+    ASSERT_TRUE(sink_operator != nullptr);
+
+    auto* state = _helper.runtime_state.get();
+
+    // Polls every 10 ms until the given dependency no longer reports a blocker.
+    // NOTE(review): there is no timeout, so a dependency that never unblocks hangs the test.
+    const auto wait_until_unblocked = [](const auto& dependency) {
+        while (dependency->is_blocked_by(nullptr) != nullptr) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+    };
+
+    const auto plan_node = _helper.create_test_plan_node();
+    auto status = source_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = source_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    status = sink_operator->init(plan_node, state);
+    ASSERT_TRUE(status.ok()) << "init failed: " << status.to_string();
+
+    status = sink_operator->prepare(state);
+    ASSERT_TRUE(status.ok()) << "prepare failed: " << status.to_string();
+
+    auto shared_state = std::dynamic_pointer_cast<PartitionedAggSharedState>(
+            sink_operator->create_shared_state());
+    shared_state->create_source_dependency(source_operator->operator_id(),
+                                           source_operator->node_id(),
+                                           "PartitionedAggSinkTestDep");
+
+    LocalSinkStateInfo sink_info {
+            .task_idx = 0,
+            .parent_profile = _helper.runtime_profile.get(),
+            .sender_id = 0,
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .tsink = TDataSink(),
+    };
+    status = sink_operator->setup_local_state(state, sink_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* sink_local_state =
+            reinterpret_cast<PartitionedAggSinkLocalState*>(state->get_sink_local_state());
+    ASSERT_TRUE(sink_local_state != nullptr);
+
+    status = sink_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    // Two identical INT columns: the first is the grouping key, the second the sum() input.
+    auto chunk = vectorized::ColumnHelper::create_block<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4});
+    chunk.insert(vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeInt32>(
+            {1, 2, 3, 4, 2, 3, 4, 3, 4, 4}));
+
+    status = sink_operator->sink(state, &chunk, false);
+    ASSERT_TRUE(status.ok()) << "sink failed: " << status.to_string();
+
+    // Force a spill and wait for it to finish before checking the spilled flag.
+    status = sink_operator->revoke_memory(state, nullptr);
+    ASSERT_TRUE(status.ok()) << "revoke_memory failed: " << status.to_string();
+    wait_until_unblocked(sink_local_state->_spill_dependency);
+
+    ASSERT_TRUE(shared_state->is_spilled);
+
+    // Final sink call with eos; wait again for the spill dependency afterwards.
+    status = sink_operator->sink(state, &chunk, true);
+    ASSERT_TRUE(status.ok()) << "sink failed: " << status.to_string();
+    wait_until_unblocked(sink_local_state->_spill_dependency);
+
+    ASSERT_EQ(sink_operator->revocable_mem_size(state), 0);
+
+    auto* inner_sink_local_state = reinterpret_cast<AggSinkLocalState*>(
+            sink_local_state->_runtime_state->get_sink_local_state());
+    ASSERT_EQ(inner_sink_local_state->_get_hash_table_size(), 0);
+
+    LocalStateInfo source_info {
+            .parent_profile = _helper.runtime_profile.get(),
+            .scan_ranges = {},
+            .shared_state = shared_state.get(),
+            .le_state_map = {},
+            .task_idx = 0,
+    };
+    status = source_operator->setup_local_state(state, source_info);
+    ASSERT_TRUE(status.ok()) << "setup_local_state failed: " << status.to_string();
+
+    auto* source_local_state = reinterpret_cast<PartitionedAggLocalState*>(
+            state->get_local_state(source_operator->operator_id()));
+    ASSERT_TRUE(source_local_state != nullptr);
+
+    source_local_state->_copy_shared_spill_profile = false;
+
+    status = source_local_state->open(state);
+    ASSERT_TRUE(status.ok()) << "open failed: " << status.to_string();
+
+    // Created only now so that it is not alive during the sink-side spill above.
+    // NOTE(review): presumably the helper arms the named debug point for its lifetime
+    // and records the resulting spill status — confirm in spillable_operator_test_helper.h.
+    SpillableDebugPointHelper dp_helper("fault_inject::spill_stream::read_next_block");
+
+    // Keep pulling from the source until it reaches eos or the helper reports a failure.
+    // get_block() itself is still expected to return OK on every call.
+    chunk.clear();
+    bool eos = false;
+    while (!eos && dp_helper.get_spill_status().ok()) {
+        status = source_operator->get_block(state, &chunk, &eos);
+        ASSERT_TRUE(status.ok()) << "get_block failed: " << status.to_string();
+
+        wait_until_unblocked(source_local_state->_spill_dependency);
+        chunk.clear_column_data();
+    }
+
+    ASSERT_FALSE(dp_helper.get_spill_status().ok());
+}
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
new file mode 100644
index 00000000000..0d2a653c6c3
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "partitioned_aggregation_test_helper.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "runtime/define_primitive_type.h"
+#include "testutil/creators.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris::pipeline {
+// Builds the thrift plan node used by the partitioned aggregation tests: a non-streaming
+// AGGREGATION_NODE (node id 0, one child, no limit, need_finalize = false) that groups by
+// INT slot 0 of tuple 0 and evaluates sum() over INT slot 1 of tuple 0 with a BIGINT result.
+// Intermediate and output tuple ids are 1 and 2.
+TPlanNode PartitionedAggregationTestHelper::create_test_plan_node() {
+    // Scalar INT type node shared by both slot references and by the sum() argument.
+    TTypeNode int_type;
+    int_type.type = TTypeNodeType::SCALAR;
+    int_type.scalar_type.type = TPrimitiveType::INT;
+    int_type.__isset.scalar_type = true;
+
+    // sum() signature: one INT argument, non-nullable BIGINT return value.
+    TTypeDesc sum_arg_type;
+    sum_arg_type.types.push_back(int_type);
+
+    TTypeDesc sum_ret_type;
+    auto& sum_ret_node = sum_ret_type.types.emplace_back();
+    sum_ret_node.type = TTypeNodeType::SCALAR;
+    sum_ret_node.scalar_type.type = TPrimitiveType::BIGINT;
+    sum_ret_node.__isset.scalar_type = true;
+    sum_ret_type.__set_is_nullable(false);
+
+    TFunctionName sum_name;
+    sum_name.function_name = "sum";
+
+    TPlanNode plan_node;
+    plan_node.node_id = 0;
+    plan_node.node_type = TPlanNodeType::AGGREGATION_NODE;
+    plan_node.num_children = 1;
+    plan_node.limit = -1;
+    plan_node.row_tuples.push_back(0);
+    plan_node.nullable_tuples.push_back(false);
+
+    auto& agg_node = plan_node.agg_node;
+    agg_node.use_streaming_preaggregation = false;
+    agg_node.need_finalize = false;
+    agg_node.intermediate_tuple_id = 1;
+    agg_node.output_tuple_id = 2;
+
+    // GROUP BY <tuple 0, slot 0>.
+    auto& group_by_slot = agg_node.grouping_exprs.emplace_back().nodes.emplace_back();
+    group_by_slot.node_type = TExprNodeType::SLOT_REF;
+    group_by_slot.type.types.emplace_back(int_type);
+    group_by_slot.__set_is_nullable(false);
+    group_by_slot.num_children = 0;
+    group_by_slot.slot_ref.slot_id = 0;
+    group_by_slot.slot_ref.tuple_id = 0;
+
+    // sum(<tuple 0, slot 1>): the function call node first, then its single child.
+    auto& sum_expr = agg_node.aggregate_functions.emplace_back();
+    {
+        // Scoped on purpose: appending the child node below may reallocate
+        // sum_expr.nodes, which would invalidate this reference.
+        auto& sum_call = sum_expr.nodes.emplace_back();
+        sum_call.node_type = TExprNodeType::FUNCTION_CALL;
+        sum_call.__set_is_nullable(false);
+        sum_call.num_children = 1;
+        sum_call.fn.__set_name(sum_name);
+        sum_call.fn.__set_ret_type(sum_ret_type);
+        sum_call.fn.__set_arg_types({sum_arg_type});
+        sum_call.agg_expr.__set_param_types({sum_arg_type});
+    }
+
+    auto& sum_input_slot = sum_expr.nodes.emplace_back();
+    sum_input_slot.node_type = TExprNodeType::SLOT_REF;
+    sum_input_slot.__set_is_nullable(false);
+    sum_input_slot.num_children = 0;
+    sum_input_slot.slot_ref.slot_id = 1;
+    sum_input_slot.slot_ref.tuple_id = 0;
+    sum_input_slot.type.types.emplace_back(int_type);
+
+    return plan_node;
+}
+
+// Builds the thrift descriptor table for the aggregation tests. Three tuples are registered
+// in this order, each with two slots:
+//   1. col1 INT, col2 INT       - both use the `nullable` argument
+//   2. col3 INT, col4 BIGINT    - col3 uses `nullable`, col4 is always nullable
+//   3. col5 INT, col6 BIGINT    - col5 uses `nullable`, col6 is always nullable
+// Every slot is created with column_pos(0).
+// NOTE(review): the default argument is spelled on this out-of-class definition; confirm
+// the in-class declaration does not specify one as well.
+TDescriptorTable PartitionedAggregationTestHelper::create_test_table_descriptor(
+        bool nullable = false) {
+    // Creates one slot descriptor; only type, name and nullability vary between slots.
+    const auto make_slot = [](PrimitiveType slot_type, const char* name, bool is_nullable) {
+        return TSlotDescriptorBuilder()
+                .type(slot_type)
+                .column_name(name)
+                .column_pos(0)
+                .nullable(is_nullable)
+                .build();
+    };
+
+    TDescriptorTableBuilder table_builder;
+
+    // The tuples are added strictly in this order.
+    TTupleDescriptorBuilder()
+            .add_slot(make_slot(PrimitiveType::TYPE_INT, "col1", nullable))
+            .add_slot(make_slot(PrimitiveType::TYPE_INT, "col2", nullable))
+            .build(&table_builder);
+
+    TTupleDescriptorBuilder()
+            .add_slot(make_slot(TYPE_INT, "col3", nullable))
+            .add_slot(make_slot(TYPE_BIGINT, "col4", true))
+            .build(&table_builder);
+
+    TTupleDescriptorBuilder()
+            .add_slot(make_slot(TYPE_INT, "col5", nullable))
+            .add_slot(make_slot(TYPE_BIGINT, "col6", true))
+            .build(&table_builder);
+
+    return table_builder.desc_tbl();
+}
+
+// Creates the partitioned aggregation source/sink operator pair used by the tests.
+//
+// Both operators are built from create_test_plan_node() and the runtime state's descriptor
+// table (expected to contain 3 tuple descriptors). They are placed into pipelines via
+// generate_agg_pipeline() together with a mock child operator and a mock sink, the mock
+// child is given a row descriptor for tuple 0 and installed as the sink's child, and a
+// PipelineTask for the source pipeline is stored in `pipeline_task` and registered on the
+// runtime state.
+//
+// Returns {source operator, sink operator}.
+std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>,
+           std::shared_ptr<PartitionedAggSinkOperatorX>>
+PartitionedAggregationTestHelper::create_operators() {
+    TPlanNode tnode = create_test_plan_node();
+    // Bind by reference: the descriptor table is only read here, so no copy is needed.
+    const auto& desc_tbl = runtime_state->desc_tbl();
+
+    EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 3);
+
+    auto source_operator =
+            std::make_shared<PartitionedAggSourceOperatorX>(obj_pool.get(), tnode, 0, desc_tbl);
+    auto sink_operator = std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode,
+                                                                       desc_tbl, false);
+
+    auto child_operator = std::make_shared<MockChildOperator>();
+    auto source_side_sink_operator = std::make_shared<MockSinkOperator>();
+    auto [source_pipeline, _] = generate_agg_pipeline(source_operator, source_side_sink_operator,
+                                                      sink_operator, child_operator);
+
+    // The mock child produces rows of tuple 0 (non-nullable).
+    RowDescriptor row_desc(desc_tbl, {0}, {false});
+    child_operator->_row_descriptor = row_desc;
+
+    EXPECT_TRUE(sink_operator->set_child(child_operator));
+
+    // Setup task and state
+    std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
+            le_state_map;
+    pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr,
+                                                   nullptr, le_state_map, 0);
+    runtime_state->set_task(pipeline_task.get());
+    return {std::move(source_operator), std::move(sink_operator)};
+}
+
+// Creates a MockPartitionedAggLocalState for `probe_operator`, wires it to a freshly
+// created MockPartitionedAggSharedState (returned through `shared_state`, with is_spilled
+// set to true), initializes the profile counters and the spill dependency, and hands
+// ownership of the local state to `state`.
+//
+// Returns a non-owning pointer to the local state now stored in `state`.
+PartitionedAggLocalState* PartitionedAggregationTestHelper::create_source_local_state(
+        RuntimeState* state, PartitionedAggSourceOperatorX* probe_operator,
+        std::shared_ptr<MockPartitionedAggSharedState>& shared_state) {
+    auto owned_local_state = std::make_unique<MockPartitionedAggLocalState>(state, probe_operator);
+    auto* local_state = owned_local_state.get();
+
+    // The source side starts out in the "already spilled" state.
+    shared_state = std::make_shared<MockPartitionedAggSharedState>();
+    shared_state->is_spilled = true;
+    local_state->_shared_state = shared_state.get();
+
+    auto* profile = local_state->profile();
+    ADD_TIMER(profile, "ExecTime");
+    profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
+    local_state->init_spill_read_counters();
+    local_state->init_spill_write_counters();
+    local_state->_copy_shared_spill_profile = false;
+    local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
+
+    local_state->_spill_dependency =
+            Dependency::create_shared(0, 0, "PartitionedHashJoinProbeOperatorTestSpillDep", true);
+
+    // Ownership moves into the runtime state; the raw pointer stays valid for the caller.
+    state->emplace_local_state(probe_operator->operator_id(), std::move(owned_local_state));
+    return local_state;
+}
+
+// Creates a MockPartitionedAggSinkLocalState for `sink_operator`, initializes its spill
+// and profile counters, attaches a sink dependency created on a fresh
+// MockPartitionedAggSharedState (returned through `shared_state`) plus a spill dependency,
+// and hands ownership of the local state to `state`.
+//
+// Returns a non-owning pointer to the local state now stored in `state`.
+PartitionedAggSinkLocalState* PartitionedAggregationTestHelper::create_sink_local_state(
+        RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator,
+        std::shared_ptr<MockPartitionedAggSharedState>& shared_state) {
+    auto owned_local_state = MockPartitionedAggSinkLocalState::create_unique(sink_operator, state);
+    auto* local_state = owned_local_state.get();
+    shared_state = std::make_shared<MockPartitionedAggSharedState>();
+
+    local_state->init_spill_counters();
+
+    auto* profile = local_state->profile();
+    ADD_TIMER(profile, "ExecTime");
+    profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0);
+    local_state->_internal_runtime_profile = std::make_unique<RuntimeProfile>("inner_test");
+
+    // The sink dependency targets the operator's first destination id.
+    const auto first_dest_id = sink_operator->dests_id().front();
+    local_state->_dependency = shared_state->create_sink_dependency(
+            first_dest_id, sink_operator->operator_id(), "PartitionedHashJoinTestDep");
+
+    local_state->_spill_dependency =
+            Dependency::create_shared(0, 0, "PartitionedHashJoinSinkOperatorTestSpillDep", true);
+    shared_state->setup_shared_profile(local_state->profile());
+
+    // Ownership moves into the runtime state; the raw pointer stays valid for the caller.
+    state->emplace_sink_local_state(sink_operator->operator_id(), std::move(owned_local_state));
+    return local_state;
+}
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h 
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
new file mode 100644
index 00000000000..cc87236d6d4
--- /dev/null
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/config.h"
+#include "common/factory_creator.h"
+#include "common/object_pool.h"
+#include "pipeline/exec/aggregation_sink_operator.h"
+#include "pipeline/exec/aggregation_source_operator.h"
+#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
+#include "pipeline/exec/partitioned_aggregation_source_operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "spillable_operator_test_helper.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+// Test alias of AggSharedState; adds no members or behaviour of its own.
+class MockAggSharedState : public AggSharedState {};
+
+// Partitioned aggregation shared state for tests; `is_spilled` is set to false
+// on construction.
+class MockPartitionedAggSharedState : public PartitionedAggSharedState {
+    ENABLE_FACTORY_CREATOR(MockPartitionedAggSharedState);
+
+public:
+    MockPartitionedAggSharedState() {
+        is_spilled = false;
+    }
+};
+
+// Sink local state for tests: owns its own RuntimeProfile ("test") and turns
+// init/open/close into no-ops that always succeed.
+class MockPartitionedAggSinkLocalState : public PartitionedAggSinkLocalState {
+    ENABLE_FACTORY_CREATOR(MockPartitionedAggSinkLocalState)
+public:
+    MockPartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : PartitionedAggSinkLocalState(parent, state),
+              _runtime_profile(std::make_unique<RuntimeProfile>("test")) {
+        // Point the base-class profile at the profile owned by this mock.
+        _profile = _runtime_profile.get();
+        _memory_used_counter =
+                _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+    }
+
+    // Lifecycle hooks are stubbed out; each one reports success without doing work.
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
+        return Status::OK();
+    }
+    Status open(RuntimeState* state) override {
+        return Status::OK();
+    }
+    Status close(RuntimeState* state, Status status) override {
+        return Status::OK();
+    }
+
+private:
+    // Backing storage for `_profile`.
+    std::unique_ptr<RuntimeProfile> _runtime_profile;
+};
+
+// Sink operator for tests: prepare() and sink() are no-ops, and
+// setup_local_state() installs a MockPartitionedAggSinkLocalState.
+class MockPartitionedAggSinkOperatorX : public PartitionedAggSinkOperatorX {
+public:
+    MockPartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
+                                    const TPlanNode& tnode, const DescriptorTbl& descs)
+            : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode, descs, false) {}
+    ~MockPartitionedAggSinkOperatorX() override = default;
+
+    // Always succeeds without preparing anything.
+    Status prepare(RuntimeState* state) override {
+        return Status::OK();
+    }
+
+    // Registers a mock sink local state under this operator's id.
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override {
+        auto mock_state = MockPartitionedAggSinkLocalState::create_unique(this, state);
+        state->emplace_sink_local_state(_operator_id, std::move(mock_state));
+        return Status::OK();
+    }
+
+    // Ignores the incoming block and reports success.
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
+        return Status::OK();
+    }
+};
+
+// Source local state for tests: gets a fresh RuntimeProfile ("test") and a
+// no-op open().
+class MockPartitionedAggLocalState : public PartitionedAggLocalState {
+    ENABLE_FACTORY_CREATOR(MockPartitionedAggLocalState);
+
+public:
+    MockPartitionedAggLocalState(RuntimeState* state, OperatorXBase* parent)
+            : PartitionedAggLocalState(state, parent) {
+        _runtime_profile = std::make_unique<RuntimeProfile>("test");
+    }
+
+    // Always succeeds without opening anything.
+    Status open(RuntimeState* state) override {
+        return Status::OK();
+    }
+};
+
+// AggLocalState that can be built through the factory-creator helpers in tests.
+class MockAggLocalState : public AggLocalState {
+    ENABLE_FACTORY_CREATOR(MockAggLocalState);
+
+public:
+    MockAggLocalState(RuntimeState* state, OperatorXBase* parent)
+            : AggLocalState(state, parent) {}
+};
+
+// Aggregation source operator for tests. setup_local_state() installs a
+// MockAggLocalState, and need_more_input_data() simply returns the public
+// `need_more_data` flag so a test can steer it.
+class MockAggSourceOperatorX : public AggSourceOperatorX {
+public:
+    MockAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
+                           const DescriptorTbl& descs)
+            : AggSourceOperatorX(pool, tnode, operator_id, descs) {}
+    ~MockAggSourceOperatorX() override = default;
+
+    // Registers a mock local state under this operator's id.
+    Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override {
+        auto mock_state = MockAggLocalState::create_unique(state, this);
+        state->emplace_local_state(_operator_id, std::move(mock_state));
+        return Status::OK();
+    }
+
+    // Reports whatever the test stored in `need_more_data`.
+    bool need_more_input_data(RuntimeState* state) const override {
+        return need_more_data;
+    }
+
+    // Test-controlled knobs; not read by any method of this class other than
+    // need_more_input_data().
+    bool need_more_data = true;
+
+    vectorized::Block block;
+    bool eos = false;
+};
+
+// Default-constructible AggSinkOperatorX for tests; adds no behaviour.
+class MockAggSinkOperatorX : public AggSinkOperatorX {
+public:
+    MockAggSinkOperatorX() = default;
+
+    ~MockAggSinkOperatorX() override = default;
+};
+
+// Test fixture helper for the partitioned aggregation sink/source operators.
+// Supplies the plan node and descriptor table required by
+// SpillableOperatorTestHelper and offers factories for operators and their
+// local states.
+class PartitionedAggregationTestHelper : public SpillableOperatorTestHelper {
+public:
+    ~PartitionedAggregationTestHelper() override = default;
+
+    // SpillableOperatorTestHelper interface.
+    TPlanNode create_test_plan_node() override;
+    TDescriptorTable create_test_table_descriptor(bool nullable) override;
+
+    // Creates the (source, sink) operator pair used by the tests.
+    std::tuple<std::shared_ptr<PartitionedAggSourceOperatorX>,
+               std::shared_ptr<PartitionedAggSinkOperatorX>>
+    create_operators();
+
+    // Creates a source local state for `source_operator`; `shared_state` is an
+    // in/out reference to the shared state used with it.
+    PartitionedAggLocalState* create_source_local_state(
+            RuntimeState* state, PartitionedAggSourceOperatorX* source_operator,
+            std::shared_ptr<MockPartitionedAggSharedState>& shared_state);
+
+    // Creates a sink local state for `sink_operator`; `shared_state` is an
+    // in/out reference to the shared state used with it.
+    PartitionedAggSinkLocalState* create_sink_local_state(
+            RuntimeState* state, PartitionedAggSinkOperatorX* sink_operator,
+            std::shared_ptr<MockPartitionedAggSharedState>& shared_state);
+};
+} // namespace doris::pipeline
\ No newline at end of file
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
index 67378fe66a8..9f2c3b48790 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp
@@ -22,11 +22,9 @@
 #include <memory>
 
 #include "common/config.h"
-#include "olap/olap_define.h"
 #include "partitioned_hash_join_test_helper.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -34,7 +32,6 @@
 #include "testutil/creators.h"
 #include "testutil/mock/mock_operators.h"
 #include "testutil/mock/mock_runtime_state.h"
-#include "util/debug_points.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_number.h"
@@ -426,13 +423,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverProbeBlocksFromDiskError) {
         ASSERT_TRUE(spilling_stream->spill_eof().ok());
     }
 
-    Status spill_status;
-    ExecEnv::GetInstance()->_fragment_mgr =
-            new MockFragmentManager(spill_status, ExecEnv::GetInstance());
-
-    const auto enable_debug_points = config::enable_debug_points;
-    config::enable_debug_points = true;
-    
DebugPoints::instance()->add("fault_inject::spill_stream::read_next_block");
+    SpillableDebugPointHelper 
dp_helper("fault_inject::spill_stream::read_next_block");
     bool has_data = false;
     ASSERT_TRUE(local_state
                         
->recover_probe_blocks_from_disk(_helper.runtime_state.get(),
@@ -446,12 +437,10 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverProbeBlocksFromDiskError) {
     
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream);
     spilling_stream.reset();
 
-    config::enable_debug_points = enable_debug_points;
-
-    ASSERT_FALSE(spill_status.ok());
-    ASSERT_TRUE(spill_status.to_string().find("fault_inject spill_stream 
read_next_block") !=
-                std::string::npos)
-            << "unexpected error: " << spill_status.to_string();
+    ASSERT_FALSE(dp_helper.get_spill_status().ok());
+    ASSERT_TRUE(dp_helper.get_spill_status().to_string().find(
+                        "fault_inject spill_stream read_next_block") != 
std::string::npos)
+            << "unexpected error: " << 
dp_helper.get_spill_status().to_string();
 }
 
 TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDisk) {
@@ -784,14 +773,9 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskError) {
 
     ASSERT_TRUE(local_state->_recovered_build_block == nullptr);
 
-    Status spill_status;
-    ExecEnv::GetInstance()->_fragment_mgr =
-            new MockFragmentManager(spill_status, ExecEnv::GetInstance());
-
-    const auto enable_debug_points = config::enable_debug_points;
-    config::enable_debug_points = true;
     // Test error handling with fault injection
-    
DebugPoints::instance()->add("fault_inject::partitioned_hash_join_probe::recover_build_blocks");
+    SpillableDebugPointHelper dp_helper(
+            "fault_inject::partitioned_hash_join_probe::recover_build_blocks");
     bool has_data = false;
     auto status = 
local_state->recover_build_blocks_from_disk(_helper.runtime_state.get(),
                                                               test_partition, 
has_data);
@@ -801,7 +785,7 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, 
RecoverBuildBlocksFromDiskError) {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     }
 
-    config::enable_debug_points = enable_debug_points;
+    auto spill_status = dp_helper.get_spill_status();
     ASSERT_FALSE(spill_status.ok());
     ASSERT_TRUE(spill_status.to_string().find("fault_inject 
partitioned_hash_join_probe "
                                               "recover_build_blocks failed") 
!= std::string::npos)
diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
index 0514d481d3a..29cb355b0a0 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp
@@ -26,33 +26,25 @@
 #include <gtest/gtest.h>
 
 #include <memory>
-#include <sstream>
 #include <vector>
 
 #include "common/config.h"
-#include "common/exception.h"
 #include "partitioned_hash_join_test_helper.h"
 #include "pipeline/common/data_gen_functions/vnumbers_tvf.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/pipeline_task.h"
-#include "runtime/define_primitive_type.h"
-#include "runtime/descriptor_helper.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "testutil/column_helper.h"
-#include "testutil/mock/mock_data_stream_sender.h"
-#include "testutil/mock/mock_descriptors.h"
 #include "testutil/mock/mock_operators.h"
 #include "testutil/mock/mock_runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
-#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vexpr_fwd.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp 
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
index 03f9b3f8ae0..001bcd8e224 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp
@@ -26,57 +26,19 @@
 #include <gtest/gtest.h>
 
 #include <memory>
-#include <sstream>
 #include <vector>
 
-namespace doris::pipeline {
-void PartitionedHashJoinTestHelper::SetUp() {
-    runtime_state = std::make_unique<MockRuntimeState>();
-    obj_pool = std::make_unique<ObjectPool>();
-
-    runtime_profile = std::make_shared<RuntimeProfile>("test");
-
-    query_ctx = generate_one_query();
-
-    runtime_state->_query_ctx = query_ctx.get();
-    runtime_state->_query_id = query_ctx->query_id();
-    runtime_state->resize_op_id_to_local_state(-100);
-
-    ADD_TIMER(runtime_profile.get(), "ExecTime");
-    runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 
0);
-
-    auto desc_table = create_test_table_descriptor();
-    auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
-    DCHECK(!desc_table.slotDescriptors.empty());
-    EXPECT_TRUE(st.ok()) << "create descriptor table failed: " << 
st.to_string();
-    runtime_state->set_desc_tbl(desc_tbl);
-
-    auto spill_data_dir = 
std::make_unique<vectorized::SpillDataDir>("/tmp/partitioned_join_test",
-                                                                     1024L * 
1024 * 4);
-    st = 
io::global_local_filesystem()->create_directory(spill_data_dir->path(), false);
-    EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path()
-                         << " failed: " << st.to_string();
-    std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> 
data_map;
-    data_map.emplace("test", std::move(spill_data_dir));
-    auto* spill_stream_manager = new 
vectorized::SpillStreamManager(std::move(data_map));
-    ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager;
-    st = spill_stream_manager->init();
-    EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " << 
st.to_string();
-}
-
-void PartitionedHashJoinTestHelper::TearDown() {
-    
ExecEnv::GetInstance()->spill_stream_mgr()->async_cleanup_query(runtime_state->query_id());
-    
doris::ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->wait();
-    doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop();
-    SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr);
-}
+#include "testutil/creators.h"
+#include "testutil/mock/mock_operators.h"
 
+namespace doris::pipeline {
 TPlanNode PartitionedHashJoinTestHelper::create_test_plan_node() {
     TPlanNode tnode;
     tnode.node_id = 0;
     tnode.node_type = TPlanNodeType::HASH_JOIN_NODE;
     tnode.num_children = 2;
     tnode.hash_join_node.join_op = TJoinOp::INNER_JOIN;
+    tnode.limit = -1;
 
     TEqJoinCondition eq_cond;
     eq_cond.left = TExpr();
@@ -118,6 +80,32 @@ TPlanNode 
PartitionedHashJoinTestHelper::create_test_plan_node() {
     return tnode;
 }
 
+// Builds a descriptor table with two tuples, each holding a single INT slot at
+// column position 0: "col1" in the first tuple and "col2" in the second.
+// `nullable` is applied to both slots.
+TDescriptorTable PartitionedHashJoinTestHelper::create_test_table_descriptor(
+        bool nullable = false) {
+    // Shared recipe for the two slots; only the column name differs.
+    auto make_int_slot = [nullable](const char* column) {
+        return TSlotDescriptorBuilder()
+                .type(PrimitiveType::TYPE_INT)
+                .column_name(column)
+                .column_pos(0)
+                .nullable(nullable)
+                .build();
+    };
+
+    TTupleDescriptorBuilder first_tuple;
+    first_tuple.add_slot(make_int_slot("col1"));
+
+    TDescriptorTableBuilder table_builder;
+    first_tuple.build(&table_builder);
+
+    TTupleDescriptorBuilder second_tuple;
+    second_tuple.add_slot(make_int_slot("col2"));
+    second_tuple.build(&table_builder);
+
+    return table_builder.desc_tbl();
+}
+
 std::tuple<std::shared_ptr<PartitionedHashJoinProbeOperatorX>,
            std::shared_ptr<PartitionedHashJoinSinkOperatorX>>
 PartitionedHashJoinTestHelper::create_operators() {
@@ -134,8 +122,8 @@ PartitionedHashJoinTestHelper::create_operators() {
     auto child_operator = std::make_shared<MockChildOperator>();
     auto probe_side_source_operator = std::make_shared<MockChildOperator>();
     auto probe_side_sink_operator = std::make_shared<MockSinkOperator>();
-    auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator, 
child_operator,
-                                                           
probe_side_sink_operator, sink_operator);
+    auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator, 
probe_side_sink_operator,
+                                                           sink_operator, 
child_operator);
 
     RowDescriptor row_desc(runtime_state->desc_tbl(), {1}, {false});
     child_operator->_row_descriptor = row_desc;
diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h 
b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
index 95865aea21e..0628a81688d 100644
--- a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
+++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h
@@ -24,75 +24,22 @@
 #include <gtest/gtest.h>
 
 #include <memory>
-#include <sstream>
 #include <vector>
 
 #include "common/config.h"
 #include "common/object_pool.h"
-#include "olap/olap_define.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
-#include "testutil/column_helper.h"
-#include "testutil/creators.h"
-#include "testutil/mock/mock_operators.h"
+#include "spillable_operator_test_helper.h"
 #include "testutil/mock/mock_runtime_state.h"
-#include "util/debug_points.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
-#include "vec/data_types/data_type_number.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
-
-class MockPartitioner : public vectorized::PartitionerBase {
-public:
-    MockPartitioner(size_t partition_count) : PartitionerBase(partition_count) 
{}
-    Status init(const std::vector<TExpr>& texprs) override { return 
Status::OK(); }
-
-    Status prepare(RuntimeState* state, const RowDescriptor& row_desc) 
override {
-        return Status::OK();
-    }
-
-    Status open(RuntimeState* state) override { return Status::OK(); }
-
-    Status close(RuntimeState* state) override { return Status::OK(); }
-
-    Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool 
eos,
-                           bool* already_sent) const override {
-        if (already_sent) {
-            *already_sent = false;
-        }
-        return Status::OK();
-    }
-
-    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& 
partitioner) override {
-        partitioner = std::make_unique<MockPartitioner>(_partition_count);
-        return Status::OK();
-    }
-
-    vectorized::ChannelField get_channel_ids() const override { return {}; }
-};
-
-class MockExpr : public vectorized::VExpr {
-public:
-    Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
-                   vectorized::VExprContext* context) override {
-        return Status::OK();
-    }
-
-    Status open(RuntimeState* state, vectorized::VExprContext* context,
-                FunctionContext::FunctionStateScope scope) override {
-        return Status::OK();
-    }
-};
-
-class MockHashJoinBuildSharedState : public HashJoinSharedState {
-public:
-};
-
 class MockPartitionedHashJoinSharedState : public 
PartitionedHashJoinSharedState {
 public:
     MockPartitionedHashJoinSharedState() {
@@ -164,16 +111,6 @@ public:
     std::string get_memory_usage_debug_str(RuntimeState* state) const override 
{ return "mock"; }
 };
 
-class MockFragmentManager : public FragmentMgr {
-public:
-    MockFragmentManager(Status& status_, ExecEnv* exec_env)
-            : FragmentMgr(exec_env), status(status_) {}
-    void cancel_query(const TUniqueId query_id, const Status reason) override 
{ status = reason; }
-
-private:
-    Status& status;
-};
-
 class MockHashJoinProbeLocalState : public HashJoinProbeLocalState {
     ENABLE_FACTORY_CREATOR(MockHashJoinProbeLocalState);
 
@@ -262,12 +199,12 @@ public:
     void update_profile_from_inner() override {}
 };
 
-class PartitionedHashJoinTestHelper {
+class PartitionedHashJoinTestHelper : public SpillableOperatorTestHelper {
 public:
-    void SetUp();
-    void TearDown();
+    ~PartitionedHashJoinTestHelper() override = default;
+    TPlanNode create_test_plan_node() override;
 
-    TPlanNode create_test_plan_node();
+    TDescriptorTable create_test_table_descriptor(bool nullable) override;
 
     PartitionedHashJoinProbeLocalState* create_probe_local_state(
             RuntimeState* state, PartitionedHashJoinProbeOperatorX* 
probe_operator,
@@ -280,13 +217,5 @@ public:
     std::tuple<std::shared_ptr<PartitionedHashJoinProbeOperatorX>,
                std::shared_ptr<PartitionedHashJoinSinkOperatorX>>
     create_operators();
-
-    std::unique_ptr<MockRuntimeState> runtime_state;
-    std::unique_ptr<ObjectPool> obj_pool;
-    std::shared_ptr<QueryContext> query_ctx;
-    std::shared_ptr<RuntimeProfile> runtime_profile;
-    std::shared_ptr<PipelineTask> pipeline_task;
-    DescriptorTbl* desc_tbl;
-    static constexpr uint32_t TEST_PARTITION_COUNT = 8;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.cpp 
b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
new file mode 100644
index 00000000000..b3ebba37aa1
--- /dev/null
+++ b/be/test/pipeline/operator/spillable_operator_test_helper.cpp
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "spillable_operator_test_helper.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "testutil/creators.h"
+
+namespace doris::pipeline {
+// Prepares everything a spillable-operator test needs: a mock runtime state
+// bound to a generated query context, a runtime profile, the descriptor table
+// supplied by the concrete helper, and a SpillStreamManager installed into
+// ExecEnv that spills under /tmp/partitioned_join_test.
+void SpillableOperatorTestHelper::SetUp() {
+    runtime_state = std::make_unique<MockRuntimeState>();
+    obj_pool = std::make_unique<ObjectPool>();
+    runtime_profile = std::make_shared<RuntimeProfile>("test");
+    query_ctx = generate_one_query();
+
+    // Bind the runtime state to the generated query.
+    runtime_state->_query_ctx = query_ctx.get();
+    runtime_state->_query_id = query_ctx->query_id();
+    runtime_state->resize_op_id_to_local_state(-100);
+    runtime_state->set_max_operator_id(-100);
+
+    ADD_TIMER(runtime_profile.get(), "ExecTime");
+    runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0);
+
+    // Descriptor table comes from the concrete helper (non-nullable variant).
+    auto thrift_desc_tbl = create_test_table_descriptor(false);
+    auto status = DescriptorTbl::create(obj_pool.get(), thrift_desc_tbl, &desc_tbl);
+    DCHECK(!thrift_desc_tbl.slotDescriptors.empty());
+    EXPECT_TRUE(status.ok()) << "create descriptor table failed: " << status.to_string();
+    runtime_state->set_desc_tbl(desc_tbl);
+
+    // Spill directory on the local filesystem.
+    auto data_dir = std::make_unique<vectorized::SpillDataDir>("/tmp/partitioned_join_test",
+                                                               1024L * 1024 * 4);
+    status = io::global_local_filesystem()->create_directory(data_dir->path(), false);
+    EXPECT_TRUE(status.ok()) << "create directory: " << data_dir->path()
+                             << " failed: " << status.to_string();
+
+    // Install a spill stream manager that knows only about that directory.
+    std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> dirs_by_name;
+    dirs_by_name.emplace("test", std::move(data_dir));
+    auto* spill_mgr = new vectorized::SpillStreamManager(std::move(dirs_by_name));
+    ExecEnv::GetInstance()->_spill_stream_mgr = spill_mgr;
+    status = spill_mgr->init();
+    EXPECT_TRUE(status.ok()) << "init spill stream manager failed: " << status.to_string();
+}
+
+// Undoes SetUp(): schedules cleanup of this query's spill data, waits for the
+// spill IO thread pool, stops the spill stream manager and releases it from
+// ExecEnv via SAFE_DELETE.
+void SpillableOperatorTestHelper::TearDown() {
+    auto* exec_env = ExecEnv::GetInstance();
+    auto* spill_mgr = exec_env->spill_stream_mgr();
+
+    spill_mgr->async_cleanup_query(runtime_state->query_id());
+    spill_mgr->get_spill_io_thread_pool()->wait();
+    spill_mgr->stop();
+
+    SAFE_DELETE(exec_env->_spill_stream_mgr);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/spillable_operator_test_helper.h 
b/be/test/pipeline/operator/spillable_operator_test_helper.h
new file mode 100644
index 00000000000..2067412ed3c
--- /dev/null
+++ b/be/test/pipeline/operator/spillable_operator_test_helper.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+
+// Partitioner stub for tests: every lifecycle call succeeds without doing any
+// work, and do_partitioning() only clears `*already_sent` when it is provided.
+class MockPartitioner : public vectorized::PartitionerBase {
+public:
+    MockPartitioner(size_t partition_count) : PartitionerBase(partition_count) {}
+
+    // --- lifecycle: all no-ops ---
+    Status init(const std::vector<TExpr>& texprs) override {
+        return Status::OK();
+    }
+    Status prepare(RuntimeState* state, const RowDescriptor& row_desc) override {
+        return Status::OK();
+    }
+    Status open(RuntimeState* state) override {
+        return Status::OK();
+    }
+    Status close(RuntimeState* state) override {
+        return Status::OK();
+    }
+
+    // Does not touch `block`; reports "not sent" through the optional out-param.
+    Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool eos,
+                           bool* already_sent) const override {
+        if (already_sent != nullptr) {
+            *already_sent = false;
+        }
+        return Status::OK();
+    }
+
+    // Produces another MockPartitioner with the same partition count.
+    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override {
+        partitioner = std::make_unique<MockPartitioner>(_partition_count);
+        return Status::OK();
+    }
+
+    // Returns a value-initialized ChannelField.
+    vectorized::ChannelField get_channel_ids() const override {
+        return {};
+    }
+};
+
+// Expression stub for tests: prepare() and open() succeed without doing any work.
+class MockExpr : public vectorized::VExpr {
+public:
+    Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
+                   vectorized::VExprContext* context) override {
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state, vectorized::VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        return Status::OK();
+    }
+};
+
+// FragmentMgr stub whose cancel_query() does nothing except copy the cancel
+// reason into a caller-owned Status. The referenced Status must outlive this
+// object, because only a reference to it is stored.
+class MockFragmentManager : public FragmentMgr {
+public:
+    MockFragmentManager(Status& status_, ExecEnv* exec_env)
+            : FragmentMgr(exec_env), _reason_out(status_) {}
+
+    // Records `reason`; `query_id` is ignored.
+    void cancel_query(const TUniqueId query_id, const Status reason) override {
+        _reason_out = reason;
+    }
+
+private:
+    Status& _reason_out;
+};
+
+// Scoped fault-injection guard for spill tests.
+//
+// On construction it
+//   * remembers the current `config::enable_debug_points` value and forces it to true,
+//   * swaps ExecEnv's fragment manager for a MockFragmentManager that records
+//     the cancel reason into `_spill_status`,
+//   * registers the debug point `name`.
+//
+// On destruction it removes that debug point again, restores the config flag,
+// stops and deletes the mock fragment manager and puts the original fragment
+// manager back. Removing the debug point keeps the injected fault from leaking
+// into tests that run afterwards.
+//
+// The guard owns process-global state, so it is neither copyable nor assignable:
+// a copy would delete the mock fragment manager twice.
+class SpillableDebugPointHelper {
+public:
+    // `name` is the debug point to activate for the lifetime of this object.
+    SpillableDebugPointHelper(const std::string& name)
+            : _enable_debug_points(config::enable_debug_points),
+              _fragment_mgr(ExecEnv::GetInstance()->_fragment_mgr),
+              _name(name) {
+        config::enable_debug_points = true;
+        ExecEnv::GetInstance()->_fragment_mgr =
+                new MockFragmentManager(_spill_status, ExecEnv::GetInstance());
+        DebugPoints::instance()->add(_name);
+    }
+
+    ~SpillableDebugPointHelper() {
+        // Undo the fault injection first so nothing fires during teardown.
+        DebugPoints::instance()->remove(_name);
+        config::enable_debug_points = _enable_debug_points;
+
+        // The mock still references `_spill_status`, so it has to go before
+        // the members of this object are destroyed.
+        ExecEnv::GetInstance()->_fragment_mgr->stop();
+        SAFE_DELETE(ExecEnv::GetInstance()->_fragment_mgr);
+        ExecEnv::GetInstance()->_fragment_mgr = _fragment_mgr;
+    }
+
+    SpillableDebugPointHelper(const SpillableDebugPointHelper&) = delete;
+    SpillableDebugPointHelper& operator=(const SpillableDebugPointHelper&) = delete;
+
+    // Status passed to the mock fragment manager's cancel_query(), or the
+    // default-constructed Status if it was never called.
+    const Status& get_spill_status() const { return _spill_status; }
+
+private:
+    Status _spill_status;
+    const bool _enable_debug_points;   // value to restore on destruction
+    FragmentMgr* const _fragment_mgr;  // original manager to restore on destruction
+    const std::string _name;           // debug point registered by this guard
+};
+
+// Common base for test helpers of spillable operators. SetUp()/TearDown()
+// build and destroy the shared environment; concrete helpers provide the plan
+// node and the descriptor table for the operator under test.
+class SpillableOperatorTestHelper {
+public:
+    virtual ~SpillableOperatorTestHelper() = default;
+
+    // Creates the runtime state, profile, query context, descriptor table and
+    // spill stream manager used by the tests.
+    void SetUp();
+    // Cleans up spill data and releases the spill stream manager.
+    void TearDown();
+
+    // Plan node describing the operator under test.
+    virtual TPlanNode create_test_plan_node() = 0;
+    // Descriptor table for the test; `nullable` controls slot nullability.
+    virtual TDescriptorTable create_test_table_descriptor(bool nullable) = 0;
+
+    std::unique_ptr<MockRuntimeState> runtime_state;
+    std::unique_ptr<ObjectPool> obj_pool;
+    std::shared_ptr<QueryContext> query_ctx;
+    std::shared_ptr<RuntimeProfile> runtime_profile;
+    std::shared_ptr<PipelineTask> pipeline_task;
+    // Filled in by SetUp(); null until then so an early read is detectable
+    // instead of being an uninitialized pointer.
+    DescriptorTbl* desc_tbl = nullptr;
+    static constexpr uint32_t TEST_PARTITION_COUNT = 8;
+};
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/testutil/creators.h b/be/test/testutil/creators.h
index 91064ade29e..db0ee465b60 100644
--- a/be/test/testutil/creators.h
+++ b/be/test/testutil/creators.h
@@ -55,35 +55,10 @@ inline std::shared_ptr<QueryContext> generate_one_query() {
     return generate_one_query(query_options);
 }
 
-inline TDescriptorTable create_test_table_descriptor(bool nullable = false) {
-    TTupleDescriptorBuilder tuple_builder;
-    tuple_builder.add_slot(TSlotDescriptorBuilder()
-                                   .type(PrimitiveType::TYPE_INT)
-                                   .column_name("col1")
-                                   .column_pos(0)
-                                   .nullable(nullable)
-                                   .build());
-
-    TDescriptorTableBuilder builder;
-
-    tuple_builder.build(&builder);
-
-    TTupleDescriptorBuilder()
-            .add_slot(TSlotDescriptorBuilder()
-                              .type(TYPE_INT)
-                              .column_name("col2")
-                              .column_pos(0)
-                              .nullable(nullable)
-                              .build())
-            .build(&builder);
-
-    return builder.desc_tbl();
-}
-
 inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr> 
generate_hash_join_pipeline(
         std::shared_ptr<OperatorXBase> probe_operator,
-        std::shared_ptr<OperatorXBase> build_side_source,
-        pipeline::DataSinkOperatorPtr probe_side_sink_operator, 
DataSinkOperatorPtr sink_operator) {
+        pipeline::DataSinkOperatorPtr probe_side_sink_operator, 
DataSinkOperatorPtr sink_operator,
+        std::shared_ptr<OperatorXBase> build_side_source) {
     auto probe_pipeline = std::make_shared<pipeline::Pipeline>(0, 1, 1);
     auto build_pipeline = std::make_shared<pipeline::Pipeline>(1, 1, 1);
 
@@ -95,6 +70,21 @@ inline std::pair<pipeline::PipelinePtr, 
pipeline::PipelinePtr> generate_hash_joi
     return {probe_pipeline, build_pipeline};
 }
 
+// Builds the two pipelines used by aggregation tests and returns them as
+// {source pipeline, sink pipeline}:
+//   * pipeline 0: `source_operator` feeding `source_side_sink_operator`
+//   * pipeline 1: `sink_side_source` feeding `sink_operator`
+// The statuses returned while wiring the pipelines are deliberately discarded.
+inline std::pair<pipeline::PipelinePtr, pipeline::PipelinePtr> generate_agg_pipeline(
+        std::shared_ptr<OperatorXBase> source_operator,
+        pipeline::DataSinkOperatorPtr source_side_sink_operator, DataSinkOperatorPtr sink_operator,
+        std::shared_ptr<OperatorXBase> sink_side_source) {
+    auto read_side = std::make_shared<pipeline::Pipeline>(0, 1, 1);
+    auto write_side = std::make_shared<pipeline::Pipeline>(1, 1, 1);
+
+    // Pipeline that reads the aggregation result.
+    static_cast<void>(read_side->add_operator(source_operator, 1));
+    static_cast<void>(read_side->set_sink(source_side_sink_operator));
+
+    // Pipeline that feeds the aggregation sink.
+    static_cast<void>(write_side->add_operator(sink_side_source, 1));
+    static_cast<void>(write_side->set_sink(sink_operator));
+
+    return {read_side, write_side};
+}
+
 inline std::unique_ptr<SpillPartitionerType> create_spill_partitioner(
         RuntimeState* state, const int32_t partition_count, const 
std::vector<TExpr>& exprs,
         const RowDescriptor& row_desc) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to