This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5734bbcb287 [cherry-pick](branch-30) execute expr should use local states instead of operators(#40189) (#41324) 5734bbcb287 is described below commit 5734bbcb287a4d16cd0a7db545c406d966c714fb Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Fri Sep 27 19:36:45 2024 +0800 [cherry-pick](branch-30) execute expr should use local states instead of operators(#40189) (#41324) The expr of operator cannot be executed concurrently, should use local state's expr. cherry-pick from master https://github.com/apache/doris/pull/40189 --- be/src/pipeline/exec/aggregation_source_operator.cpp | 3 ++- be/src/pipeline/exec/assert_num_rows_operator.cpp | 3 ++- be/src/pipeline/exec/assert_num_rows_operator.h | 3 +++ .../exec/distinct_streaming_aggregation_operator.cpp | 4 ++-- be/src/pipeline/exec/multi_cast_data_stream_source.h | 7 ++++--- be/src/pipeline/exec/operator.h | 12 +++++++----- be/src/pipeline/exec/repeat_operator.cpp | 2 +- be/src/pipeline/exec/streaming_aggregation_operator.cpp | 4 ++-- regression-test/data/javaudf_p0/test_javaudf_string.out | 3 +++ .../suites/javaudf_p0/test_javaudf_string.groovy | 13 +++++++++++++ 10 files changed, 39 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 3264ad56f3c..a5f40a431c5 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -443,7 +443,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* blo RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + 
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); local_state.do_agg_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 4a51002beff..5aa27b51c45 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -116,7 +116,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc } COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); return Status::OK(); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 423bd69144e..dcc64f57878 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -28,6 +28,9 @@ public: AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<FakeSharedState>(state, parent) {} ~AssertNumRowsLocalState() = default; + +private: + friend class AssertNumRowsOperatorX; }; class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLocalState> { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index e8efb51973e..ab71b52ae01 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -462,8 +462,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc local_state._make_nullable_output_key(block); if (!_is_streaming_preagg) { 
// dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR( - vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); } local_state.add_num_rows_returned(block->rows()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 8ecbd23764d..c71310e3ee3 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -62,6 +62,7 @@ public: } private: + friend class MultiCastDataStreamerSourceOperatorX; vectorized::VExprContextSPtrs _output_expr_contexts; std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies; @@ -95,8 +96,8 @@ public: if (_t_data_stream_sink.__isset.conjuncts) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, - _conjuncts)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc())); + conjuncts())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state, _row_desc())); } return Status::OK(); } @@ -107,7 +108,7 @@ public: RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); } if (_t_data_stream_sink.__isset.conjuncts) { - RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); + RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state)); } return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index abed7fb446a..bde2291ec3a 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -760,16 +760,18 @@ protected: ObjectPool* _pool = nullptr; std::vector<TupleId> _tuple_ids; +private: + // The expr of operator set to private permissions, as cannot be executed concurrently, + // should use local state's expr. 
vectorized::VExprContextSPtrs _conjuncts; + vectorized::VExprContextSPtrs _projections; + // Used in common subexpression elimination to compute intermediate results. + std::vector<vectorized::VExprContextSPtrs> _intermediate_projections; +protected: RowDescriptor _row_descriptor; - std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr; - vectorized::VExprContextSPtrs _projections; - std::vector<RowDescriptor> _intermediate_output_row_descriptor; - // Used in common subexpression elimination to compute intermediate results. - std::vector<vectorized::VExprContextSPtrs> _intermediate_projections; /// Resource information sent from the frontend. const TBackendResourceProfile _resource_profile; diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 48cc427d85b..fe45e85c52d 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -234,7 +234,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp } _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); } - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); *eos = _child_eos && _child_block.rows() == 0; local_state.reached_limit(output_block, eos); diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 689a361c371..8aa1eb8df97 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1285,8 +1285,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in 
prestreaming agg - RETURN_IF_ERROR( - vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); } local_state.reached_limit(block, eos); diff --git a/regression-test/data/javaudf_p0/test_javaudf_string.out b/regression-test/data/javaudf_p0/test_javaudf_string.out index 59f2f7c776d..b42a368b028 100644 --- a/regression-test/data/javaudf_p0/test_javaudf_string.out +++ b/regression-test/data/javaudf_p0/test_javaudf_string.out @@ -65,3 +65,6 @@ ab***fg7 ab***fg7 ab***fg8 ab***fg8 ab***fg9 ab***fg9 +-- !select_5 -- +0 + diff --git a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy index 6517c4b08c2..e6484a1fde1 100644 --- a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy +++ b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy @@ -86,9 +86,22 @@ suite("test_javaudf_string") { test_javaudf_string JOIN test_javaudf_string_2 ON test_javaudf_string.user_id = test_javaudf_string_2.user_id order by 1,2; """ + sql """DROP TABLE IF EXISTS tbl1""" + sql """create table tbl1(k1 int, k2 string) distributed by hash(k1) buckets 1 properties("replication_num" = "1");""" + sql """ insert into tbl1 values(1, "5");""" + Integer count = 0; + Integer maxCount = 20; + while (count < maxCount) { + sql """ insert into tbl1 select * from tbl1;""" + count++ + sleep(100); + } + sql """ insert into tbl1 select random()%10000 * 10000, "5" from tbl1;""" + qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """; } finally { try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") try_sql("DROP TABLE IF EXISTS ${tableName}") + try_sql("DROP TABLE IF EXISTS tbl1") try_sql("DROP TABLE IF EXISTS test_javaudf_string_2") } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org