This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new b104e933cd4 [Bug](expr) execute expr should use local states instead of operators (#40189) (#40219) b104e933cd4 is described below commit b104e933cd46a742f7ccc380c1470735c7e5f2ed Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Sun Sep 1 00:41:10 2024 +0800 [Bug](expr) execute expr should use local states instead of operators (#40189) (#40219) ## Proposed changes cherry-pick from master #40189 <!--Describe your changes.--> --- be/src/pipeline/exec/aggregation_source_operator.cpp | 3 ++- be/src/pipeline/exec/assert_num_rows_operator.cpp | 3 ++- be/src/pipeline/exec/assert_num_rows_operator.h | 3 +++ .../exec/distinct_streaming_aggregation_operator.cpp | 4 ++-- be/src/pipeline/exec/multi_cast_data_stream_source.h | 7 ++++--- be/src/pipeline/exec/repeat_operator.cpp | 2 +- be/src/pipeline/exec/streaming_aggregation_operator.cpp | 4 ++-- regression-test/data/javaudf_p0/test_javaudf_string.out | 3 +++ .../suites/javaudf_p0/test_javaudf_string.groovy | 13 +++++++++++++ 9 files changed, 32 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index fadddee9034..8f19c24589b 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -435,7 +435,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* blo RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index ef0efd3f86b..8f86e3ecb87 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -120,7 +120,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc } COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); return Status::OK(); } diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 4d6d835f815..3874c1fc771 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -46,6 +46,9 @@ public: AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<FakeSharedState>(state, parent) {} ~AssertNumRowsLocalState() = default; + +private: + friend class AssertNumRowsOperatorX; }; class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLocalState> { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 16c0df07b49..1635d927b6d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -515,8 +515,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc local_state._make_nullable_output_key(block); if (!_is_streaming_preagg) { // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR( - vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); } local_state.add_num_rows_returned(block->rows()); COUNTER_UPDATE(local_state.blocks_returned_counter(), 1); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 761e899c3d1..aab5cb96dbe 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -117,6 +117,7 @@ public: } private: + friend class MultiCastDataStreamerSourceOperatorX; vectorized::VExprContextSPtrs _output_expr_contexts; std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies; @@ -151,8 +152,8 @@ public: if (_t_data_stream_sink.__isset.conjuncts) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, - _conjuncts)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc())); + conjuncts())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state, _row_desc())); } return Status::OK(); } @@ -163,7 +164,7 @@ public: RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); } if (_t_data_stream_sink.__isset.conjuncts) { - RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); + RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state)); } return Status::OK(); } diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index 65eccc3fd4e..991754ba122 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -247,7 +247,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp } _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); } - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); *eos = _child_eos && _child_block.rows() == 0; local_state.reached_limit(output_block, eos); diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index d7589f59f9f..cfb63aae9a5 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1299,8 +1299,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR( - vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); } local_state.reached_limit(block, eos); diff --git a/regression-test/data/javaudf_p0/test_javaudf_string.out b/regression-test/data/javaudf_p0/test_javaudf_string.out index 59f2f7c776d..b42a368b028 100644 --- a/regression-test/data/javaudf_p0/test_javaudf_string.out +++ b/regression-test/data/javaudf_p0/test_javaudf_string.out @@ -65,3 +65,6 @@ ab***fg7 ab***fg7 ab***fg8 ab***fg8 ab***fg9 ab***fg9 +-- !select_5 -- +0 + diff --git a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy index 6517c4b08c2..e6484a1fde1 100644 --- a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy +++ b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy @@ -86,9 +86,22 @@ suite("test_javaudf_string") { test_javaudf_string JOIN test_javaudf_string_2 ON test_javaudf_string.user_id = test_javaudf_string_2.user_id order by 1,2; """ + sql """DROP TABLE IF EXISTS tbl1""" + sql """create table tbl1(k1 int, k2 string) distributed by hash(k1) buckets 1 properties("replication_num" = "1");""" + sql """ insert into tbl1 values(1, "5");""" + Integer count = 0; + Integer maxCount = 20; + while (count < maxCount) { + sql """ insert into tbl1 select * from tbl1;""" + count++ + sleep(100); + } + sql """ insert into tbl1 select random()%10000 * 10000, "5" from tbl1;""" + qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """; } finally { try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") try_sql("DROP TABLE IF EXISTS ${tableName}") + try_sql("DROP TABLE IF EXISTS tbl1") try_sql("DROP TABLE IF EXISTS test_javaudf_string_2") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org