This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab69416922 [Bug](pipelineX) fix streaming agg (#24449)
ab69416922 is described below

commit ab69416922464da09c05a7ae7044cfc6a0b1716a
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Sep 15 19:22:54 2023 +0800

    [Bug](pipelineX) fix streaming agg (#24449)
    
    fix streaming agg
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |   2 +-
 .../pipeline/exec/aggregation_source_operator.cpp  | 264 ++++++++++-----------
 be/src/pipeline/exec/aggregation_source_operator.h |  20 +-
 be/src/pipeline/exec/data_queue.cpp                |   2 +-
 be/src/pipeline/exec/data_queue.h                  |   6 +-
 be/src/pipeline/exec/scan_operator.cpp             |   4 +-
 .../exec/streaming_aggregation_sink_operator.h     |   4 +-
 .../exec/streaming_aggregation_source_operator.cpp |  24 +-
 .../exec/streaming_aggregation_source_operator.h   |  30 +--
 be/src/pipeline/pipeline_x/dependency.h            |  22 +-
 be/src/pipeline/pipeline_x/operator.cpp            |   4 +-
 be/src/pipeline/pipeline_x/operator.h              |   1 -
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql       |   2 +-
 .../ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql       |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql     |   2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql     |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql  |   2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql  |   2 +-
 60 files changed, 192 insertions(+), 287 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index b30bb9b815..cc0eb3be1d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -947,6 +947,6 @@ class StreamingAggSinkLocalState;
 template class AggSinkOperatorX<BlockingAggSinkLocalState>;
 template class AggSinkOperatorX<StreamingAggSinkLocalState>;
 template class AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>;
-template class AggSinkLocalState<StreamingAggDependency, 
StreamingAggSinkLocalState>;
+template class AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 476683f8b8..16125c424a 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -28,8 +28,7 @@ namespace pipeline {
 
 OPERATOR_CODE_GENERATOR(AggSourceOperator, SourceOperator)
 
-template <typename DependencyType, typename Derived>
-AggLocalState<DependencyType, Derived>::AggLocalState(RuntimeState* state, 
OperatorXBase* parent)
+AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
         : Base(state, parent),
           _get_results_timer(nullptr),
           _serialize_result_timer(nullptr),
@@ -38,49 +37,47 @@ AggLocalState<DependencyType, 
Derived>::AggLocalState(RuntimeState* state, Opera
           _serialize_data_timer(nullptr),
           _hash_table_size_counter(nullptr) {}
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::init(RuntimeState* state, 
LocalStateInfo& info) {
+Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(Base::profile()->total_time_counter());
-    _agg_data = Base::_shared_state->agg_data.get();
+    _agg_data = _shared_state->agg_data.get();
     _get_results_timer = ADD_TIMER(Base::profile(), "GetResultsTime");
     _serialize_result_timer = ADD_TIMER(Base::profile(), 
"SerializeResultTime");
     _hash_table_iterate_timer = ADD_TIMER(Base::profile(), 
"HashTableIterateTime");
     _insert_keys_to_column_timer = ADD_TIMER(Base::profile(), 
"InsertKeysToColumnTime");
     _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime");
     _hash_table_size_counter = ADD_COUNTER(Base::profile(), "HashTableSize", 
TUnit::UNIT);
-    auto& p = Base::_parent->template cast<typename Derived::Parent>();
+    auto& p = _parent->template cast<AggSourceOperatorX>();
     if (p._without_key) {
         if (p._needs_finalize) {
-            _executor.get_result = 
std::bind<Status>(&Derived::_get_without_key_result,
-                                                     (Derived*)this, 
std::placeholders::_1,
-                                                     std::placeholders::_2, 
std::placeholders::_3);
+            _executor.get_result = 
std::bind<Status>(&AggLocalState::_get_without_key_result, this,
+                                                     std::placeholders::_1, 
std::placeholders::_2,
+                                                     std::placeholders::_3);
         } else {
-            _executor.get_result = 
std::bind<Status>(&Derived::_serialize_without_key,
-                                                     (Derived*)this, 
std::placeholders::_1,
-                                                     std::placeholders::_2, 
std::placeholders::_3);
+            _executor.get_result = 
std::bind<Status>(&AggLocalState::_serialize_without_key, this,
+                                                     std::placeholders::_1, 
std::placeholders::_2,
+                                                     std::placeholders::_3);
         }
 
-        _executor.close = std::bind<void>(&Derived::_close_without_key, 
(Derived*)this);
+        _executor.close = std::bind<void>(&AggLocalState::_close_without_key, 
this);
     } else {
         if (p._needs_finalize) {
-            _executor.get_result = 
std::bind<Status>(&Derived::_get_with_serialized_key_result,
-                                                     (Derived*)this, 
std::placeholders::_1,
-                                                     std::placeholders::_2, 
std::placeholders::_3);
+            _executor.get_result = std::bind<Status>(
+                    &AggLocalState::_get_with_serialized_key_result, this, 
std::placeholders::_1,
+                    std::placeholders::_2, std::placeholders::_3);
         } else {
             _executor.get_result = std::bind<Status>(
-                    &Derived::_serialize_with_serialized_key_result, 
(Derived*)this,
+                    &AggLocalState::_serialize_with_serialized_key_result, 
this,
                     std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3);
         }
-        _executor.close = 
std::bind<void>(&Derived::_close_with_serialized_key, (Derived*)this);
+        _executor.close = 
std::bind<void>(&AggLocalState::_close_with_serialized_key, this);
     }
 
     _agg_data_created_without_key = p._without_key;
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-void AggLocalState<DependencyType, Derived>::_close_with_serialized_key() {
+void AggLocalState::_close_with_serialized_key() {
     std::visit(
             [&](auto&& agg_method) -> void {
                 auto& data = agg_method.data;
@@ -98,8 +95,7 @@ void AggLocalState<DependencyType, 
Derived>::_close_with_serialized_key() {
     Base::_dependency->release_tracker();
 }
 
-template <typename DependencyType, typename Derived>
-void AggLocalState<DependencyType, Derived>::_close_without_key() {
+void AggLocalState::_close_without_key() {
     //because prepare maybe failed, and couldn't create agg data.
     //but finally call close to destory agg data, if agg data has bitmapValue
     //will be core dump, it's not initialized
@@ -110,39 +106,37 @@ void AggLocalState<DependencyType, 
Derived>::_close_without_key() {
     Base::_dependency->release_tracker();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_result(
-        RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) {
-    if (Base::_shared_state->spill_context.has_data) {
+Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* 
state,
+                                                            vectorized::Block* 
block,
+                                                            SourceState& 
source_state) {
+    if (_shared_state->spill_context.has_data) {
         return _serialize_with_serialized_key_result_with_spilt_data(state, 
block, source_state);
     } else {
         return _serialize_with_serialized_key_result_non_spill(state, block, 
source_state);
     }
 }
 
-template <typename DependencyType, typename Derived>
-Status
-AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_result_with_spilt_data(
+Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data(
         RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) {
-    CHECK(!Base::_shared_state->spill_context.stream_ids.empty());
-    CHECK(Base::_shared_state->spill_partition_helper != nullptr)
+    CHECK(!_shared_state->spill_context.stream_ids.empty());
+    CHECK(_shared_state->spill_partition_helper != nullptr)
             << "_spill_partition_helper should not be null";
-    Base::_shared_state->aggregate_data_container->init_once();
-    while (Base::_shared_state->aggregate_data_container->iterator ==
-           Base::_shared_state->aggregate_data_container->end()) {
-        if (Base::_shared_state->spill_context.read_cursor ==
-            Base::_shared_state->spill_partition_helper->partition_count) {
+    _shared_state->aggregate_data_container->init_once();
+    while (_shared_state->aggregate_data_container->iterator ==
+           _shared_state->aggregate_data_container->end()) {
+        if (_shared_state->spill_context.read_cursor ==
+            _shared_state->spill_partition_helper->partition_count) {
             break;
         }
         RETURN_IF_ERROR(Base::_dependency->reset_hash_table());
         RETURN_IF_ERROR(Base::_dependency->merge_spilt_data());
-        Base::_shared_state->aggregate_data_container->init_once();
+        _shared_state->aggregate_data_container->init_once();
     }
 
     RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, 
block, source_state));
     if (source_state == SourceState::FINISHED) {
-        source_state = Base::_shared_state->spill_context.read_cursor ==
-                                       
Base::_shared_state->spill_partition_helper->partition_count
+        source_state = _shared_state->spill_context.read_cursor ==
+                                       
_shared_state->spill_partition_helper->partition_count
                                ? SourceState::FINISHED
                                : SourceState::DEPEND_ON_SOURCE;
     }
@@ -150,12 +144,12 @@ AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_result_wi
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_result_non_spill(
-        RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) {
+Status 
AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeState* 
state,
+                                                                      
vectorized::Block* block,
+                                                                      
SourceState& source_state) {
     SCOPED_TIMER(_serialize_result_timer);
-    int key_size = Base::_shared_state->probe_expr_ctxs.size();
-    int agg_size = Base::_shared_state->aggregate_evaluators.size();
+    int key_size = _shared_state->probe_expr_ctxs.size();
+    int agg_size = _shared_state->aggregate_evaluators.size();
     vectorized::MutableColumns value_columns(agg_size);
     vectorized::DataTypes value_data_types(agg_size);
 
@@ -168,7 +162,7 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
             
key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate());
         } else {
             key_columns.emplace_back(
-                    
Base::_shared_state->probe_expr_ctxs[i]->root()->data_type()->create_column());
+                    
_shared_state->probe_expr_ctxs[i]->root()->data_type()->create_column());
         }
     }
 
@@ -180,20 +174,20 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
                 const auto size = std::min(data.size(), 
size_t(state->batch_size()));
                 using KeyType = 
std::decay_t<decltype(agg_method.iterator->get_first())>;
                 std::vector<KeyType> keys(size);
-                if (Base::_shared_state->values.size() < size + 1) {
-                    Base::_shared_state->values.resize(size + 1);
+                if (_shared_state->values.size() < size + 1) {
+                    _shared_state->values.resize(size + 1);
                 }
 
                 size_t num_rows = 0;
-                Base::_shared_state->aggregate_data_container->init_once();
-                auto& iter = 
Base::_shared_state->aggregate_data_container->iterator;
+                _shared_state->aggregate_data_container->init_once();
+                auto& iter = _shared_state->aggregate_data_container->iterator;
 
                 {
                     SCOPED_TIMER(_hash_table_iterate_timer);
-                    while (iter != 
Base::_shared_state->aggregate_data_container->end() &&
+                    while (iter != 
_shared_state->aggregate_data_container->end() &&
                            num_rows < state->batch_size()) {
                         keys[num_rows] = iter.template get_key<KeyType>();
-                        Base::_shared_state->values[num_rows] = 
iter.get_aggregate_data();
+                        _shared_state->values[num_rows] = 
iter.get_aggregate_data();
                         ++iter;
                         ++num_rows;
                     }
@@ -202,10 +196,10 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
                 {
                     SCOPED_TIMER(_insert_keys_to_column_timer);
                     agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows,
-                                                        
Base::_shared_state->probe_key_sz);
+                                                        
_shared_state->probe_key_sz);
                 }
 
-                if (iter == 
Base::_shared_state->aggregate_data_container->end()) {
+                if (iter == _shared_state->aggregate_data_container->end()) {
                     if (agg_method.data.has_null_key_data()) {
                         // only one key of group by support wrap null key
                         // here need additional processing logic on the null 
key / value
@@ -213,8 +207,7 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
                         DCHECK(key_columns[0]->is_nullable());
                         if (agg_method.data.has_null_key_data()) {
                             key_columns[0]->insert_data(nullptr, 0);
-                            Base::_shared_state->values[num_rows] =
-                                    agg_method.data.get_null_key_data();
+                            _shared_state->values[num_rows] = 
agg_method.data.get_null_key_data();
                             ++num_rows;
                             source_state = SourceState::FINISHED;
                         }
@@ -225,8 +218,8 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
 
                 {
                     SCOPED_TIMER(_serialize_data_timer);
-                    for (size_t i = 0; i < 
Base::_shared_state->aggregate_evaluators.size(); ++i) {
-                        value_data_types[i] = 
Base::_shared_state->aggregate_evaluators[i]
+                    for (size_t i = 0; i < 
_shared_state->aggregate_evaluators.size(); ++i) {
+                        value_data_types[i] = 
_shared_state->aggregate_evaluators[i]
                                                       ->function()
                                                       ->get_serialized_type();
                         if (mem_reuse) {
@@ -234,16 +227,14 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
                                     std::move(*block->get_by_position(i + 
key_size).column)
                                             .mutate();
                         } else {
-                            value_columns[i] = 
Base::_shared_state->aggregate_evaluators[i]
+                            value_columns[i] = 
_shared_state->aggregate_evaluators[i]
                                                        ->function()
                                                        
->create_serialize_column();
                         }
-                        Base::_shared_state->aggregate_evaluators[i]
-                                ->function()
-                                ->serialize_to_column(
-                                        Base::_shared_state->values,
-                                        
Base::_dependency->offsets_of_aggregate_states()[i],
-                                        value_columns[i], num_rows);
+                        
_shared_state->aggregate_evaluators[i]->function()->serialize_to_column(
+                                _shared_state->values,
+                                
Base::_dependency->offsets_of_aggregate_states()[i],
+                                value_columns[i], num_rows);
                     }
                 }
             },
@@ -254,8 +245,8 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
         for (int i = 0; i < key_size; ++i) {
             columns_with_schema.emplace_back(
                     std::move(key_columns[i]),
-                    
Base::_shared_state->probe_expr_ctxs[i]->root()->data_type(),
-                    
Base::_shared_state->probe_expr_ctxs[i]->root()->expr_name());
+                    _shared_state->probe_expr_ctxs[i]->root()->data_type(),
+                    _shared_state->probe_expr_ctxs[i]->root()->expr_name());
         }
         for (int i = 0; i < agg_size; ++i) {
             columns_with_schema.emplace_back(std::move(value_columns[i]), 
value_data_types[i], "");
@@ -266,38 +257,36 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_with_serialized_key_re
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::_get_with_serialized_key_result(
-        RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) {
-    if (Base::_shared_state->spill_context.has_data) {
+Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, 
vectorized::Block* block,
+                                                      SourceState& 
source_state) {
+    if (_shared_state->spill_context.has_data) {
         return _get_result_with_spilt_data(state, block, source_state);
     } else {
         return _get_result_with_serialized_key_non_spill(state, block, 
source_state);
     }
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::_get_result_with_spilt_data(
-        RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) {
-    CHECK(!Base::_shared_state->spill_context.stream_ids.empty());
-    CHECK(Base::_shared_state->spill_partition_helper != nullptr)
+Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state, 
vectorized::Block* block,
+                                                  SourceState& source_state) {
+    CHECK(!_shared_state->spill_context.stream_ids.empty());
+    CHECK(_shared_state->spill_partition_helper != nullptr)
             << "_spill_partition_helper should not be null";
-    Base::_shared_state->aggregate_data_container->init_once();
-    while (Base::_shared_state->aggregate_data_container->iterator ==
-           Base::_shared_state->aggregate_data_container->end()) {
-        if (Base::_shared_state->spill_context.read_cursor ==
-            Base::_shared_state->spill_partition_helper->partition_count) {
+    _shared_state->aggregate_data_container->init_once();
+    while (_shared_state->aggregate_data_container->iterator ==
+           _shared_state->aggregate_data_container->end()) {
+        if (_shared_state->spill_context.read_cursor ==
+            _shared_state->spill_partition_helper->partition_count) {
             break;
         }
         RETURN_IF_ERROR(Base::_dependency->reset_hash_table());
         RETURN_IF_ERROR(Base::_dependency->merge_spilt_data());
-        Base::_shared_state->aggregate_data_container->init_once();
+        _shared_state->aggregate_data_container->init_once();
     }
 
     RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, 
source_state));
     if (source_state == SourceState::FINISHED) {
-        source_state = Base::_shared_state->spill_context.read_cursor ==
-                                       
Base::_shared_state->spill_partition_helper->partition_count
+        source_state = _shared_state->spill_context.read_cursor ==
+                                       
_shared_state->spill_partition_helper->partition_count
                                ? SourceState::FINISHED
                                : SourceState::DEPEND_ON_SOURCE;
     }
@@ -305,15 +294,15 @@ Status AggLocalState<DependencyType, 
Derived>::_get_result_with_spilt_data(
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, 
Derived>::_get_result_with_serialized_key_non_spill(
-        RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) {
+Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* 
state,
+                                                                
vectorized::Block* block,
+                                                                SourceState& 
source_state) {
     // non-nullable column(id in `_make_nullable_keys`) will be converted to 
nullable.
     bool mem_reuse = Base::_dependency->make_nullable_keys().empty() && 
block->mem_reuse();
 
     auto columns_with_schema = 
vectorized::VectorizedUtils::create_columns_with_type_and_name(
-            Base::_parent->template cast<typename 
Derived::Parent>()._row_descriptor);
-    int key_size = Base::_shared_state->probe_expr_ctxs.size();
+            _parent->cast<AggSourceOperatorX>()._row_descriptor);
+    int key_size = _shared_state->probe_expr_ctxs.size();
 
     vectorized::MutableColumns key_columns;
     for (int i = 0; i < key_size; ++i) {
@@ -340,20 +329,20 @@ Status AggLocalState<DependencyType, 
Derived>::_get_result_with_serialized_key_n
                 const auto size = std::min(data.size(), 
size_t(state->batch_size()));
                 using KeyType = 
std::decay_t<decltype(agg_method.iterator->get_first())>;
                 std::vector<KeyType> keys(size);
-                if (Base::_shared_state->values.size() < size) {
-                    Base::_shared_state->values.resize(size);
+                if (_shared_state->values.size() < size) {
+                    _shared_state->values.resize(size);
                 }
 
                 size_t num_rows = 0;
-                Base::_shared_state->aggregate_data_container->init_once();
-                auto& iter = 
Base::_shared_state->aggregate_data_container->iterator;
+                _shared_state->aggregate_data_container->init_once();
+                auto& iter = _shared_state->aggregate_data_container->iterator;
 
                 {
                     SCOPED_TIMER(_hash_table_iterate_timer);
-                    while (iter != 
Base::_shared_state->aggregate_data_container->end() &&
+                    while (iter != 
_shared_state->aggregate_data_container->end() &&
                            num_rows < state->batch_size()) {
                         keys[num_rows] = iter.template get_key<KeyType>();
-                        Base::_shared_state->values[num_rows] = 
iter.get_aggregate_data();
+                        _shared_state->values[num_rows] = 
iter.get_aggregate_data();
                         ++iter;
                         ++num_rows;
                     }
@@ -362,17 +351,17 @@ Status AggLocalState<DependencyType, 
Derived>::_get_result_with_serialized_key_n
                 {
                     SCOPED_TIMER(_insert_keys_to_column_timer);
                     agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows,
-                                                        
Base::_shared_state->probe_key_sz);
+                                                        
_shared_state->probe_key_sz);
                 }
 
-                for (size_t i = 0; i < 
Base::_shared_state->aggregate_evaluators.size(); ++i) {
-                    
Base::_shared_state->aggregate_evaluators[i]->insert_result_info_vec(
-                            Base::_shared_state->values,
+                for (size_t i = 0; i < 
_shared_state->aggregate_evaluators.size(); ++i) {
+                    
_shared_state->aggregate_evaluators[i]->insert_result_info_vec(
+                            _shared_state->values,
                             
Base::_dependency->offsets_of_aggregate_states()[i],
                             value_columns[i].get(), num_rows);
                 }
 
-                if (iter == 
Base::_shared_state->aggregate_data_container->end()) {
+                if (iter == _shared_state->aggregate_data_container->end()) {
                     if (agg_method.data.has_null_key_data()) {
                         // only one key of group by support wrap null key
                         // here need additional processing logic on the null 
key / value
@@ -381,9 +370,8 @@ Status AggLocalState<DependencyType, 
Derived>::_get_result_with_serialized_key_n
                         if (key_columns[0]->size() < state->batch_size()) {
                             key_columns[0]->insert_data(nullptr, 0);
                             auto mapped = agg_method.data.get_null_key_data();
-                            for (size_t i = 0; i < 
Base::_shared_state->aggregate_evaluators.size();
-                                 ++i)
-                                
Base::_shared_state->aggregate_evaluators[i]->insert_result_info(
+                            for (size_t i = 0; i < 
_shared_state->aggregate_evaluators.size(); ++i)
+                                
_shared_state->aggregate_evaluators[i]->insert_result_info(
                                         mapped +
                                                 
Base::_dependency->offsets_of_aggregate_states()[i],
                                         value_columns[i].get());
@@ -412,42 +400,39 @@ Status AggLocalState<DependencyType, 
Derived>::_get_result_with_serialized_key_n
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, 
Derived>::_serialize_without_key(RuntimeState* state,
-                                                                      
vectorized::Block* block,
-                                                                      
SourceState& source_state) {
+Status AggLocalState::_serialize_without_key(RuntimeState* state, 
vectorized::Block* block,
+                                             SourceState& source_state) {
     // 1. `child(0)->rows_returned() == 0` mean not data from child
     // in level two aggregation node should return NULL result
     //    level one aggregation node set `eos = true` return directly
     SCOPED_TIMER(_serialize_result_timer);
-    if (UNLIKELY(Base::_shared_state->input_num_rows == 0)) {
+    if (UNLIKELY(_shared_state->input_num_rows == 0)) {
         source_state = SourceState::FINISHED;
         return Status::OK();
     }
     block->clear();
 
     DCHECK(_agg_data->without_key != nullptr);
-    int agg_size = Base::_shared_state->aggregate_evaluators.size();
+    int agg_size = _shared_state->aggregate_evaluators.size();
 
     vectorized::MutableColumns value_columns(agg_size);
     std::vector<vectorized::DataTypePtr> data_types(agg_size);
     // will serialize data to string column
-    for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
-        data_types[i] =
-                
Base::_shared_state->aggregate_evaluators[i]->function()->get_serialized_type();
+    for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+        data_types[i] = 
_shared_state->aggregate_evaluators[i]->function()->get_serialized_type();
         value_columns[i] =
-                
Base::_shared_state->aggregate_evaluators[i]->function()->create_serialize_column();
+                
_shared_state->aggregate_evaluators[i]->function()->create_serialize_column();
     }
 
-    for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
-        
Base::_shared_state->aggregate_evaluators[i]->function()->serialize_without_key_to_column(
+    for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+        
_shared_state->aggregate_evaluators[i]->function()->serialize_without_key_to_column(
                 _agg_data->without_key + 
Base::_dependency->offsets_of_aggregate_states()[i],
                 *value_columns[i]);
     }
 
     {
         vectorized::ColumnsWithTypeAndName data_with_schema;
-        for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); 
++i) {
+        for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
             vectorized::ColumnWithTypeAndName column_with_schema = {nullptr, 
data_types[i], ""};
             data_with_schema.push_back(std::move(column_with_schema));
         }
@@ -459,27 +444,25 @@ Status AggLocalState<DependencyType, 
Derived>::_serialize_without_key(RuntimeSta
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, 
Derived>::_get_without_key_result(RuntimeState* state,
-                                                                       
vectorized::Block* block,
-                                                                       
SourceState& source_state) {
+Status AggLocalState::_get_without_key_result(RuntimeState* state, 
vectorized::Block* block,
+                                              SourceState& source_state) {
     DCHECK(_agg_data->without_key != nullptr);
     block->clear();
 
-    auto& p = Base::_parent->template cast<typename Derived::Parent>();
+    auto& p = _parent->cast<AggSourceOperatorX>();
     *block = 
vectorized::VectorizedUtils::create_empty_columnswithtypename(p._row_descriptor);
-    int agg_size = Base::_shared_state->aggregate_evaluators.size();
+    int agg_size = _shared_state->aggregate_evaluators.size();
 
     vectorized::MutableColumns columns(agg_size);
     std::vector<vectorized::DataTypePtr> data_types(agg_size);
-    for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
-        data_types[i] = 
Base::_shared_state->aggregate_evaluators[i]->function()->get_return_type();
+    for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
+        data_types[i] = 
_shared_state->aggregate_evaluators[i]->function()->get_return_type();
         columns[i] = data_types[i]->create_column();
     }
 
-    for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
+    for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) {
         auto column = columns[i].get();
-        Base::_shared_state->aggregate_evaluators[i]->insert_result_info(
+        _shared_state->aggregate_evaluators[i]->insert_result_info(
                 _agg_data->without_key + 
Base::_dependency->offsets_of_aggregate_states()[i],
                 column);
     }
@@ -502,7 +485,7 @@ Status AggLocalState<DependencyType, 
Derived>::_get_without_key_result(RuntimeSt
                 vectorized::ColumnPtr ptr = std::move(columns[i]);
                 // unless `count`, other aggregate function dispose empty set 
should be null
                 // so here check the children row return
-                ptr = make_nullable(ptr, Base::_shared_state->input_num_rows 
== 0);
+                ptr = make_nullable(ptr, _shared_state->input_num_rows == 0);
                 columns[i] = ptr->assume_mutable();
             }
         }
@@ -521,7 +504,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
 
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
                                      SourceState& source_state) {
-    auto& local_state = 
state->get_local_state(id())->cast<BlockingAggLocalState>();
+    auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, 
source_state));
     local_state.make_nullable_output_key(block);
@@ -531,8 +514,7 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
-template <typename DependencyType, typename Derived>
-void AggLocalState<DependencyType, 
Derived>::make_nullable_output_key(vectorized::Block* block) {
+void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
     if (block->rows() != 0) {
         for (auto cid : Base::_dependency->make_nullable_keys()) {
             block->get_by_position(cid).column = 
make_nullable(block->get_by_position(cid).column);
@@ -541,13 +523,12 @@ void AggLocalState<DependencyType, 
Derived>::make_nullable_output_key(vectorized
     }
 }
 
-template <typename DependencyType, typename Derived>
-Status AggLocalState<DependencyType, Derived>::close(RuntimeState* state) {
+Status AggLocalState::close(RuntimeState* state) {
     SCOPED_TIMER(Base::profile()->total_time_counter());
     if (Base::_closed) {
         return Status::OK();
     }
-    for (auto* aggregate_evaluator : 
Base::_shared_state->aggregate_evaluators) {
+    for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) {
         aggregate_evaluator->close(state);
     }
     if (_executor.close) {
@@ -563,26 +544,19 @@ Status AggLocalState<DependencyType, 
Derived>::close(RuntimeState* state) {
                 _agg_data->method_variant);
     }
 
-    Base::_shared_state->agg_data = nullptr;
-    Base::_shared_state->aggregate_data_container = nullptr;
-    Base::_shared_state->agg_arena_pool = nullptr;
-    Base::_shared_state->agg_profile_arena = nullptr;
+    _shared_state->agg_data = nullptr;
+    _shared_state->aggregate_data_container = nullptr;
+    _shared_state->agg_arena_pool = nullptr;
+    _shared_state->agg_profile_arena = nullptr;
 
     std::vector<vectorized::AggregateDataPtr> tmp_values;
-    Base::_shared_state->values.swap(tmp_values);
+    _shared_state->values.swap(tmp_values);
     return Base::close(state);
 }
 
 Dependency* AggSourceOperatorX::wait_for_dependency(RuntimeState* state) {
-    return state->get_local_state(Base::id())
-            ->cast<BlockingAggLocalState>()
-            ._dependency->read_blocked_by();
+    return 
state->get_local_state(Base::id())->cast<AggLocalState>()._dependency->read_blocked_by();
 }
 
-class StreamingAggLocalState;
-
-template class AggLocalState<AggDependency, BlockingAggLocalState>;
-template class AggLocalState<StreamingAggDependency, StreamingAggLocalState>;
-
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index 3148f06792..a840a72f13 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -50,10 +50,9 @@ public:
 
 class AggSourceOperatorX;
 
-template <typename DependencyType, typename Derived>
-class AggLocalState : public PipelineXLocalState<DependencyType> {
+class AggLocalState : public PipelineXLocalState<AggDependency> {
 public:
-    using Base = PipelineXLocalState<DependencyType>;
+    using Base = PipelineXLocalState<AggDependency>;
     ENABLE_FACTORY_CREATOR(AggLocalState);
     AggLocalState(RuntimeState* state, OperatorXBase* parent);
 
@@ -110,19 +109,9 @@ protected:
     bool _agg_data_created_without_key = false;
 };
 
-class BlockingAggLocalState final : public AggLocalState<AggDependency, 
BlockingAggLocalState> {
+class AggSourceOperatorX : public OperatorX<AggLocalState> {
 public:
-    ENABLE_FACTORY_CREATOR(BlockingAggLocalState);
-    using Parent = AggSourceOperatorX;
-
-    BlockingAggLocalState(RuntimeState* state, OperatorXBase* parent)
-            : AggLocalState<AggDependency, BlockingAggLocalState>(state, 
parent) {}
-    ~BlockingAggLocalState() = default;
-};
-
-class AggSourceOperatorX final : public OperatorX<BlockingAggLocalState> {
-public:
-    using Base = OperatorX<BlockingAggLocalState>;
+    using Base = OperatorX<AggLocalState>;
     AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
     ~AggSourceOperatorX() = default;
     Dependency* wait_for_dependency(RuntimeState* state) override;
@@ -133,7 +122,6 @@ public:
     bool is_source() const override { return true; }
 
 private:
-    template <typename DependencyType, typename Derived>
     friend class AggLocalState;
 
     bool _needs_finalize;
diff --git a/be/src/pipeline/exec/data_queue.cpp 
b/be/src/pipeline/exec/data_queue.cpp
index eff91bdd3f..e070fe2c99 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -30,7 +30,7 @@
 namespace doris {
 namespace pipeline {
 
-DataQueue::DataQueue(int child_count, StreamingAggDependency* agg_dependency,
+DataQueue::DataQueue(int child_count, AggDependency* agg_dependency,
                      UnionDependency* union_dependency)
         : _queue_blocks_lock(child_count),
           _queue_blocks(child_count),
diff --git a/be/src/pipeline/exec/data_queue.h 
b/be/src/pipeline/exec/data_queue.h
index e6105a3264..ab65bfeea6 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -30,13 +30,13 @@
 namespace doris {
 namespace pipeline {
 
-class StreamingAggDependency;
+class AggDependency;
 class UnionDependency;
 
 class DataQueue {
 public:
     //always one is enough, but in union node it's has more children
-    DataQueue(int child_count = 1, StreamingAggDependency* agg_dependency = 
nullptr,
+    DataQueue(int child_count = 1, AggDependency* agg_dependency = nullptr,
               UnionDependency* union_dependency = nullptr);
     ~DataQueue() = default;
 
@@ -88,7 +88,7 @@ private:
     int64_t _max_size_of_queue = 0;
     static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;
 
-    StreamingAggDependency* _agg_dependency = nullptr;
+    AggDependency* _agg_dependency = nullptr;
     UnionDependency* _union_dependency = nullptr;
 };
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index cb4675bcfb..544bc3bc67 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -151,8 +151,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
     _alloc_resource_timer = ADD_TIMER(_runtime_profile, 
"AllocateResourceTime");
 
-    static const std::string timer_name =
-            "WaitForDependency[" + _source_dependency->name() + "]Time";
+    static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
     _wait_for_data_timer = ADD_CHILD_TIMER(_runtime_profile, "WaitForData", 
timer_name);
     _wait_for_scanner_done_timer =
@@ -164,6 +163,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
 template <typename Derived>
 Status ScanLocalState<Derived>::open(RuntimeState* state) {
     SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
     if (_open_dependency == nullptr) {
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 5b7f3073ff..5b9c635580 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -74,10 +74,10 @@ private:
 class StreamingAggSinkOperatorX;
 
 class StreamingAggSinkLocalState final
-        : public AggSinkLocalState<StreamingAggDependency, 
StreamingAggSinkLocalState> {
+        : public AggSinkLocalState<AggDependency, StreamingAggSinkLocalState> {
 public:
     using Parent = StreamingAggSinkOperatorX;
-    using Base = AggSinkLocalState<StreamingAggDependency, 
StreamingAggSinkLocalState>;
+    using Base = AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
     ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState);
     StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
 
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index 7b1544232d..a0a866d0f7 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -75,31 +75,27 @@ OperatorPtr 
StreamingAggSourceOperatorBuilder::build_operator() {
 
 StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
                                                          const DescriptorTbl& 
descs)
-        : Base(pool, tnode, descs),
-          _needs_finalize(tnode.agg_node.need_finalize),
-          _without_key(tnode.agg_node.grouping_exprs.empty()) {}
+        : Base(pool, tnode, descs) {}
 
 Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                               SourceState& source_state) {
-    auto& local_state = 
state->get_local_state(id())->cast<StreamingAggLocalState>();
+    auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (!local_state._shared_state->data_queue->data_exhausted()) {
         std::unique_ptr<vectorized::Block> agg_block;
         DCHECK(local_state._dependency->read_blocked_by() == nullptr);
         
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
 
-        if (!local_state._shared_state->data_queue->data_exhausted()) {
+        if (local_state._shared_state->data_queue->data_exhausted()) {
+            RETURN_IF_ERROR(Base::get_block(state, block, source_state));
+        } else {
             block->swap(*agg_block);
             agg_block->clear_column_data(row_desc().num_materialized_slots());
             
local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
-            return Status::OK();
         }
+    } else {
+        RETURN_IF_ERROR(Base::get_block(state, block, source_state));
     }
-    RETURN_IF_ERROR(local_state._executor.get_result(state, block, 
source_state));
-    local_state.make_nullable_output_key(block);
-    // dispose the having clause, should not be execute in prestreaming agg
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, 
block->columns()));
-    local_state.reached_limit(block, source_state);
 
     return Status::OK();
 }
@@ -110,11 +106,5 @@ Status StreamingAggSourceOperatorX::init(const TPlanNode& 
tnode, RuntimeState* s
     return Status::OK();
 }
 
-Dependency* StreamingAggSourceOperatorX::wait_for_dependency(RuntimeState* 
state) {
-    return state->get_local_state(id())
-            ->cast<StreamingAggLocalState>()
-            ._dependency->read_blocked_by();
-}
-
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index 41f99164d3..10dc6cd026 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -60,43 +60,17 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
-class StreamingAggSourceOperatorX;
-
-class StreamingAggLocalState final
-        : public AggLocalState<StreamingAggDependency, StreamingAggLocalState> 
{
+class StreamingAggSourceOperatorX final : public AggSourceOperatorX {
 public:
-    using Parent = StreamingAggSourceOperatorX;
-    ENABLE_FACTORY_CREATOR(StreamingAggLocalState);
-    StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent)
-            : AggLocalState<StreamingAggDependency, 
StreamingAggLocalState>(state, parent) {}
-    ~StreamingAggLocalState() = default;
-};
-
-class StreamingAggSourceOperatorX final : public 
OperatorX<StreamingAggLocalState> {
-public:
-    using Base = OperatorX<StreamingAggLocalState>;
+    using Base = AggSourceOperatorX;
     StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
                                 const DescriptorTbl& descs);
     ~StreamingAggSourceOperatorX() = default;
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Dependency* wait_for_dependency(RuntimeState* state) override;
-
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
-
-    bool is_source() const override { return true; }
-
-private:
-    template <typename DependencyType, typename Derived>
-    friend class AggLocalState;
-
-    bool _needs_finalize;
-    bool _without_key;
-    // left / full join will change the key nullable make output/input solt
-    // nullable diff. so we need make nullable of it.
-    std::vector<size_t> _make_nullable_keys;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index cb9c8a5185..e4caf1ebdf 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -254,6 +254,7 @@ public:
     size_t input_num_rows = 0;
     std::vector<vectorized::AggregateDataPtr> values;
     std::unique_ptr<vectorized::Arena> agg_profile_arena;
+    std::unique_ptr<DataQueue> data_queue;
 };
 
 class AggDependency : public Dependency {
@@ -261,6 +262,7 @@ public:
     using SharedState = AggSharedState;
     AggDependency(int id) : Dependency(id, "AggDependency") {
         _mem_tracker = std::make_unique<MemTracker>("AggregateOperator:");
+        _agg_state.data_queue = std::make_unique<DataQueue>(1, this);
     }
     ~AggDependency() override = default;
 
@@ -317,26 +319,6 @@ private:
     AggSharedState _agg_state;
 };
 
-struct StreamingAggSharedState final : public AggSharedState {
-public:
-    StreamingAggSharedState() : AggSharedState() {}
-    ~StreamingAggSharedState() = default;
-    std::unique_ptr<DataQueue> data_queue;
-};
-
-class StreamingAggDependency final : public AggDependency {
-public:
-    using SharedState = StreamingAggSharedState;
-    StreamingAggDependency(int id) : AggDependency(id) {
-        _streaming_agg_state.data_queue = std::make_unique<DataQueue>(1, this);
-    }
-
-    void* shared_state() override { return (void*)&_streaming_agg_state; }
-
-private:
-    StreamingAggSharedState _streaming_agg_state;
-};
-
 struct SortSharedState {
 public:
     std::unique_ptr<vectorized::Sorter> sorter;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index b35d15cb7f..3452588bad 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -337,8 +337,7 @@ DECLARE_OPERATOR_X(HashJoinProbeLocalState)
 DECLARE_OPERATOR_X(OlapScanLocalState)
 DECLARE_OPERATOR_X(AnalyticLocalState)
 DECLARE_OPERATOR_X(SortLocalState)
-DECLARE_OPERATOR_X(BlockingAggLocalState)
-DECLARE_OPERATOR_X(StreamingAggLocalState)
+DECLARE_OPERATOR_X(AggLocalState)
 DECLARE_OPERATOR_X(ExchangeLocalState)
 DECLARE_OPERATOR_X(RepeatLocalState)
 DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
@@ -368,7 +367,6 @@ template class PipelineXLocalState<SortDependency>;
 template class PipelineXLocalState<NestedLoopJoinDependency>;
 template class PipelineXLocalState<AnalyticDependency>;
 template class PipelineXLocalState<AggDependency>;
-template class PipelineXLocalState<StreamingAggDependency>;
 template class PipelineXLocalState<FakeDependency>;
 template class PipelineXLocalState<UnionDependency>;
 
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index d6adfa7458..13cf56c65d 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -313,7 +313,6 @@ public:
         for (size_t i = 0; i < _projections.size(); i++) {
             RETURN_IF_ERROR(_parent->_projections[i]->clone(state, 
_projections[i]));
         }
-        DCHECK(_runtime_profile.get() != nullptr);
         _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", 
TUnit::UNIT);
         _rows_input_counter = ADD_COUNTER(_runtime_profile, "InputRows", 
TUnit::UNIT);
         _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
index 3a9ce71313..c3d19b67a2 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) 
AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
 FROM lineorder_flat
 WHERE
     LO_ORDERDATE >= 19930101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
index a37876b067..6ab6ceea34 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q1.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) 
AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
 FROM lineorder_flat
 WHERE
     LO_ORDERDATE >= 19940101
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
index 5ba5ee025c..70796c2a95 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q1.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) 
AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
 FROM lineorder_flat
 WHERE
     weekofyear(LO_ORDERDATE) = 6
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
index f42f86e685..57f2ada296 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q2.1
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
     P_BRAND
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
index ae458ca5c3..9b7a5db502 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q2.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
     P_BRAND
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
index 70771a50f7..3a8a5e74d4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q2.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
     P_BRAND
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
index e6c31d6315..6b3257f1f3 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.1
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_NATION,
     S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
index d9160639bd..fefe727da8 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_CITY,
     S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
index 1d6b34f9d5..c4560b701e 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_CITY,
     S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
index 2c1ce76817..4ae5d956e4 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q3.4
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
     C_CITY,
     S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
     SUM(LO_REVENUE) AS revenue
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
index 33f491eece..87b29bf160 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q4.1
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE 
DIV 10000) AS YEAR,
     C_NATION,
     SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
 FROM lineorder_flat
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
index a71d014add..8ea28f3f12 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q4.2
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE 
DIV 10000) AS YEAR,
     S_NATION,
     P_CATEGORY,
     SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
index 3c247a188f..0f7c7401ab 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 --Q4.3
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE 
DIV 10000) AS YEAR,
     S_CITY,
     P_BRAND,
     SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
index 260b1ee03b..50b50bc368 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_extendedprice*lo_discount) AS
 REVENUE
 FROM  lineorder, date
 WHERE  lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
index b8b54ccaad..77c0262016 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_extendedprice*lo_discount) AS
 REVENUE
 FROM  lineorder, date
 WHERE  lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
index fec034dd9b..0052db0aac 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_extendedprice*lo_discount) AS
 REVENUE
 FROM  lineorder, date
 WHERE  lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
index d71685a91a..a47ec82b51 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_revenue), d_year, p_brand
 FROM lineorder, date, part, supplier
 WHERE lo_orderdate = d_datekey
 AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
index c2a5f6dc15..9ab1a95d4d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_revenue), d_year, p_brand
 FROM lineorder, date, part, supplier
 WHERE lo_orderdate = d_datekey
 AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
index 19331d9ee5..b7e6bd7840 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_revenue), d_year, p_brand
 FROM lineorder, date, part, supplier
 WHERE  lo_orderdate = d_datekey
 AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
index a99bffe752..85c470b708 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ c_nation, s_nation, d_year,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation, 
s_nation, d_year,
 SUM(lo_revenue)  AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
index 65acd47881..cd0b320f87 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ c_city, s_city, d_year, 
sum(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, 
s_city, d_year, sum(lo_revenue)
 AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE  lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
index 18fe99b85a..89765c02d9 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ c_city, s_city, d_year, 
SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, 
s_city, d_year, SUM(lo_revenue)
 AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
index f59aac3ee6..5cef87a3fe 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ c_city, s_city, d_year, 
SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, 
s_city, d_year, SUM(lo_revenue)
 AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
index 029934b62a..3e0227c2ea 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ d_year, c_nation,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, 
c_nation,
 SUM(lo_revenue - lo_supplycost) AS PROFIT
 FROM date, customer, supplier, part, lineorder
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
index fc4bf8402e..1338e780ae 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ d_year, s_nation, p_category,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, 
s_nation, p_category,
 SUM(lo_revenue - lo_supplycost) AS PROFIT
 FROM date, customer, supplier, part, lineorder
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
index 8c7c315ebc..d8e6f7c42d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ d_year, s_city, p_brand,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, 
s_city, p_brand,
 SUM(lo_revenue - lo_supplycost) AS PROFIT
 FROM date, customer, supplier, part, lineorder
 WHERE lo_custkey = c_custkey
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
index 76d38414cd..ded6754a97 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   l_returnflag,
   l_linestatus,
   sum(l_quantity)                                       AS sum_qty,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
index 25ec865314..f102f7504d 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
@@ -1,5 +1,5 @@
 -- tables: part,supplier,partsupp,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_acctbal,
   s_name,
   n_name,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
index eb6831b7ca..8bd60f0e07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   l_orderkey,
   sum(l_extendedprice * (1 - l_discount)) AS revenue,
   o_orderdate,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
index e05c649b93..3f44094729 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
@@ -1,5 +1,5 @@
 -- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   o_orderpriority,
   count(*) AS order_count
 FROM orders
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
index b69b05e5e0..ed179f8b86 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem,supplier,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   n_name,
   sum(l_extendedprice * (1 - l_discount)) AS revenue
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
index a9a080a45b..2dd86f8c2c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
@@ -1,6 +1,6 @@
 -- tables: lineitem
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ sum(l_extendedprice * l_discount) AS 
revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
sum(l_extendedprice * l_discount) AS revenue
 FROM
   lineitem
 WHERE
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
index 79efbfdae4..6453c1094a 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
@@ -1,5 +1,5 @@
 -- tables: supplier,lineitem,orders,customer,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   supp_nation,
   cust_nation,
   l_year,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
index 1faced9805..e4c46fb084 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
@@ -1,5 +1,5 @@
 -- tables: part,supplier,lineitem,orders,customer,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   o_year,
   sum(CASE
       WHEN nation = 'BRAZIL'
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
index a47a3b5e9c..cee9925fb5 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
@@ -1,5 +1,5 @@
 -- tables: part,supplier,lineitem,partsupp,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   nation,
   o_year,
   sum(amount) AS sum_profit
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
index 3d49252d3e..c95a80fcee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   c_custkey,
   c_name,
   sum(l_extendedprice * (1 - l_discount)) AS revenue,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
index 66140c9431..b23701e940 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
@@ -1,5 +1,5 @@
 -- tables: partsupp,supplier,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   ps_partkey,
   sum(ps_supplycost * ps_availqty) AS value
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
index 99c9c0f574..e8893e71e4 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
@@ -1,5 +1,5 @@
 -- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   l_shipmode,
   sum(CASE
       WHEN o_orderpriority = '1-URGENT'
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
index 21a03f752c..9db2da60ee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
@@ -1,5 +1,5 @@
 -- tables: customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   c_count,
   count(*) AS custdist
 FROM (
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
index b0e79f8da5..70d7a57d07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ 100.00 * sum(CASE
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 * 
sum(CASE
                     WHEN p_type LIKE 'PROMO%'
                       THEN l_extendedprice * (1 - l_discount)
                     ELSE 0
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
index 03c0b97372..45f75ff985 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
@@ -1,4 +1,4 @@
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_suppkey,
   s_name,
   s_address,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
index 7df971f830..37a438c796 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
@@ -1,5 +1,5 @@
 -- tables: partsupp,part,supplier
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   p_brand,
   p_type,
   p_size,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
index 3f9203cea0..62f39a750c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ sum(l_extendedprice) / 7.0 AS 
avg_yearly
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
sum(l_extendedprice) / 7.0 AS avg_yearly
 FROM
   lineitem,
   part
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
index 971d457672..2eb2505c01 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   c_name,
   c_custkey,
   o_orderkey,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
index 8cc1b890c9..16e543f87c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */ sum(l_extendedprice * (1 - 
l_discount)) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
sum(l_extendedprice * (1 - l_discount)) AS revenue
 FROM
   lineitem,
   part
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
index f54d44ed8b..a2aca56790 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
@@ -1,5 +1,5 @@
 -- tables: supplier,nation,partsupp,lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_name,
   s_address
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
index 56d81211db..7b4874f96c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
@@ -1,5 +1,5 @@
 -- tables: supplier,lineitem,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_name,
   count(*) AS numwait
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
index c3f199f124..bf784175e0 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
@@ -1,5 +1,5 @@
 -- tables: orders,customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
disable_streaming_preaggregations=true) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   cntrycode,
   count(*)       AS numcust,
   sum(c_acctbal) AS totacctbal


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to