This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ab69416922 [Bug](pipelineX) fix streaming agg (#24449) ab69416922 is described below commit ab69416922464da09c05a7ae7044cfc6a0b1716a Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Sep 15 19:22:54 2023 +0800 [Bug](pipelineX) fix streaming agg (#24449) fix streaming agg --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 +- .../pipeline/exec/aggregation_source_operator.cpp | 264 ++++++++++----------- be/src/pipeline/exec/aggregation_source_operator.h | 20 +- be/src/pipeline/exec/data_queue.cpp | 2 +- be/src/pipeline/exec/data_queue.h | 6 +- be/src/pipeline/exec/scan_operator.cpp | 4 +- .../exec/streaming_aggregation_sink_operator.h | 4 +- .../exec/streaming_aggregation_source_operator.cpp | 24 +- .../exec/streaming_aggregation_source_operator.h | 30 +-- be/src/pipeline/pipeline_x/dependency.h | 22 +- be/src/pipeline/pipeline_x/operator.cpp | 4 +- be/src/pipeline/pipeline_x/operator.h | 1 - .../ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql | 2 +- .../ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql | 2 +- .../tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql | 2 +- 60 files changed, 192 insertions(+), 287 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index b30bb9b815..cc0eb3be1d 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -947,6 +947,6 @@ class StreamingAggSinkLocalState; template class AggSinkOperatorX<BlockingAggSinkLocalState>; template class AggSinkOperatorX<StreamingAggSinkLocalState>; template class AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>; -template class AggSinkLocalState<StreamingAggDependency, StreamingAggSinkLocalState>; +template class AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 476683f8b8..16125c424a 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -28,8 +28,7 @@ namespace pipeline { OPERATOR_CODE_GENERATOR(AggSourceOperator, SourceOperator) -template <typename DependencyType, typename Derived> -AggLocalState<DependencyType, Derived>::AggLocalState(RuntimeState* state, OperatorXBase* parent) +AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), _get_results_timer(nullptr), _serialize_result_timer(nullptr), @@ -38,49 +37,47 @@ AggLocalState<DependencyType, Derived>::AggLocalState(RuntimeState* state, Opera _serialize_data_timer(nullptr), _hash_table_size_counter(nullptr) {} -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::init(RuntimeState* state, LocalStateInfo& info) { +Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::profile()->total_time_counter()); - _agg_data = Base::_shared_state->agg_data.get(); + _agg_data = _shared_state->agg_data.get(); _get_results_timer = ADD_TIMER(Base::profile(), "GetResultsTime"); _serialize_result_timer = ADD_TIMER(Base::profile(), "SerializeResultTime"); _hash_table_iterate_timer = ADD_TIMER(Base::profile(), "HashTableIterateTime"); _insert_keys_to_column_timer = ADD_TIMER(Base::profile(), "InsertKeysToColumnTime"); _serialize_data_timer = ADD_TIMER(Base::profile(), "SerializeDataTime"); _hash_table_size_counter = ADD_COUNTER(Base::profile(), "HashTableSize", TUnit::UNIT); - auto& p = Base::_parent->template cast<typename Derived::Parent>(); + auto& p = _parent->template cast<AggSourceOperatorX>(); if (p._without_key) { if (p._needs_finalize) { - _executor.get_result = std::bind<Status>(&Derived::_get_without_key_result, - (Derived*)this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3); + _executor.get_result = std::bind<Status>(&AggLocalState::_get_without_key_result, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); } else { - _executor.get_result = std::bind<Status>(&Derived::_serialize_without_key, - (Derived*)this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3); + _executor.get_result = std::bind<Status>(&AggLocalState::_serialize_without_key, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); } - _executor.close = std::bind<void>(&Derived::_close_without_key, (Derived*)this); + _executor.close = std::bind<void>(&AggLocalState::_close_without_key, this); } else { if (p._needs_finalize) { - _executor.get_result = std::bind<Status>(&Derived::_get_with_serialized_key_result, - (Derived*)this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3); + _executor.get_result = std::bind<Status>( + &AggLocalState::_get_with_serialized_key_result, this, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3); } else { _executor.get_result = std::bind<Status>( - &Derived::_serialize_with_serialized_key_result, (Derived*)this, + &AggLocalState::_serialize_with_serialized_key_result, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); } - _executor.close = std::bind<void>(&Derived::_close_with_serialized_key, (Derived*)this); + _executor.close = std::bind<void>(&AggLocalState::_close_with_serialized_key, this); } _agg_data_created_without_key = p._without_key; return Status::OK(); } -template <typename DependencyType, typename Derived> -void AggLocalState<DependencyType, Derived>::_close_with_serialized_key() { +void AggLocalState::_close_with_serialized_key() { std::visit( [&](auto&& agg_method) -> void { auto& data = agg_method.data; @@ -98,8 +95,7 @@ void AggLocalState<DependencyType, Derived>::_close_with_serialized_key() { Base::_dependency->release_tracker(); } -template <typename DependencyType, typename Derived> -void AggLocalState<DependencyType, Derived>::_close_without_key() { +void AggLocalState::_close_without_key() { //because prepare maybe failed, and couldn't create agg data. //but finally call close to destory agg data, if agg data has bitmapValue //will be core dump, it's not initialized @@ -110,39 +106,37 @@ void AggLocalState<DependencyType, Derived>::_close_without_key() { Base::_dependency->release_tracker(); } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_result( - RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - if (Base::_shared_state->spill_context.has_data) { +Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state) { + if (_shared_state->spill_context.has_data) { return _serialize_with_serialized_key_result_with_spilt_data(state, block, source_state); } else { return _serialize_with_serialized_key_result_non_spill(state, block, source_state); } } -template <typename DependencyType, typename Derived> -Status -AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_result_with_spilt_data( +Status AggLocalState::_serialize_with_serialized_key_result_with_spilt_data( RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - CHECK(!Base::_shared_state->spill_context.stream_ids.empty()); - CHECK(Base::_shared_state->spill_partition_helper != nullptr) + CHECK(!_shared_state->spill_context.stream_ids.empty()); + CHECK(_shared_state->spill_partition_helper != nullptr) << "_spill_partition_helper should not be null"; - Base::_shared_state->aggregate_data_container->init_once(); - while (Base::_shared_state->aggregate_data_container->iterator == - Base::_shared_state->aggregate_data_container->end()) { - if (Base::_shared_state->spill_context.read_cursor == - Base::_shared_state->spill_partition_helper->partition_count) { + _shared_state->aggregate_data_container->init_once(); + while (_shared_state->aggregate_data_container->iterator == + _shared_state->aggregate_data_container->end()) { + if (_shared_state->spill_context.read_cursor == + _shared_state->spill_partition_helper->partition_count) { break; } RETURN_IF_ERROR(Base::_dependency->reset_hash_table()); RETURN_IF_ERROR(Base::_dependency->merge_spilt_data()); - Base::_shared_state->aggregate_data_container->init_once(); + _shared_state->aggregate_data_container->init_once(); } RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, block, source_state)); if (source_state == SourceState::FINISHED) { - source_state = Base::_shared_state->spill_context.read_cursor == - Base::_shared_state->spill_partition_helper->partition_count + source_state = _shared_state->spill_context.read_cursor == + _shared_state->spill_partition_helper->partition_count ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; } @@ -150,12 +144,12 @@ AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_result_wi return Status::OK(); } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_result_non_spill( - RuntimeState* state, vectorized::Block* block, SourceState& source_state) { +Status AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state) { SCOPED_TIMER(_serialize_result_timer); - int key_size = Base::_shared_state->probe_expr_ctxs.size(); - int agg_size = Base::_shared_state->aggregate_evaluators.size(); + int key_size = _shared_state->probe_expr_ctxs.size(); + int agg_size = _shared_state->aggregate_evaluators.size(); vectorized::MutableColumns value_columns(agg_size); vectorized::DataTypes value_data_types(agg_size); @@ -168,7 +162,7 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } else { key_columns.emplace_back( - Base::_shared_state->probe_expr_ctxs[i]->root()->data_type()->create_column()); + _shared_state->probe_expr_ctxs[i]->root()->data_type()->create_column()); } } @@ -180,20 +174,20 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re const auto size = std::min(data.size(), size_t(state->batch_size())); using KeyType = std::decay_t<decltype(agg_method.iterator->get_first())>; std::vector<KeyType> keys(size); - if (Base::_shared_state->values.size() < size + 1) { - Base::_shared_state->values.resize(size + 1); + if (_shared_state->values.size() < size + 1) { + _shared_state->values.resize(size + 1); } size_t num_rows = 0; - Base::_shared_state->aggregate_data_container->init_once(); - auto& iter = Base::_shared_state->aggregate_data_container->iterator; + _shared_state->aggregate_data_container->init_once(); + auto& iter = _shared_state->aggregate_data_container->iterator; { SCOPED_TIMER(_hash_table_iterate_timer); - while (iter != Base::_shared_state->aggregate_data_container->end() && + while (iter != _shared_state->aggregate_data_container->end() && num_rows < state->batch_size()) { keys[num_rows] = iter.template get_key<KeyType>(); - Base::_shared_state->values[num_rows] = iter.get_aggregate_data(); + _shared_state->values[num_rows] = iter.get_aggregate_data(); ++iter; ++num_rows; } @@ -202,10 +196,10 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re { SCOPED_TIMER(_insert_keys_to_column_timer); agg_method.insert_keys_into_columns(keys, key_columns, num_rows, - Base::_shared_state->probe_key_sz); + _shared_state->probe_key_sz); } - if (iter == Base::_shared_state->aggregate_data_container->end()) { + if (iter == _shared_state->aggregate_data_container->end()) { if (agg_method.data.has_null_key_data()) { // only one key of group by support wrap null key // here need additional processing logic on the null key / value @@ -213,8 +207,7 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re DCHECK(key_columns[0]->is_nullable()); if (agg_method.data.has_null_key_data()) { key_columns[0]->insert_data(nullptr, 0); - Base::_shared_state->values[num_rows] = - agg_method.data.get_null_key_data(); + _shared_state->values[num_rows] = agg_method.data.get_null_key_data(); ++num_rows; source_state = SourceState::FINISHED; } @@ -225,8 +218,8 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re { SCOPED_TIMER(_serialize_data_timer); - for (size_t i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { - value_data_types[i] = Base::_shared_state->aggregate_evaluators[i] + for (size_t i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { + value_data_types[i] = _shared_state->aggregate_evaluators[i] ->function() ->get_serialized_type(); if (mem_reuse) { @@ -234,16 +227,14 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re std::move(*block->get_by_position(i + key_size).column) .mutate(); } else { - value_columns[i] = Base::_shared_state->aggregate_evaluators[i] + value_columns[i] = _shared_state->aggregate_evaluators[i] ->function() ->create_serialize_column(); } - Base::_shared_state->aggregate_evaluators[i] - ->function() - ->serialize_to_column( - Base::_shared_state->values, - Base::_dependency->offsets_of_aggregate_states()[i], - value_columns[i], num_rows); + _shared_state->aggregate_evaluators[i]->function()->serialize_to_column( + _shared_state->values, + Base::_dependency->offsets_of_aggregate_states()[i], + value_columns[i], num_rows); } } }, @@ -254,8 +245,8 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re for (int i = 0; i < key_size; ++i) { columns_with_schema.emplace_back( std::move(key_columns[i]), - Base::_shared_state->probe_expr_ctxs[i]->root()->data_type(), - Base::_shared_state->probe_expr_ctxs[i]->root()->expr_name()); + _shared_state->probe_expr_ctxs[i]->root()->data_type(), + _shared_state->probe_expr_ctxs[i]->root()->expr_name()); } for (int i = 0; i < agg_size; ++i) { columns_with_schema.emplace_back(std::move(value_columns[i]), value_data_types[i], ""); @@ -266,38 +257,36 @@ Status AggLocalState<DependencyType, Derived>::_serialize_with_serialized_key_re return Status::OK(); } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_get_with_serialized_key_result( - RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - if (Base::_shared_state->spill_context.has_data) { +Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + if (_shared_state->spill_context.has_data) { return _get_result_with_spilt_data(state, block, source_state); } else { return _get_result_with_serialized_key_non_spill(state, block, source_state); } } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_get_result_with_spilt_data( - RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - CHECK(!Base::_shared_state->spill_context.stream_ids.empty()); - CHECK(Base::_shared_state->spill_partition_helper != nullptr) +Status AggLocalState::_get_result_with_spilt_data(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + CHECK(!_shared_state->spill_context.stream_ids.empty()); + CHECK(_shared_state->spill_partition_helper != nullptr) << "_spill_partition_helper should not be null"; - Base::_shared_state->aggregate_data_container->init_once(); - while (Base::_shared_state->aggregate_data_container->iterator == - Base::_shared_state->aggregate_data_container->end()) { - if (Base::_shared_state->spill_context.read_cursor == - Base::_shared_state->spill_partition_helper->partition_count) { + _shared_state->aggregate_data_container->init_once(); + while (_shared_state->aggregate_data_container->iterator == + _shared_state->aggregate_data_container->end()) { + if (_shared_state->spill_context.read_cursor == + _shared_state->spill_partition_helper->partition_count) { break; } RETURN_IF_ERROR(Base::_dependency->reset_hash_table()); RETURN_IF_ERROR(Base::_dependency->merge_spilt_data()); - Base::_shared_state->aggregate_data_container->init_once(); + _shared_state->aggregate_data_container->init_once(); } RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, source_state)); if (source_state == SourceState::FINISHED) { - source_state = Base::_shared_state->spill_context.read_cursor == - Base::_shared_state->spill_partition_helper->partition_count + source_state = _shared_state->spill_context.read_cursor == + _shared_state->spill_partition_helper->partition_count ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; } @@ -305,15 +294,15 @@ Status AggLocalState<DependencyType, Derived>::_get_result_with_spilt_data( return Status::OK(); } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_get_result_with_serialized_key_non_spill( - RuntimeState* state, vectorized::Block* block, SourceState& source_state) { +Status AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state) { // non-nullable column(id in `_make_nullable_keys`) will be converted to nullable. bool mem_reuse = Base::_dependency->make_nullable_keys().empty() && block->mem_reuse(); auto columns_with_schema = vectorized::VectorizedUtils::create_columns_with_type_and_name( - Base::_parent->template cast<typename Derived::Parent>()._row_descriptor); - int key_size = Base::_shared_state->probe_expr_ctxs.size(); + _parent->cast<AggSourceOperatorX>()._row_descriptor); + int key_size = _shared_state->probe_expr_ctxs.size(); vectorized::MutableColumns key_columns; for (int i = 0; i < key_size; ++i) { @@ -340,20 +329,20 @@ Status AggLocalState<DependencyType, Derived>::_get_result_with_serialized_key_n const auto size = std::min(data.size(), size_t(state->batch_size())); using KeyType = std::decay_t<decltype(agg_method.iterator->get_first())>; std::vector<KeyType> keys(size); - if (Base::_shared_state->values.size() < size) { - Base::_shared_state->values.resize(size); + if (_shared_state->values.size() < size) { + _shared_state->values.resize(size); } size_t num_rows = 0; - Base::_shared_state->aggregate_data_container->init_once(); - auto& iter = Base::_shared_state->aggregate_data_container->iterator; + _shared_state->aggregate_data_container->init_once(); + auto& iter = _shared_state->aggregate_data_container->iterator; { SCOPED_TIMER(_hash_table_iterate_timer); - while (iter != Base::_shared_state->aggregate_data_container->end() && + while (iter != _shared_state->aggregate_data_container->end() && num_rows < state->batch_size()) { keys[num_rows] = iter.template get_key<KeyType>(); - Base::_shared_state->values[num_rows] = iter.get_aggregate_data(); + _shared_state->values[num_rows] = iter.get_aggregate_data(); ++iter; ++num_rows; } @@ -362,17 +351,17 @@ Status AggLocalState<DependencyType, Derived>::_get_result_with_serialized_key_n { SCOPED_TIMER(_insert_keys_to_column_timer); agg_method.insert_keys_into_columns(keys, key_columns, num_rows, - Base::_shared_state->probe_key_sz); + _shared_state->probe_key_sz); } - for (size_t i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { - Base::_shared_state->aggregate_evaluators[i]->insert_result_info_vec( - Base::_shared_state->values, + for (size_t i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { + _shared_state->aggregate_evaluators[i]->insert_result_info_vec( + _shared_state->values, Base::_dependency->offsets_of_aggregate_states()[i], value_columns[i].get(), num_rows); } - if (iter == Base::_shared_state->aggregate_data_container->end()) { + if (iter == _shared_state->aggregate_data_container->end()) { if (agg_method.data.has_null_key_data()) { // only one key of group by support wrap null key // here need additional processing logic on the null key / value @@ -381,9 +370,8 @@ Status AggLocalState<DependencyType, Derived>::_get_result_with_serialized_key_n if (key_columns[0]->size() < state->batch_size()) { key_columns[0]->insert_data(nullptr, 0); auto mapped = agg_method.data.get_null_key_data(); - for (size_t i = 0; i < Base::_shared_state->aggregate_evaluators.size(); - ++i) - Base::_shared_state->aggregate_evaluators[i]->insert_result_info( + for (size_t i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) + _shared_state->aggregate_evaluators[i]->insert_result_info( mapped + Base::_dependency->offsets_of_aggregate_states()[i], value_columns[i].get()); @@ -412,42 +400,39 @@ Status AggLocalState<DependencyType, Derived>::_get_result_with_serialized_key_n return Status::OK(); } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_serialize_without_key(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state) { +Status AggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { // 1. `child(0)->rows_returned() == 0` mean not data from child // in level two aggregation node should return NULL result // level one aggregation node set `eos = true` return directly SCOPED_TIMER(_serialize_result_timer); - if (UNLIKELY(Base::_shared_state->input_num_rows == 0)) { + if (UNLIKELY(_shared_state->input_num_rows == 0)) { source_state = SourceState::FINISHED; return Status::OK(); } block->clear(); DCHECK(_agg_data->without_key != nullptr); - int agg_size = Base::_shared_state->aggregate_evaluators.size(); + int agg_size = _shared_state->aggregate_evaluators.size(); vectorized::MutableColumns value_columns(agg_size); std::vector<vectorized::DataTypePtr> data_types(agg_size); // will serialize data to string column - for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { - data_types[i] = - Base::_shared_state->aggregate_evaluators[i]->function()->get_serialized_type(); + for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { + data_types[i] = _shared_state->aggregate_evaluators[i]->function()->get_serialized_type(); value_columns[i] = - Base::_shared_state->aggregate_evaluators[i]->function()->create_serialize_column(); + _shared_state->aggregate_evaluators[i]->function()->create_serialize_column(); } - for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { - Base::_shared_state->aggregate_evaluators[i]->function()->serialize_without_key_to_column( + for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { + _shared_state->aggregate_evaluators[i]->function()->serialize_without_key_to_column( _agg_data->without_key + Base::_dependency->offsets_of_aggregate_states()[i], *value_columns[i]); } { vectorized::ColumnsWithTypeAndName data_with_schema; - for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { + for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { vectorized::ColumnWithTypeAndName column_with_schema = {nullptr, data_types[i], ""}; data_with_schema.push_back(std::move(column_with_schema)); } @@ -459,27 +444,25 @@ Status AggLocalState<DependencyType, Derived>::_serialize_without_key(RuntimeSta return Status::OK(); } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::_get_without_key_result(RuntimeState* state, - vectorized::Block* block, - SourceState& source_state) { +Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { DCHECK(_agg_data->without_key != nullptr); block->clear(); - auto& p = Base::_parent->template cast<typename Derived::Parent>(); + auto& p = _parent->cast<AggSourceOperatorX>(); *block = vectorized::VectorizedUtils::create_empty_columnswithtypename(p._row_descriptor); - int agg_size = Base::_shared_state->aggregate_evaluators.size(); + int agg_size = _shared_state->aggregate_evaluators.size(); vectorized::MutableColumns columns(agg_size); std::vector<vectorized::DataTypePtr> data_types(agg_size); - for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { - data_types[i] = Base::_shared_state->aggregate_evaluators[i]->function()->get_return_type(); + for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { + data_types[i] = _shared_state->aggregate_evaluators[i]->function()->get_return_type(); columns[i] = data_types[i]->create_column(); } - for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { + for (int i = 0; i < _shared_state->aggregate_evaluators.size(); ++i) { auto column = columns[i].get(); - Base::_shared_state->aggregate_evaluators[i]->insert_result_info( + _shared_state->aggregate_evaluators[i]->insert_result_info( _agg_data->without_key + Base::_dependency->offsets_of_aggregate_states()[i], column); } @@ -502,7 +485,7 @@ Status AggLocalState<DependencyType, Derived>::_get_without_key_result(RuntimeSt vectorized::ColumnPtr ptr = std::move(columns[i]); // unless `count`, other aggregate function dispose empty set should be null // so here check the children row return - ptr = make_nullable(ptr, Base::_shared_state->input_num_rows == 0); + ptr = make_nullable(ptr, _shared_state->input_num_rows == 0); columns[i] = ptr->assume_mutable(); } } @@ -521,7 +504,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - auto& local_state = state->get_local_state(id())->cast<BlockingAggLocalState>(); + auto& local_state = state->get_local_state(id())->cast<AggLocalState>(); SCOPED_TIMER(local_state.profile()->total_time_counter()); RETURN_IF_ERROR(local_state._executor.get_result(state, block, source_state)); local_state.make_nullable_output_key(block); @@ -531,8 +514,7 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } -template <typename DependencyType, typename Derived> -void AggLocalState<DependencyType, Derived>::make_nullable_output_key(vectorized::Block* block) { +void AggLocalState::make_nullable_output_key(vectorized::Block* block) { if (block->rows() != 0) { for (auto cid : Base::_dependency->make_nullable_keys()) { block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column); @@ -541,13 +523,12 @@ void AggLocalState<DependencyType, Derived>::make_nullable_output_key(vectorized } } -template <typename DependencyType, typename Derived> -Status AggLocalState<DependencyType, Derived>::close(RuntimeState* state) { +Status AggLocalState::close(RuntimeState* state) { SCOPED_TIMER(Base::profile()->total_time_counter()); if (Base::_closed) { return Status::OK(); } - for (auto* aggregate_evaluator : Base::_shared_state->aggregate_evaluators) { + for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) { aggregate_evaluator->close(state); } if (_executor.close) { @@ -563,26 +544,19 @@ Status AggLocalState<DependencyType, Derived>::close(RuntimeState* state) { _agg_data->method_variant); } - Base::_shared_state->agg_data = nullptr; - Base::_shared_state->aggregate_data_container = nullptr; - Base::_shared_state->agg_arena_pool = nullptr; - Base::_shared_state->agg_profile_arena = nullptr; + _shared_state->agg_data = nullptr; + _shared_state->aggregate_data_container = nullptr; + _shared_state->agg_arena_pool = nullptr; + _shared_state->agg_profile_arena = nullptr; std::vector<vectorized::AggregateDataPtr> tmp_values; - Base::_shared_state->values.swap(tmp_values); + _shared_state->values.swap(tmp_values); return Base::close(state); } Dependency* AggSourceOperatorX::wait_for_dependency(RuntimeState* state) { - return state->get_local_state(Base::id()) - ->cast<BlockingAggLocalState>() - ._dependency->read_blocked_by(); + return state->get_local_state(Base::id())->cast<AggLocalState>()._dependency->read_blocked_by(); } -class StreamingAggLocalState; - -template class AggLocalState<AggDependency, BlockingAggLocalState>; -template class AggLocalState<StreamingAggDependency, StreamingAggLocalState>; - } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 3148f06792..a840a72f13 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -50,10 +50,9 @@ public: class AggSourceOperatorX; -template <typename DependencyType, typename Derived> -class AggLocalState : public PipelineXLocalState<DependencyType> { +class AggLocalState : public PipelineXLocalState<AggDependency> { public: - using Base = PipelineXLocalState<DependencyType>; + using Base = PipelineXLocalState<AggDependency>; ENABLE_FACTORY_CREATOR(AggLocalState); AggLocalState(RuntimeState* state, OperatorXBase* parent); @@ -110,19 +109,9 @@ protected: bool _agg_data_created_without_key = false; }; -class BlockingAggLocalState final : public AggLocalState<AggDependency, BlockingAggLocalState> { +class AggSourceOperatorX : public OperatorX<AggLocalState> { public: - ENABLE_FACTORY_CREATOR(BlockingAggLocalState); - using Parent = AggSourceOperatorX; - - BlockingAggLocalState(RuntimeState* state, OperatorXBase* parent) - : AggLocalState<AggDependency, BlockingAggLocalState>(state, parent) {} - ~BlockingAggLocalState() = default; -}; - -class AggSourceOperatorX final : public OperatorX<BlockingAggLocalState> { -public: - using Base = OperatorX<BlockingAggLocalState>; + using Base = OperatorX<AggLocalState>; AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~AggSourceOperatorX() = default; Dependency* wait_for_dependency(RuntimeState* state) override; @@ -133,7 +122,6 @@ public: bool is_source() const override { return true; } private: - template <typename DependencyType, typename Derived> friend class AggLocalState; bool _needs_finalize; diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index eff91bdd3f..e070fe2c99 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -30,7 +30,7 @@ namespace doris { namespace pipeline { -DataQueue::DataQueue(int child_count, StreamingAggDependency* agg_dependency, +DataQueue::DataQueue(int child_count, AggDependency* agg_dependency, UnionDependency* union_dependency) : _queue_blocks_lock(child_count), _queue_blocks(child_count), diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index e6105a3264..ab65bfeea6 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -30,13 +30,13 @@ namespace doris { namespace pipeline { -class StreamingAggDependency; +class AggDependency; class UnionDependency; class DataQueue { public: //always one is enough, but in union node it's has more children - DataQueue(int child_count = 1, StreamingAggDependency* agg_dependency = nullptr, + DataQueue(int child_count = 1, AggDependency* agg_dependency = nullptr, UnionDependency* union_dependency = nullptr); ~DataQueue() = default; @@ -88,7 +88,7 @@ private: int64_t _max_size_of_queue = 0; static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; - StreamingAggDependency* _agg_dependency = nullptr; + AggDependency* _agg_dependency = nullptr; UnionDependency* _union_dependency = nullptr; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index cb4675bcfb..544bc3bc67 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -151,8 +151,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) _open_timer = ADD_TIMER(_runtime_profile, "OpenTime"); _alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime"); - static const std::string timer_name = - "WaitForDependency[" + _source_dependency->name() + "]Time"; + static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name); _wait_for_data_timer = ADD_CHILD_TIMER(_runtime_profile, "WaitForData", timer_name); _wait_for_scanner_done_timer = @@ -164,6 +163,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) template <typename Derived> Status ScanLocalState<Derived>::open(RuntimeState* state) { SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); if (_open_dependency == nullptr) { return Status::OK(); } diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 5b7f3073ff..5b9c635580 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -74,10 +74,10 @@ private: class StreamingAggSinkOperatorX; class StreamingAggSinkLocalState final - : public AggSinkLocalState<StreamingAggDependency, StreamingAggSinkLocalState> { + : public AggSinkLocalState<AggDependency, StreamingAggSinkLocalState> { public: using Parent = StreamingAggSinkOperatorX; - using Base = AggSinkLocalState<StreamingAggDependency, StreamingAggSinkLocalState>; + using Base = AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>; ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState); StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index 7b1544232d..a0a866d0f7 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -75,31 +75,27 @@ OperatorPtr StreamingAggSourceOperatorBuilder::build_operator() { StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : Base(pool, tnode, descs), - _needs_finalize(tnode.agg_node.need_finalize), - _without_key(tnode.agg_node.grouping_exprs.empty()) {} + : Base(pool, tnode, descs) {} Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - auto& local_state = state->get_local_state(id())->cast<StreamingAggLocalState>(); + auto& local_state = state->get_local_state(id())->cast<AggLocalState>(); SCOPED_TIMER(local_state.profile()->total_time_counter()); if (!local_state._shared_state->data_queue->data_exhausted()) { std::unique_ptr<vectorized::Block> agg_block; DCHECK(local_state._dependency->read_blocked_by() == nullptr); RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block)); - if (!local_state._shared_state->data_queue->data_exhausted()) { + if (local_state._shared_state->data_queue->data_exhausted()) { + RETURN_IF_ERROR(Base::get_block(state, block, source_state)); + } else { block->swap(*agg_block); agg_block->clear_column_data(row_desc().num_materialized_slots()); local_state._shared_state->data_queue->push_free_block(std::move(agg_block)); - return Status::OK(); } + } else { + RETURN_IF_ERROR(Base::get_block(state, block, source_state)); } - RETURN_IF_ERROR(local_state._executor.get_result(state, block, source_state)); - local_state.make_nullable_output_key(block); - // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); - local_state.reached_limit(block, source_state); return Status::OK(); } @@ -110,11 +106,5 @@ Status StreamingAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* s return Status::OK(); } -Dependency* StreamingAggSourceOperatorX::wait_for_dependency(RuntimeState* state) { - return state->get_local_state(id()) - ->cast<StreamingAggLocalState>() - ._dependency->read_blocked_by(); -} - } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h index 41f99164d3..10dc6cd026 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h @@ -60,43 +60,17 @@ private: std::shared_ptr<DataQueue> _data_queue; }; -class StreamingAggSourceOperatorX; - -class StreamingAggLocalState final - : public AggLocalState<StreamingAggDependency, StreamingAggLocalState> { +class StreamingAggSourceOperatorX final : public AggSourceOperatorX { public: - using Parent = StreamingAggSourceOperatorX; - ENABLE_FACTORY_CREATOR(StreamingAggLocalState); - StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) - : AggLocalState<StreamingAggDependency, StreamingAggLocalState>(state, parent) {} - ~StreamingAggLocalState() = default; -}; - -class StreamingAggSourceOperatorX final : public OperatorX<StreamingAggLocalState> { -public: - using Base = OperatorX<StreamingAggLocalState>; + using Base = AggSourceOperatorX; StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~StreamingAggSourceOperatorX() = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Dependency* wait_for_dependency(RuntimeState* state) override; - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; - - bool is_source() const override { return true; } - -private: - template <typename DependencyType, typename Derived> - friend class AggLocalState; - - bool _needs_finalize; - bool _without_key; - // left / full join will change the key nullable make output/input solt - // nullable diff. so we need make nullable of it. - std::vector<size_t> _make_nullable_keys; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index cb9c8a5185..e4caf1ebdf 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -254,6 +254,7 @@ public: size_t input_num_rows = 0; std::vector<vectorized::AggregateDataPtr> values; std::unique_ptr<vectorized::Arena> agg_profile_arena; + std::unique_ptr<DataQueue> data_queue; }; class AggDependency : public Dependency { @@ -261,6 +262,7 @@ public: using SharedState = AggSharedState; AggDependency(int id) : Dependency(id, "AggDependency") { _mem_tracker = std::make_unique<MemTracker>("AggregateOperator:"); + _agg_state.data_queue = std::make_unique<DataQueue>(1, this); } ~AggDependency() override = default; @@ -317,26 +319,6 @@ private: AggSharedState _agg_state; }; -struct StreamingAggSharedState final : public AggSharedState { -public: - StreamingAggSharedState() : AggSharedState() {} - ~StreamingAggSharedState() = default; - std::unique_ptr<DataQueue> data_queue; -}; - -class StreamingAggDependency final : public AggDependency { -public: - using SharedState = StreamingAggSharedState; - StreamingAggDependency(int id) : AggDependency(id) { - _streaming_agg_state.data_queue = std::make_unique<DataQueue>(1, this); - } - - void* shared_state() override { return (void*)&_streaming_agg_state; } - -private: - StreamingAggSharedState _streaming_agg_state; -}; - struct SortSharedState { public: std::unique_ptr<vectorized::Sorter> sorter; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index b35d15cb7f..3452588bad 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -337,8 +337,7 @@ DECLARE_OPERATOR_X(HashJoinProbeLocalState) DECLARE_OPERATOR_X(OlapScanLocalState) DECLARE_OPERATOR_X(AnalyticLocalState) DECLARE_OPERATOR_X(SortLocalState) -DECLARE_OPERATOR_X(BlockingAggLocalState) -DECLARE_OPERATOR_X(StreamingAggLocalState) +DECLARE_OPERATOR_X(AggLocalState) DECLARE_OPERATOR_X(ExchangeLocalState) DECLARE_OPERATOR_X(RepeatLocalState) DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) @@ -368,7 +367,6 @@ template class PipelineXLocalState<SortDependency>; template class PipelineXLocalState<NestedLoopJoinDependency>; template class PipelineXLocalState<AnalyticDependency>; template class PipelineXLocalState<AggDependency>; -template class PipelineXLocalState<StreamingAggDependency>; template class PipelineXLocalState<FakeDependency>; template class PipelineXLocalState<UnionDependency>; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index d6adfa7458..13cf56c65d 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -313,7 +313,6 @@ public: for (size_t i = 0; i < _projections.size(); i++) { RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i])); } - DCHECK(_runtime_profile.get() != nullptr); _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); _rows_input_counter = ADD_COUNTER(_runtime_profile, "InputRows", TUnit::UNIT); _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql index 3a9ce71313..c3d19b67a2 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat WHERE LO_ORDERDATE >= 19930101 diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql index a37876b067..6ab6ceea34 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q1.2 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat WHERE LO_ORDERDATE >= 19940101 diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql index 5ba5ee025c..70796c2a95 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q1.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q1.3 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat WHERE weekofyear(LO_ORDERDATE) = 6 diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql index f42f86e685..57f2ada296 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q2.1 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR, P_BRAND FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql index ae458ca5c3..9b7a5db502 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q2.2 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR, P_BRAND FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql index 70771a50f7..3a8a5e74d4 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q2.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q2.3 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR, P_BRAND FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql index e6c31d6315..6b3257f1f3 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.1 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_NATION, S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql index d9160639bd..fefe727da8 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.2 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_CITY, S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql index 1d6b34f9d5..c4560b701e 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.3 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_CITY, S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql index 2c1ce76817..4ae5d956e4 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q3.4.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.4 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_CITY, S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql index 33f491eece..87b29bf160 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q4.1 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, C_NATION, SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql index a71d014add..8ea28f3f12 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q4.2 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, S_NATION, P_CATEGORY, SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql index 3c247a188f..0f7c7401ab 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_flat_q4.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q4.3 -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, S_CITY, P_BRAND, SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql index 260b1ee03b..50b50bc368 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS REVENUE FROM lineorder, date WHERE lo_orderdate = d_datekey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql index b8b54ccaad..77c0262016 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS REVENUE FROM lineorder, date WHERE lo_orderdate = d_datekey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql index fec034dd9b..0052db0aac 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(lo_extendedprice*lo_discount) AS +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS REVENUE FROM lineorder, date WHERE lo_orderdate = d_datekey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql index d71685a91a..a47ec82b51 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand FROM lineorder, date, part, supplier WHERE lo_orderdate = d_datekey AND lo_partkey = p_partkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql index c2a5f6dc15..9ab1a95d4d 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand FROM lineorder, date, part, supplier WHERE lo_orderdate = d_datekey AND lo_partkey = p_partkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql index 19331d9ee5..b7e6bd7840 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ SUM(lo_revenue), d_year, p_brand +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand FROM lineorder, date, part, supplier WHERE lo_orderdate = d_datekey AND lo_partkey = p_partkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql index a99bffe752..85c470b708 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ c_nation, s_nation, d_year, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation, s_nation, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql index 65acd47881..cd0b320f87 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ c_city, s_city, d_year, sum(lo_revenue) +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, sum(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql index 18fe99b85a..89765c02d9 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ c_city, s_city, d_year, SUM(lo_revenue) +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql index f59aac3ee6..5cef87a3fe 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ c_city, s_city, d_year, SUM(lo_revenue) +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql index 029934b62a..3e0227c2ea 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ d_year, c_nation, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, c_nation, SUM(lo_revenue - lo_supplycost) AS PROFIT FROM date, customer, supplier, part, lineorder WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql index fc4bf8402e..1338e780ae 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ d_year, s_nation, p_category, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, s_nation, p_category, SUM(lo_revenue - lo_supplycost) AS PROFIT FROM date, customer, supplier, part, lineorder WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql index 8c7c315ebc..d8e6f7c42d 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ d_year, s_city, p_brand, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, s_city, p_brand, SUM(lo_revenue - lo_supplycost) AS PROFIT FROM date, customer, supplier, part, lineorder WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql index 76d38414cd..ded6754a97 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql @@ -1,5 +1,5 @@ -- tables: lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ l_returnflag, l_linestatus, sum(l_quantity) AS sum_qty, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql index 25ec865314..f102f7504d 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql @@ -1,5 +1,5 @@ -- tables: part,supplier,partsupp,nation,region -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_acctbal, s_name, n_name, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql index eb6831b7ca..8bd60f0e07 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS revenue, o_orderdate, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql index e05c649b93..3f44094729 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql @@ -1,5 +1,5 @@ -- tables: orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ o_orderpriority, count(*) AS order_count FROM orders diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql index b69b05e5e0..ed179f8b86 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem,supplier,nation,region -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ n_name, sum(l_extendedprice * (1 - l_discount)) AS revenue FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql index a9a080a45b..2dd86f8c2c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql @@ -1,6 +1,6 @@ -- tables: lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ sum(l_extendedprice * l_discount) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice * l_discount) AS revenue FROM lineitem WHERE diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql index 79efbfdae4..6453c1094a 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql @@ -1,5 +1,5 @@ -- tables: supplier,lineitem,orders,customer,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ supp_nation, cust_nation, l_year, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql index 1faced9805..e4c46fb084 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql @@ -1,5 +1,5 @@ -- tables: part,supplier,lineitem,orders,customer,nation,region -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ o_year, sum(CASE WHEN nation = 'BRAZIL' diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql index a47a3b5e9c..cee9925fb5 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql @@ -1,5 +1,5 @@ -- tables: part,supplier,lineitem,partsupp,orders,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ nation, o_year, sum(amount) AS sum_profit diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql index 3d49252d3e..c95a80fcee 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) AS revenue, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql index 66140c9431..b23701e940 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql @@ -1,5 +1,5 @@ -- tables: partsupp,supplier,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ ps_partkey, sum(ps_supplycost * ps_availqty) AS value FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql index 99c9c0f574..e8893e71e4 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql @@ -1,5 +1,5 @@ -- tables: orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ l_shipmode, sum(CASE WHEN o_orderpriority = '1-URGENT' diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql index 21a03f752c..9db2da60ee 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql @@ -1,5 +1,5 @@ -- tables: customer -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_count, count(*) AS custdist FROM ( diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql index b0e79f8da5..70d7a57d07 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql @@ -1,5 +1,5 @@ -- tables: lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ 100.00 * sum(CASE +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 * sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql index 03c0b97372..45f75ff985 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql @@ -1,4 +1,4 @@ -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_suppkey, s_name, s_address, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql index 7df971f830..37a438c796 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql @@ -1,5 +1,5 @@ -- tables: partsupp,part,supplier -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ p_brand, p_type, p_size, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql index 3f9203cea0..62f39a750c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql @@ -1,5 +1,5 @@ -- tables: lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ sum(l_extendedprice) / 7.0 AS avg_yearly +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice) / 7.0 AS avg_yearly FROM lineitem, part diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql index 971d457672..2eb2505c01 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_name, c_custkey, o_orderkey, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql index 8cc1b890c9..16e543f87c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql @@ -1,5 +1,5 @@ -- tables: lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ sum(l_extendedprice * (1 - l_discount)) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice * (1 - l_discount)) AS revenue FROM lineitem, part diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql index f54d44ed8b..a2aca56790 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql @@ -1,5 +1,5 @@ -- tables: supplier,nation,partsupp,lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_name, s_address FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql index 56d81211db..7b4874f96c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql @@ -1,5 +1,5 @@ -- tables: supplier,lineitem,orders,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_name, count(*) AS numwait FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql index c3f199f124..bf784175e0 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql @@ -1,5 +1,5 @@ -- tables: orders,customer -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, disable_streaming_preaggregations=true) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctbal --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org